diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 8c18f5343a..790cd04c05 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -15,6 +15,7 @@ from django.db import transaction, connection
 from django.utils.translation import gettext_lazy as _, gettext_noop
 from django.utils.timezone import now as tz_now
 from django.conf import settings
+from django.contrib.contenttypes.models import ContentType
 
 # AWX
 from awx.main.dispatch.reaper import reap_job
@@ -84,14 +85,17 @@ class TaskBase:
             return True
         return False
 
-    def get_running_workflow_jobs(self):
-        graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
-        return graph_workflow_jobs
-
     @timeit
     def get_tasks(self, filter_args):
-        qs = UnifiedJob.objects.filter(**filter_args).exclude(launch_type='sync').order_by('created').prefetch_related('instance_group')
-        return [task for task in qs if not type(task) is WorkflowJob]
+        wf_approval_ctype_id = ContentType.objects.get_for_model(WorkflowApproval).id
+        qs = (
+            UnifiedJob.objects.filter(**filter_args)
+            .exclude(launch_type='sync')
+            .exclude(polymorphic_ctype_id=wf_approval_ctype_id)
+            .order_by('created')
+            .prefetch_related('instance_group')
+        )
+        self.all_tasks = [t for t in qs]
 
     def record_aggregate_metrics(self, *args):
         if not settings.IS_TESTING():
@@ -137,9 +141,9 @@ class WorkflowManager(TaskBase):
         super().__init__(prefix="workflow_manager")
 
     @timeit
-    def spawn_workflow_graph_jobs(self, workflow_jobs):
+    def spawn_workflow_graph_jobs(self):
         result = []
-        for workflow_job in workflow_jobs:
+        for workflow_job in self.all_tasks:
             if self.timed_out():
                 logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early")
                 self.schedule_manager.schedule()
@@ -272,34 +276,13 @@
 
     @timeit
     def get_tasks(self):
-        workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')]
-        workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')]
-        workflow_to_start = []
-        running_wfjt_ids = {wf.unified_job_template_id for wf in workflow_jobs_running}
-        for wf in workflow_jobs_pending:
-            if wf.allow_simultaneous or wf.unified_job_template_id not in running_wfjt_ids:
-                wf.status = 'running'
-                workflow_to_start.append(wf)
-                running_wfjt_ids.add(wf.unified_job_template_id)
-                logger.debug('Transitioning %s to running status.', wf.log_format)
-                self.start_task_limit -= 1
-                if self.start_task_limit == 0:
-                    break
-                if self.timed_out():
-                    logger.warning(f"Workflow manager has reached time out processing pending workflows, exiting loop early")
-                    break
-            else:
-                logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format)
-
-        WorkflowJob.objects.bulk_update(workflow_to_start, ['status'])
-        workflow_jobs_running.extend(workflow_to_start)
-        return workflow_jobs_running
+        self.all_tasks = [wf for wf in WorkflowJob.objects.filter(status='running')]
 
     @timeit
     def _schedule(self):
-        running_workflow_tasks = self.get_tasks()
-        if len(running_workflow_tasks) > 0:
-            self.spawn_workflow_graph_jobs(running_workflow_tasks)
+        self.get_tasks()
+        if len(self.all_tasks) > 0:
+            self.spawn_workflow_graph_jobs()
 
         self.timeout_approval_node()
 
@@ -334,12 +317,12 @@ class DependencyManager(TaskBase):
         with disable_activity_stream():
             task.dependent_jobs.add(*dependencies)
 
-    def get_inventory_source_tasks(self, all_sorted_tasks):
+    def get_inventory_source_tasks(self):
         inventory_ids = set()
-        for task in all_sorted_tasks:
+        for task in self.all_tasks:
             if isinstance(task, Job):
                 inventory_ids.add(task.inventory_id)
-        return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
+        self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
 
     def get_latest_inventory_update(self, inventory_source):
         latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
@@ -473,17 +456,17 @@
 
         return created_dependencies
 
-    def process_tasks(self, all_sorted_tasks):
-        self.generate_dependencies(all_sorted_tasks)
-        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks))
+    def process_tasks(self):
+        self.generate_dependencies(self.all_tasks)
+        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks))
 
     @timeit
     def _schedule(self):
-        all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
+        self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
 
-        if len(all_sorted_tasks) > 0:
-            self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
-            self.process_tasks(all_sorted_tasks)
+        if len(self.all_tasks) > 0:
+            self.get_inventory_source_tasks()
+            self.process_tasks()
         ScheduleTaskManager().schedule()
 
@@ -507,12 +490,12 @@ class TaskManager(TaskBase):
         self.schedule_manager = ScheduleTaskManager()
         super().__init__(prefix="task_manager")
 
-    def after_lock_init(self, all_sorted_tasks):
+    def after_lock_init(self):
         """
        Init AFTER we know this instance of the task manager will run because the lock is acquired.
         """
         self.dependency_graph = DependencyGraph()
-        self.instances = TaskManagerInstances(all_sorted_tasks)
+        self.instances = TaskManagerInstances(self.all_tasks)
         self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
         self.controlplane_ig = self.instance_groups.controlplane_ig
@@ -572,10 +555,19 @@
             task.save()
             # TODO: run error handler to fail sub-tasks and send notifications
         else:
+            if type(task) is WorkflowJob:
+                task.status = 'running'
+                task.send_notification_templates('running')
+                logger.debug('Transitioning %s to running status.', task.log_format)
+                # Call this to ensure Workflow nodes get spawned in timely manner
+                ScheduleWorkflowManager().schedule()
             # at this point we already have control/execution nodes selected for the following cases
-            task.instance_group = instance_group
-            execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
-            logger.debug(f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.')
+            else:
+                task.instance_group = instance_group
+                execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
+                logger.debug(
+                    f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
+                )
         with disable_activity_stream():
             task.celery_task_id = str(uuid.uuid4())
             task.save()
@@ -623,6 +615,13 @@
                 tasks_to_update_job_explanation.append(task)
                 continue
 
+            if isinstance(task, WorkflowJob):
+                # Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph.
+                # Double check that using just the DependencyGraph works for Workflows and Sliced Jobs.
+                self.dependency_graph.add_job(task)
+                self.start_task(task, None, task.get_jobs_fail_chain(), None)
+                continue
+
             found_acceptable_queue = False
             preferred_instance_groups = task.preferred_instance_groups
 
@@ -725,22 +724,22 @@
                 logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}')
                 reap_job(j, 'failed')
 
-    def process_tasks(self, all_sorted_tasks):
-        running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]
+    def process_tasks(self):
+        running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']]
         self.process_running_tasks(running_tasks)
         self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks))
 
-        pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
+        pending_tasks = [t for t in self.all_tasks if t.status == 'pending']
         self.process_pending_tasks(pending_tasks)
         self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks))
 
     @timeit
     def _schedule(self):
-        all_sorted_tasks = self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))
+        self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))
 
-        self.after_lock_init(all_sorted_tasks)
+        self.after_lock_init()
         self.reap_jobs_from_orphaned_instances()
 
-        if len(all_sorted_tasks) > 0:
-            self.process_tasks(all_sorted_tasks)
+        if len(self.all_tasks) > 0:
+            self.process_tasks()
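
For illustration, here is a minimal standalone sketch of the calling convention this diff adopts; the ExampleManager class and the dict-based tasks below are hypothetical stand-ins, not AWX code. The point is that get_tasks() now populates self.all_tasks as a side effect instead of returning a list, and downstream steps read that shared attribute rather than receiving the task list as an argument:

    # Hypothetical sketch of the pattern the diff adopts; ExampleManager and
    # the dict-based "tasks" are stand-ins, not AWX models.
    class ExampleManager:
        def __init__(self, source):
            self.source = source  # stand-in for the UnifiedJob queryset
            self.all_tasks = []

        def get_tasks(self, filter_args):
            # Populate shared state as a side effect instead of returning a list.
            self.all_tasks = [t for t in self.source if t['status'] in filter_args['status__in']]

        def process_tasks(self):
            # Reads self.all_tasks directly; no task list is threaded through.
            for task in self.all_tasks:
                print(f"processing task {task['id']} ({task['status']})")

        def _schedule(self):
            self.get_tasks(dict(status__in=['pending', 'waiting', 'running']))
            if len(self.all_tasks) > 0:
                self.process_tasks()


    ExampleManager([{'id': 1, 'status': 'pending'}, {'id': 2, 'status': 'successful'}])._schedule()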