From 957b2b71886126760e7c8652fe284d07303f4230 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Mon, 11 Jul 2022 15:07:55 -0400 Subject: [PATCH] Cache preferred instance groups When creating unified job, stash the list of pk values from the instance groups returned from preferred_instance_groups so that the task management system does not need to call out to this method repeatedly. .preferred_instance_groups_cache is the new field --- ...xpires.py => 0165_task_manager_refactor.py} | 7 +++++++ awx/main/models/ad_hoc_commands.py | 3 +++ awx/main/models/jobs.py | 3 +++ awx/main/models/projects.py | 3 +++ awx/main/models/unified_jobs.py | 18 ++++++++++++++++-- awx/main/models/workflow.py | 6 ++++++ awx/main/scheduler/task_manager.py | 12 ++++++++---- awx/main/scheduler/task_manager_models.py | 16 ++++++++++++++++ 8 files changed, 62 insertions(+), 6 deletions(-) rename awx/main/migrations/{0165_workflowapproval_expires.py => 0165_task_manager_refactor.py} (66%) diff --git a/awx/main/migrations/0165_workflowapproval_expires.py b/awx/main/migrations/0165_task_manager_refactor.py similarity index 66% rename from awx/main/migrations/0165_workflowapproval_expires.py rename to awx/main/migrations/0165_task_manager_refactor.py index b4fa0c6f39..a8e51e6e2f 100644 --- a/awx/main/migrations/0165_workflowapproval_expires.py +++ b/awx/main/migrations/0165_task_manager_refactor.py @@ -20,4 +20,11 @@ class Migration(migrations.Migration): null=True, ), ), + migrations.AddField( + model_name='unifiedjob', + name='preferred_instance_groups_cache', + field=models.JSONField( + blank=True, default=None, editable=False, help_text='A cached list with pk values from preferred instance groups.', null=True + ), + ), ] diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index d0608bd652..f45a03e0be 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -90,6 +90,9 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): 
extra_vars_dict = VarsDictProperty('extra_vars', True) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + def clean_inventory(self): inv = self.inventory if not inv: diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index b1926435b1..622cfb009c 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -1213,6 +1213,9 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): extra_vars_dict = VarsDictProperty('extra_vars', True) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + @classmethod def _get_parent_field_name(cls): return 'system_job_template' diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 580f029a0b..f2163c69ae 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -513,6 +513,9 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage help_text=_('The SCM Revision discovered by this update for the given project and branch.'), ) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + def _get_parent_field_name(self): return 'project' diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index f1702cddba..30685d9371 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -381,6 +381,10 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn unified_job.survey_passwords = new_job_passwords kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch + unified_job.preferred_instance_groups_cache = [ig.pk for ig in unified_job.preferred_instance_groups] + + unified_job._set_default_dependencies_processed() + from awx.main.signals import disable_activity_stream, activity_stream_create with disable_activity_stream(): @@ -693,6 +697,13 @@ class UnifiedJob( on_delete=polymorphic.SET_NULL, help_text=_('The Instance group the job was run 
under'), ) + preferred_instance_groups_cache = models.JSONField( + blank=True, + null=True, + default=None, + editable=False, + help_text=_("A cached list with pk values from preferred instance groups."), + ) organization = models.ForeignKey( 'Organization', blank=True, @@ -808,6 +819,9 @@ class UnifiedJob( update_fields = self._update_parent_instance_no_save(parent_instance) parent_instance.save(update_fields=update_fields) + def _set_default_dependencies_processed(self): + pass + def save(self, *args, **kwargs): """Save the job, with current status, to the database. Ensure that all data is consistent before doing so. @@ -1514,8 +1528,8 @@ class UnifiedJob( 'state': state, 'work_unit_id': self.work_unit_id, } - if self.unified_job_template: - extra["template_name"] = self.unified_job_template.name + if self.name: + extra["task_name"] = self.name if state == "blocked" and blocked_by: blocked_by_msg = f"{blocked_by._meta.model_name}-{blocked_by.id}" msg = f"{self._meta.model_name}-{self.id} blocked by {blocked_by_msg}" diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 3664331f44..6d59670a27 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -623,6 +623,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio ) is_sliced_job = models.BooleanField(default=False) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + @property def workflow_nodes(self): return self.workflow_job_nodes @@ -800,6 +803,9 @@ class WorkflowApproval(UnifiedJob, JobNotificationMixin): on_delete=models.SET_NULL, ) + def _set_default_dependencies_processed(self): + self.dependencies_processed = True + @classmethod def _get_unified_job_template_class(cls): return WorkflowApprovalTemplate diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index a1fa8289ec..2ef89a74bc 100644 --- a/awx/main/scheduler/task_manager.py +++ 
b/awx/main/scheduler/task_manager.py @@ -512,6 +512,12 @@ class TaskManager(TaskBase): ScheduleTaskManager().schedule() from awx.main.tasks.system import handle_work_error, handle_work_success + # update capacity for control node and execution node + if task.controller_node: + self.instances[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) + if task.execution_node: + self.instances[task.execution_node].consume_capacity(task.task_impact) + dependent_tasks = dependent_tasks or [] task_actual = { @@ -598,7 +604,8 @@ class TaskManager(TaskBase): continue found_acceptable_queue = False - preferred_instance_groups = task.preferred_instance_groups + + preferred_instance_groups = self.instance_groups.get_instance_groups_from_task_cache(task) # Determine if there is control capacity for the task if task.capacity_type == 'control': @@ -618,7 +625,6 @@ class TaskManager(TaskBase): # All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups if task.capacity_type == 'control': task.execution_node = control_instance.hostname - control_instance.consume_capacity(control_impact) execution_instance = self.instances[control_instance.hostname].obj task.log_lifecycle("controller_node_chosen") task.log_lifecycle("execution_node_chosen") @@ -649,9 +655,7 @@ class TaskManager(TaskBase): control_instance = execution_instance task.controller_node = execution_instance.hostname - control_instance.consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) task.log_lifecycle("controller_node_chosen") - execution_instance.consume_capacity(task.task_impact) task.log_lifecycle("execution_node_chosen") logger.debug( "Starting {} in group {} instance {} (remaining_capacity={})".format( diff --git a/awx/main/scheduler/task_manager_models.py b/awx/main/scheduler/task_manager_models.py index 678e545152..cade939343 100644 --- a/awx/main/scheduler/task_manager_models.py +++ b/awx/main/scheduler/task_manager_models.py @@ -67,6 
+67,7 @@ class TaskManagerInstanceGroups:
     def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None):
         self.instance_groups = dict()
         self.controlplane_ig = None
+        self.pk_ig_map = dict()
 
         if instance_groups is not None:  # for testing
             self.instance_groups = instance_groups
@@ -81,6 +82,7 @@ class TaskManagerInstanceGroups:
                         instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname
                     ],
                 )
+                self.pk_ig_map[instance_group.pk] = instance_group
 
     def get_remaining_capacity(self, group_name):
         instances = self.instance_groups[group_name]['instances']
@@ -121,3 +123,17 @@ class TaskManagerInstanceGroups:
                 elif i.capacity > largest_instance.capacity:
                     largest_instance = i
         return largest_instance
+
+    def get_instance_groups_from_task_cache(self, task):
+        """Resolve task.preferred_instance_groups_cache pks to known groups, else task.global_instance_groups."""
+        igs = []
+        for pk in task.preferred_instance_groups_cache or ():
+            ig = self.pk_ig_map.get(pk)
+            if ig:
+                igs.append(ig)
+            else:
+                logger.warning(f"Unknown instance group with pk {pk} for task {task}")
+        if not igs:
+            logger.warning(f"No instance groups in cache exist, defaulting to global instance groups for task {task}")
+            return task.global_instance_groups
+        return igs