From 76d76d13b0cad260f23962fe25c82c86b5b43e6e Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Mon, 11 Jul 2022 08:06:51 -0400 Subject: [PATCH] Start pending workflows in TaskManager we had tried doing this in the WorkflowManager, but we decided that we want to handle ALL pending jobs and "soft blockers" to jobs with the TaskManager/DependencyGraph and not duplicate that logic in the WorkflowManager. --- awx/main/scheduler/task_manager.py | 109 ++++++++++++++--------------- 1 file changed, 54 insertions(+), 55 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 8c18f5343a..790cd04c05 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -15,6 +15,7 @@ from django.db import transaction, connection from django.utils.translation import gettext_lazy as _, gettext_noop from django.utils.timezone import now as tz_now from django.conf import settings +from django.contrib.contenttypes.models import ContentType # AWX from awx.main.dispatch.reaper import reap_job @@ -84,14 +85,17 @@ class TaskBase: return True return False - def get_running_workflow_jobs(self): - graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] - return graph_workflow_jobs - @timeit def get_tasks(self, filter_args): - qs = UnifiedJob.objects.filter(**filter_args).exclude(launch_type='sync').order_by('created').prefetch_related('instance_group') - return [task for task in qs if not type(task) is WorkflowJob] + wf_approval_ctype_id = ContentType.objects.get_for_model(WorkflowApproval).id + qs = ( + UnifiedJob.objects.filter(**filter_args) + .exclude(launch_type='sync') + .exclude(polymorphic_ctype_id=wf_approval_ctype_id) + .order_by('created') + .prefetch_related('instance_group') + ) + self.all_tasks = [t for t in qs] def record_aggregate_metrics(self, *args): if not settings.IS_TESTING(): @@ -137,9 +141,9 @@ class WorkflowManager(TaskBase): super().__init__(prefix="workflow_manager") @timeit - def spawn_workflow_graph_jobs(self, workflow_jobs): + def spawn_workflow_graph_jobs(self): result = [] - for workflow_job in workflow_jobs: + for workflow_job in self.all_tasks: if self.timed_out(): logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early") self.schedule_manager.schedule() @@ -272,34 +276,13 @@ class WorkflowManager(TaskBase): @timeit def get_tasks(self): - workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')] - workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')] - workflow_to_start = [] - running_wfjt_ids = {wf.unified_job_template_id for wf in workflow_jobs_running} - for wf in workflow_jobs_pending: - if wf.allow_simultaneous or wf.unified_job_template_id not in running_wfjt_ids: - wf.status = 'running' - workflow_to_start.append(wf) - running_wfjt_ids.add(wf.unified_job_template_id) - logger.debug('Transitioning %s to running status.', wf.log_format) - self.start_task_limit -= 1 - if self.start_task_limit == 0: - break - if self.timed_out(): - logger.warning(f"Workflow manager has reached time out processing pending workflows, exiting loop early") - break - else: - logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format) - - WorkflowJob.objects.bulk_update(workflow_to_start, ['status']) - workflow_jobs_running.extend(workflow_to_start) - return workflow_jobs_running + self.all_tasks = [wf for wf in WorkflowJob.objects.filter(status='running')] @timeit def _schedule(self): - running_workflow_tasks = self.get_tasks() - if len(running_workflow_tasks) > 0: - self.spawn_workflow_graph_jobs(running_workflow_tasks) + self.get_tasks() + if len(self.all_tasks) > 0: + self.spawn_workflow_graph_jobs() self.timeout_approval_node() @@ -334,12 +317,12 @@ class DependencyManager(TaskBase): with disable_activity_stream(): task.dependent_jobs.add(*dependencies) - def get_inventory_source_tasks(self, all_sorted_tasks): + def get_inventory_source_tasks(self): inventory_ids = set() - for task in all_sorted_tasks: + for task in self.all_tasks: if isinstance(task, Job): inventory_ids.add(task.inventory_id) - return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] def get_latest_inventory_update(self, inventory_source): latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") @@ -473,17 +456,17 @@ class DependencyManager(TaskBase): return created_dependencies - def process_tasks(self, all_sorted_tasks): - self.generate_dependencies(all_sorted_tasks) - self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks)) + def process_tasks(self): + self.generate_dependencies(self.all_tasks) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks)) @timeit def _schedule(self): - all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) + self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) - if len(all_sorted_tasks) > 0: - self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks) - self.process_tasks(all_sorted_tasks) + if len(self.all_tasks) > 0: + self.get_inventory_source_tasks() + self.process_tasks() ScheduleTaskManager().schedule() @@ -507,12 +490,12 @@ class TaskManager(TaskBase): self.schedule_manager = ScheduleTaskManager() super().__init__(prefix="task_manager") - def after_lock_init(self, all_sorted_tasks): + def after_lock_init(self): """ Init AFTER we know this instance of the task manager will run because the lock is acquired. """ self.dependency_graph = DependencyGraph() - self.instances = TaskManagerInstances(all_sorted_tasks) + self.instances = TaskManagerInstances(self.all_tasks) self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances) self.controlplane_ig = self.instance_groups.controlplane_ig @@ -572,10 +555,19 @@ class TaskManager(TaskBase): task.save() # TODO: run error handler to fail sub-tasks and send notifications else: + if type(task) is WorkflowJob: + task.status = 'running' + task.send_notification_templates('running') + logger.debug('Transitioning %s to running status.', task.log_format) + # Call this to ensure Workflow nodes get spawned in timely manner + ScheduleWorkflowManager().schedule() # at this point we already have control/execution nodes selected for the following cases - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug(f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.') + else: + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug( + f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' + ) with disable_activity_stream(): task.celery_task_id = str(uuid.uuid4()) task.save() @@ -623,6 +615,13 @@ class TaskManager(TaskBase): tasks_to_update_job_explanation.append(task) continue + if isinstance(task, WorkflowJob): + # Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph. + # Double check that using just the DependencyGraph works for Workflows and Sliced Jobs. + self.dependency_graph.add_job(task) + self.start_task(task, None, task.get_jobs_fail_chain(), None) + continue + found_acceptable_queue = False preferred_instance_groups = task.preferred_instance_groups @@ -725,22 +724,22 @@ class TaskManager(TaskBase): logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}') reap_job(j, 'failed') - def process_tasks(self, all_sorted_tasks): - running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']] + def process_tasks(self): + running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']] self.process_running_tasks(running_tasks) self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks)) - pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] + pending_tasks = [t for t in self.all_tasks if t.status == 'pending'] self.process_pending_tasks(pending_tasks) self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks)) @timeit def _schedule(self): - all_sorted_tasks = self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True)) + self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True)) - self.after_lock_init(all_sorted_tasks) + self.after_lock_init() self.reap_jobs_from_orphaned_instances() - if len(all_sorted_tasks) > 0: - self.process_tasks(all_sorted_tasks) + if len(self.all_tasks) > 0: + self.process_tasks()