Start pending workflows in TaskManager

We had tried doing this in the WorkflowManager, but we decided that
we want to handle ALL pending jobs and "soft blockers" to jobs with the
TaskManager/DependencyGraph, and not duplicate that logic in the
WorkflowManager.
Elijah DeLee 2022-07-11 08:06:51 -04:00 committed by Seth Foster
parent e603c23b40
commit 76d76d13b0

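The design decision above, reduced to a self-contained toy (illustrative names only, not the AWX API): "soft blockers" such as allow_simultaneous are enforced in exactly one place, the dependency graph that the TaskManager consults, so the WorkflowManager no longer needs its own copy of that bookkeeping.

    # Toy model of the soft-blocker rule this commit centralizes: a workflow
    # stays pending while another workflow from the same template is running,
    # unless allow_simultaneous is set. All names here are illustrative.
    from dataclasses import dataclass, field

    @dataclass
    class ToyWorkflowJob:
        template_id: int
        allow_simultaneous: bool = False

    @dataclass
    class ToyDependencyGraph:
        running_templates: set = field(default_factory=set)

        def add_job(self, job):
            self.running_templates.add(job.template_id)

        def is_blocked(self, job):
            return not job.allow_simultaneous and job.template_id in self.running_templates

    graph = ToyDependencyGraph()
    graph.add_job(ToyWorkflowJob(template_id=1))            # first workflow starts
    assert graph.is_blocked(ToyWorkflowJob(template_id=1))  # same template: blocked
    assert not graph.is_blocked(ToyWorkflowJob(template_id=1, allow_simultaneous=True))

The removed WorkflowManager.get_tasks body further down shows the duplicate of this rule that the diff deletes.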

@@ -15,6 +15,7 @@ from django.db import transaction, connection
 from django.utils.translation import gettext_lazy as _, gettext_noop
 from django.utils.timezone import now as tz_now
 from django.conf import settings
+from django.contrib.contenttypes.models import ContentType

 # AWX
 from awx.main.dispatch.reaper import reap_job
@@ -84,14 +85,17 @@ class TaskBase:
             return True
         return False

-    def get_running_workflow_jobs(self):
-        graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
-        return graph_workflow_jobs
-
     @timeit
     def get_tasks(self, filter_args):
-        qs = UnifiedJob.objects.filter(**filter_args).exclude(launch_type='sync').order_by('created').prefetch_related('instance_group')
-        return [task for task in qs if not type(task) is WorkflowJob]
+        wf_approval_ctype_id = ContentType.objects.get_for_model(WorkflowApproval).id
+        qs = (
+            UnifiedJob.objects.filter(**filter_args)
+            .exclude(launch_type='sync')
+            .exclude(polymorphic_ctype_id=wf_approval_ctype_id)
+            .order_by('created')
+            .prefetch_related('instance_group')
+        )
+        self.all_tasks = [t for t in qs]

     def record_aggregate_metrics(self, *args):
         if not settings.IS_TESTING():
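Two things happen in the new get_tasks. First, ContentType.objects.get_for_model() resolves a model class to its content-type row, and django-polymorphic stores that row's id on each UnifiedJob as polymorphic_ctype_id, so the exclusion runs in SQL rather than by filtering instances in Python. Second, the exclusion target changes: WorkflowJobs are now kept in self.all_tasks so the TaskManager can start them, while WorkflowApprovals, which are never scheduled, are dropped. A minimal sketch of the queryset pattern (assumes a configured Django project with polymorphic models, so not runnable standalone):

    # Exclude one concrete subclass of a polymorphic model at the DB level.
    from django.contrib.contenttypes.models import ContentType

    def tasks_excluding(model_cls, **filter_args):
        ctype_id = ContentType.objects.get_for_model(model_cls).id
        return UnifiedJob.objects.filter(**filter_args).exclude(polymorphic_ctype_id=ctype_id)

    # Roughly replaces the old Python-side filter:
    #   [task for task in qs if not type(task) is WorkflowJob]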
@@ -137,9 +141,9 @@ class WorkflowManager(TaskBase):
         super().__init__(prefix="workflow_manager")

     @timeit
-    def spawn_workflow_graph_jobs(self, workflow_jobs):
+    def spawn_workflow_graph_jobs(self):
         result = []
-        for workflow_job in workflow_jobs:
+        for workflow_job in self.all_tasks:
             if self.timed_out():
                 logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early")
                 self.schedule_manager.schedule()
@@ -272,34 +276,13 @@ class WorkflowManager(TaskBase):

     @timeit
     def get_tasks(self):
-        workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')]
-        workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')]
-        workflow_to_start = []
-
-        running_wfjt_ids = {wf.unified_job_template_id for wf in workflow_jobs_running}
-
-        for wf in workflow_jobs_pending:
-            if wf.allow_simultaneous or wf.unified_job_template_id not in running_wfjt_ids:
-                wf.status = 'running'
-                workflow_to_start.append(wf)
-                running_wfjt_ids.add(wf.unified_job_template_id)
-                logger.debug('Transitioning %s to running status.', wf.log_format)
-                self.start_task_limit -= 1
-                if self.start_task_limit == 0:
-                    break
-                if self.timed_out():
-                    logger.warning(f"Workflow manager has reached time out processing pending workflows, exiting loop early")
-                    break
-            else:
-                logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format)
-
-        WorkflowJob.objects.bulk_update(workflow_to_start, ['status'])
-        workflow_jobs_running.extend(workflow_to_start)
-        return workflow_jobs_running
+        self.all_tasks = [wf for wf in WorkflowJob.objects.filter(status='running')]

     @timeit
     def _schedule(self):
-        running_workflow_tasks = self.get_tasks()
-        if len(running_workflow_tasks) > 0:
-            self.spawn_workflow_graph_jobs(running_workflow_tasks)
+        self.get_tasks()
+        if len(self.all_tasks) > 0:
+            self.spawn_workflow_graph_jobs()

         self.timeout_approval_node()
@@ -334,12 +317,12 @@ class DependencyManager(TaskBase):
         with disable_activity_stream():
             task.dependent_jobs.add(*dependencies)

-    def get_inventory_source_tasks(self, all_sorted_tasks):
+    def get_inventory_source_tasks(self):
         inventory_ids = set()
-        for task in all_sorted_tasks:
+        for task in self.all_tasks:
             if isinstance(task, Job):
                 inventory_ids.add(task.inventory_id)
-        return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
+        self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]

     def get_latest_inventory_update(self, inventory_source):
         latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
@@ -473,17 +456,17 @@ class DependencyManager(TaskBase):
         return created_dependencies

-    def process_tasks(self, all_sorted_tasks):
-        self.generate_dependencies(all_sorted_tasks)
-        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(all_sorted_tasks))
+    def process_tasks(self):
+        self.generate_dependencies(self.all_tasks)
+        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks))

     @timeit
     def _schedule(self):
-        all_sorted_tasks = self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
+        self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))

-        if len(all_sorted_tasks) > 0:
-            self.all_inventory_sources = self.get_inventory_source_tasks(all_sorted_tasks)
-            self.process_tasks(all_sorted_tasks)
+        if len(self.all_tasks) > 0:
+            self.get_inventory_source_tasks()
+            self.process_tasks()
             ScheduleTaskManager().schedule()
@@ -507,12 +490,12 @@ class TaskManager(TaskBase):
         self.schedule_manager = ScheduleTaskManager()
         super().__init__(prefix="task_manager")

-    def after_lock_init(self, all_sorted_tasks):
+    def after_lock_init(self):
         """
         Init AFTER we know this instance of the task manager will run because the lock is acquired.
         """
         self.dependency_graph = DependencyGraph()
-        self.instances = TaskManagerInstances(all_sorted_tasks)
+        self.instances = TaskManagerInstances(self.all_tasks)
         self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
         self.controlplane_ig = self.instance_groups.controlplane_ig
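The after_lock_init contract is worth spelling out: the expensive per-cycle state (the DependencyGraph, instance snapshots) is built only once this manager knows it holds the scheduler lock, so losing the race to another control-plane node costs almost nothing. A rough sketch of the pattern (advisory_lock is from django-pglocks, which AWX uses; the method body here is illustrative, not the literal one):

    # Sketch: acquire the cross-node lock first, build heavy state second.
    from django_pglocks import advisory_lock

    def schedule(self):
        with advisory_lock("task_manager_lock", wait=False) as acquired:
            if not acquired:
                return        # another instance is already scheduling
            self._schedule()  # which calls get_tasks() and then after_lock_init()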
@@ -572,10 +555,19 @@ class TaskManager(TaskBase):
             task.save()
             # TODO: run error handler to fail sub-tasks and send notifications
         else:
+            if type(task) is WorkflowJob:
+                task.status = 'running'
+                task.send_notification_templates('running')
+                logger.debug('Transitioning %s to running status.', task.log_format)
+                # Call this to ensure Workflow nodes get spawned in timely manner
+                ScheduleWorkflowManager().schedule()
+            # at this point we already have control/execution nodes selected for the following cases
-            task.instance_group = instance_group
-            execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
-            logger.debug(f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.')
+            else:
+                task.instance_group = instance_group
+                execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
+                logger.debug(
+                    f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
+                )

             with disable_activity_stream():
                 task.celery_task_id = str(uuid.uuid4())
                 task.save()
@@ -623,6 +615,13 @@ class TaskManager(TaskBase):
                 tasks_to_update_job_explanation.append(task)
                 continue

+            if isinstance(task, WorkflowJob):
+                # Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph.
+                # Double check that using just the DependencyGraph works for Workflows and Sliced Jobs.
+                self.dependency_graph.add_job(task)
+                self.start_task(task, None, task.get_jobs_fail_chain(), None)
+                continue
+
             found_acceptable_queue = False
             preferred_instance_groups = task.preferred_instance_groups
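Together with the start_task change above, the pending loop now short-circuits workflows: a task the DependencyGraph reports as blocked stays pending (the context lines above are the tail of that branch), and an unblocked WorkflowJob is registered in the graph and started with no instance-group or capacity work at all. Schematically (a condensed paraphrase, not the full method):

    # Condensed shape of process_pending_tasks after this commit; the real
    # loop also does capacity checks and instance group selection.
    for task in pending_tasks:
        if self.job_blocked_by(task):            # hard deps and soft blockers, via DependencyGraph
            continue                             # stays pending
        if isinstance(task, WorkflowJob):
            self.dependency_graph.add_job(task)  # later same-template workflows now block
            self.start_task(task, None, task.get_jobs_fail_chain(), None)
            continue
        # other tasks proceed to instance group / capacity selection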
@@ -725,22 +724,22 @@ class TaskManager(TaskBase):
                 logger.error(f'{j.execution_node} is not a registered instance; reaping {j.log_format}')
                 reap_job(j, 'failed')

-    def process_tasks(self, all_sorted_tasks):
-        running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]
+    def process_tasks(self):
+        running_tasks = [t for t in self.all_tasks if t.status in ['waiting', 'running']]
         self.process_running_tasks(running_tasks)
         self.subsystem_metrics.inc(f"{self.prefix}_running_processed", len(running_tasks))

-        pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
+        pending_tasks = [t for t in self.all_tasks if t.status == 'pending']
         self.process_pending_tasks(pending_tasks)
         self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(pending_tasks))

     @timeit
     def _schedule(self):
-        all_sorted_tasks = self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))
+        self.get_tasks(dict(status__in=["pending", "waiting", "running"], dependencies_processed=True))

-        self.after_lock_init(all_sorted_tasks)
+        self.after_lock_init()
         self.reap_jobs_from_orphaned_instances()

-        if len(all_sorted_tasks) > 0:
-            self.process_tasks(all_sorted_tasks)
+        if len(self.all_tasks) > 0:
+            self.process_tasks()