From 689a216726ded213bc823cfd163dba3cacb5c6d8 Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Thu, 21 Apr 2022 13:05:06 -0400 Subject: [PATCH] move static methods used by task manager (#12050) * move static methods used by task manager These static methods were being used to act on Instance-like objects that were SimpleNamespace objects with the necessary attributes. This change introduces dedicated classes to replace the SimpleNamespace objects and moves the formerly static methods to a place where they are more relevant, instead of leaving them tacked onto models to which they were only loosely related. Accept in-memory data structures in init methods for tests * initialize remaining capacity AFTER we build the map of instances --- awx/main/models/ha.py | 57 ------------ awx/main/scheduler/task_manager.py | 71 +++++---------- awx/main/scheduler/task_manager_models.py | 103 ++++++++++++++++++++++ awx/main/tests/unit/models/test_ha.py | 13 +-- 4 files changed, 133 insertions(+), 111 deletions(-) create mode 100644 awx/main/scheduler/task_manager_models.py diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index dc2fe76e01..a7d6e51a82 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -2,7 +2,6 @@ # All Rights Reserved. from decimal import Decimal -import random import logging import os @@ -161,25 +160,6 @@ class Instance(HasPolicyEditsMixin, BaseModel): def remaining_capacity(self): return self.capacity - self.consumed_capacity - @staticmethod - def update_remaining_capacity(instances, jobs): - """Takes mapping of hostname to SimpleNamespace instance like objects and a list of jobs. - - Computes remaining capacity for all the instances based on currently running and waiting jobs. - - No return value, updates the "remaining_capacity" field on the SimpleNamespace instance like object in place. - For use in the task manager to avoid refetching jobs from the database.
- """ - for job in jobs: - if job.status not in ['waiting', 'running']: - continue - control_instance = instances.get(job.controller_node, '') - execution_instance = instances.get(job.execution_node, '') - if execution_instance and execution_instance.node_type in ('hybrid', 'execution'): - instances[job.execution_node].remaining_capacity -= job.task_impact - if control_instance and control_instance.node_type in ('hybrid', 'control'): - instances[job.controller_node].remaining_capacity -= settings.AWX_CONTROL_NODE_TASK_IMPACT - @property def jobs_running(self): return UnifiedJob.objects.filter( @@ -194,12 +174,6 @@ class Instance(HasPolicyEditsMixin, BaseModel): def jobs_total(self): return UnifiedJob.objects.filter(execution_node=self.hostname).count() - @staticmethod - def choose_online_control_plane_node(): - return random.choice( - Instance.objects.filter(enabled=True, capacity__gt=0).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True) - ) - def get_cleanup_task_kwargs(self, **kwargs): """ Produce options to use for the command: ansible-runner worker cleanup @@ -385,37 +359,6 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): class Meta: app_label = 'main' - @staticmethod - def fit_task_to_most_remaining_capacity_instance(task, instances, impact=None, capacity_type=None, add_hybrid_control_cost=False): - impact = impact if impact else task.task_impact - capacity_type = capacity_type if capacity_type else task.capacity_type - instance_most_capacity = None - most_remaining_capacity = -1 - for i in instances: - if i.node_type not in (capacity_type, 'hybrid'): - continue - would_be_remaining = i.remaining_capacity - impact - # hybrid nodes _always_ control their own tasks - if add_hybrid_control_cost and i.node_type == 'hybrid': - would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT - if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity): - instance_most_capacity = i - most_remaining_capacity = would_be_remaining - return instance_most_capacity - - @staticmethod - def find_largest_idle_instance(instances, capacity_type='execution'): - largest_instance = None - for i in instances: - if i.node_type not in (capacity_type, 'hybrid'): - continue - if (hasattr(i, 'jobs_running') and i.jobs_running == 0) or i.remaining_capacity == i.capacity: - if largest_instance is None: - largest_instance = i - elif i.capacity > largest_instance.capacity: - largest_instance = i - return largest_instance - def set_default_policy_fields(self): self.policy_instance_list = [] self.policy_instance_minimum = 0 diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 2301f7ec00..d5c31194f2 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -6,7 +6,6 @@ from datetime import timedelta import logging import uuid import json -from types import SimpleNamespace # Django from django.db import transaction, connection @@ -19,7 +18,6 @@ from awx.main.dispatch.reaper import reap_job from awx.main.models import ( AdHocCommand, Instance, - InstanceGroup, InventorySource, InventoryUpdate, Job, @@ -37,6 +35,8 @@ from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, sch from awx.main.utils.common import create_partition from awx.main.signals import disable_activity_stream from awx.main.scheduler.dependency_graph import DependencyGraph +from awx.main.scheduler.task_manager_models import TaskManagerInstances +from 
awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups from awx.main.utils import decrypt_field @@ -54,49 +54,22 @@ class TaskManager: The NOOP case is short-circuit logic. If the task manager realizes that another instance of the task manager is already running, then it short-circuits and decides not to run. """ - self.graph = dict() # start task limit indicates how many pending jobs can be started on this # .schedule() run. Starting jobs is expensive, and there is code in place to reap # the task manager after 5 minutes. At scale, the task manager can easily take more than # 5 minutes to start pending jobs. If this limit is reached, pending jobs # will no longer be started and will be started on the next task manager cycle. self.start_task_limit = settings.START_TASK_LIMIT - self.time_delta_job_explanation = timedelta(seconds=30) def after_lock_init(self, all_sorted_tasks): """ Init AFTER we know this instance of the task manager will run because the lock is acquired. """ - instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop') - self.real_instances = {i.hostname: i for i in instances} - self.controlplane_ig = None self.dependency_graph = DependencyGraph() - - instances_partial = [ - SimpleNamespace( - obj=instance, - node_type=instance.node_type, - remaining_capacity=instance.capacity, # Updated with Instance.update_remaining_capacity by looking at all active tasks - capacity=instance.capacity, - hostname=instance.hostname, - ) - for instance in instances - ] - - instances_by_hostname = {i.hostname: i for i in instances_partial} - - # updates remaining capacity value based on currently running and waiting tasks - Instance.update_remaining_capacity(instances_by_hostname, all_sorted_tasks) - - for rampart_group in InstanceGroup.objects.prefetch_related('instances'): - if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: - self.controlplane_ig = rampart_group - self.graph[rampart_group.name] = dict( - instances=[ - instances_by_hostname[instance.hostname] for instance in rampart_group.instances.all() if instance.hostname in instances_by_hostname - ], - ) + self.instances = TaskManagerInstances(all_sorted_tasks) + self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) + self.controlplane_ig = self.instance_groups.controlplane_ig def job_blocked_by(self, task): # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph @@ -244,7 +217,7 @@ class TaskManager: schedule_task_manager() return result - def start_task(self, task, rampart_group, dependent_tasks=None, instance=None): + def start_task(self, task, instance_group, dependent_tasks=None, instance=None): self.start_task_limit -= 1 if self.start_task_limit == 0: # schedule another run immediately after this task manager @@ -277,10 +250,10 @@ class TaskManager: schedule_task_manager() # at this point we already have control/execution nodes selected for the following cases else: - task.instance_group = rampart_group + task.instance_group = instance_group execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {rampart_group.name}{execution_node_msg}.' + f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' 
) with disable_activity_stream(): task.celery_task_id = str(uuid.uuid4()) @@ -478,8 +451,8 @@ class TaskManager: control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT else: control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT - control_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance( - task, self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], impact=control_impact, capacity_type='control' + control_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance( + task, instance_group_name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, impact=control_impact, capacity_type='control' ) if not control_instance: self.task_needs_capacity(task, tasks_to_update_job_explanation) @@ -493,29 +466,29 @@ class TaskManager: task.execution_node = control_instance.hostname control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact) self.dependency_graph.add_job(task) - execution_instance = self.real_instances[control_instance.hostname] + execution_instance = self.instances[control_instance.hostname].obj task.log_lifecycle("controller_node_chosen") task.log_lifecycle("execution_node_chosen") self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True continue - for rampart_group in preferred_instance_groups: - if rampart_group.is_container_group: + for instance_group in preferred_instance_groups: + if instance_group.is_container_group: self.dependency_graph.add_job(task) - self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None) + self.start_task(task, instance_group, task.get_jobs_fail_chain(), None) found_acceptable_queue = True break # TODO: remove this after we have confidence that OCP control nodes are reporting node_type=control if settings.IS_K8S and task.capacity_type == 'execution': - logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name)) + logger.debug("Skipping group {}, task cannot run on control plane".format(instance_group.name)) continue # at this point we know the instance group is NOT a container group # because if it was, it would have started the task and broke out of the loop. 
- execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance( - task, self.graph[rampart_group.name]['instances'], add_hybrid_control_cost=True - ) or InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'], capacity_type=task.capacity_type) + execution_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance( + task, instance_group_name=instance_group.name, add_hybrid_control_cost=True + ) or self.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type) if execution_instance: task.execution_node = execution_instance.hostname @@ -530,18 +503,18 @@ class TaskManager: task.log_lifecycle("execution_node_chosen") logger.debug( "Starting {} in group {} instance {} (remaining_capacity={})".format( - task.log_format, rampart_group.name, execution_instance.hostname, execution_instance.remaining_capacity + task.log_format, instance_group.name, execution_instance.hostname, execution_instance.remaining_capacity ) ) - execution_instance = self.real_instances[execution_instance.hostname] + execution_instance = self.instances[execution_instance.hostname].obj self.dependency_graph.add_job(task) - self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance) + self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True break else: logger.debug( "No instance available in group {} to run job {} w/ capacity requirement {}".format( - rampart_group.name, task.log_format, task.task_impact + instance_group.name, task.log_format, task.task_impact ) ) if not found_acceptable_queue: diff --git a/awx/main/scheduler/task_manager_models.py b/awx/main/scheduler/task_manager_models.py new file mode 100644 index 0000000000..556cc94f64 --- /dev/null +++ b/awx/main/scheduler/task_manager_models.py @@ -0,0 +1,103 @@ +# Copyright (c) 2022 Ansible by Red Hat +# All Rights Reserved. 
+import logging + +from django.conf import settings + +from awx.main.models import ( + Instance, + InstanceGroup, +) + +logger = logging.getLogger('awx.main.scheduler') + + +class TaskManagerInstance: + """A class representing minimal data the task manager needs to represent an Instance.""" + + def __init__(self, obj): + self.obj = obj + self.node_type = obj.node_type + self.remaining_capacity = obj.capacity + self.capacity = obj.capacity + self.hostname = obj.hostname + + +class TaskManagerInstances: + def __init__(self, active_tasks, instances=None): + self.instances_by_hostname = dict() + if instances is None: + instances = ( + Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop').only('node_type', 'capacity', 'hostname', 'enabled') + ) + for instance in instances: + self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance) + + # initialize remaining capacity based on currently waiting and running tasks + for task in active_tasks: + if task.status not in ['waiting', 'running']: + continue + control_instance = self.instances_by_hostname.get(task.controller_node, '') + execution_instance = self.instances_by_hostname.get(task.execution_node, '') + if execution_instance and execution_instance.node_type in ('hybrid', 'execution'): + self.instances_by_hostname[task.execution_node].remaining_capacity -= task.task_impact + if control_instance and control_instance.node_type in ('hybrid', 'control'): + self.instances_by_hostname[task.controller_node].remaining_capacity -= settings.AWX_CONTROL_NODE_TASK_IMPACT + + def __getitem__(self, hostname): + return self.instances_by_hostname.get(hostname) + + def __contains__(self, hostname): + return hostname in self.instances_by_hostname + + +class TaskManagerInstanceGroups: + """A class representing minimal data the task manager needs to represent an InstanceGroup.""" + + def __init__(self, instances_by_hostname=None, instance_groups=None): + self.instance_groups = dict() + self.controlplane_ig = None + + if instance_groups is not None: # for testing + self.instance_groups = instance_groups + else: + for instance_group in InstanceGroup.objects.prefetch_related('instances').only('name', 'instances'): + if instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: + self.controlplane_ig = instance_group + self.instance_groups[instance_group.name] = dict( + instances=[ + instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname + ], + ) + + def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False): + impact = impact if impact else task.task_impact + capacity_type = capacity_type if capacity_type else task.capacity_type + instance_most_capacity = None + most_remaining_capacity = -1 + instances = self.instance_groups[instance_group_name]['instances'] + + for i in instances: + if i.node_type not in (capacity_type, 'hybrid'): + continue + would_be_remaining = i.remaining_capacity - impact + # hybrid nodes _always_ control their own tasks + if add_hybrid_control_cost and i.node_type == 'hybrid': + would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT + if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity): + instance_most_capacity = i + most_remaining_capacity = would_be_remaining + return instance_most_capacity + + def find_largest_idle_instance(self, instance_group_name, 
capacity_type='execution'): + largest_instance = None + instances = self.instance_groups[instance_group_name]['instances'] + for i in instances: + if i.node_type not in (capacity_type, 'hybrid'): + continue + if (hasattr(i, 'jobs_running') and i.jobs_running == 0) or i.remaining_capacity == i.capacity: + if largest_instance is None: + largest_instance = i + elif i.capacity > largest_instance.capacity: + largest_instance = i + return largest_instance diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py index 86b8d65aaf..e21d980dd5 100644 --- a/awx/main/tests/unit/models/test_ha.py +++ b/awx/main/tests/unit/models/test_ha.py @@ -4,6 +4,7 @@ from unittest.mock import Mock from decimal import Decimal from awx.main.models import InstanceGroup, Instance +from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups @pytest.mark.parametrize('capacity_adjustment', [0.0, 0.25, 0.5, 0.75, 1, 1.5, 3]) @@ -59,9 +60,10 @@ class TestInstanceGroup(object): ], ) def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason): - ig = InstanceGroup(id=10) + InstanceGroup(id=10) + tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances}}) - instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances) + instance_picked = tm_igs.fit_task_to_most_remaining_capacity_instance(task, 'controlplane') if instance_fit_index is None: assert instance_picked is None, reason @@ -82,13 +84,14 @@ class TestInstanceGroup(object): def filter_offline_instances(*args): return filter(lambda i: i.capacity > 0, instances) - ig = InstanceGroup(id=10) + InstanceGroup(id=10) instances_online_only = filter_offline_instances(instances) + tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances_online_only}}) if instance_fit_index is None: - assert ig.find_largest_idle_instance(instances_online_only) is None, reason + assert tm_igs.find_largest_idle_instance('controlplane') is None, reason else: - assert ig.find_largest_idle_instance(instances_online_only) == instances[instance_fit_index], reason + assert tm_igs.find_largest_idle_instance('controlplane') == instances[instance_fit_index], reason def test_cleanup_params_defaults():
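
Illustrative note (not part of the patch): because TaskManagerInstanceGroups accepts a prebuilt instance-group mapping in its init method, scheduler placement logic can be unit tested without touching the database, which is what the updated awx/main/tests/unit/models/test_ha.py tests rely on. Below is a minimal pytest-style sketch of that pattern; the hostnames, capacities, helper function, and test name are hypothetical, and SimpleNamespace stands in for real Instance rows.

from types import SimpleNamespace

from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups


def make_instance(hostname, capacity, remaining, node_type='execution'):
    # Minimal Instance-like object carrying only the attributes the scheduler reads.
    return SimpleNamespace(
        hostname=hostname,
        node_type=node_type,
        capacity=capacity,
        remaining_capacity=remaining,
        jobs_running=0,
    )


def test_fit_task_prefers_most_remaining_capacity():
    instances = [
        make_instance('node-1', capacity=100, remaining=20),
        make_instance('node-2', capacity=100, remaining=80),
    ]
    # Inject the in-memory mapping instead of querying InstanceGroup from the database.
    tm_igs = TaskManagerInstanceGroups(instance_groups={'default': {'instances': instances}})

    task = SimpleNamespace(task_impact=30, capacity_type='execution')
    chosen = tm_igs.fit_task_to_most_remaining_capacity_instance(task, 'default')

    # node-1 would go negative (20 - 30), so node-2 is selected.
    assert chosen.hostname == 'node-2'

The same injected structure also exercises find_largest_idle_instance, which is how the helpers moved out of InstanceGroup are now covered in the updated unit tests.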