Manage pending workflow jobs in Workflow Manager

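TaskBase.get_tasks now fetches tasks with a single UnifiedJob query (excluding workflow jobs) instead of querying each job model separately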
Additionally, make local overrides run after development settings
This commit is contained in:
Seth Foster
2022-07-01 14:46:29 -04:00
parent 29d91da1d2
commit ff118f2177
3 changed files with 33 additions and 48 deletions

View File

@@ -76,17 +76,8 @@ class TaskBase:
# NOTE(review): this hunk is rendered without +/- markers or indentation. Going
# by the hunk header (-76,17 +76,8) and the commit message ("get_tasks uses
# UnifiedJob"), the per-model queries below look like the removed side and the
# two UnifiedJob lines at the end look like the added side -- confirm against
# the raw diff before relying on this reading.
@timeit
def get_tasks(self, filter_args):
# --- apparently the previous implementation ---
# One query per job model, each narrowed by the caller-supplied filter_args
# (passed as keyword arguments to QuerySet.filter).
jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')]
# Inventory updates whose source is 'file' are left out.
inventory_updates_qs = InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
inventory_updates = [i for i in inventory_updates_qs]
# Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs.
project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')]
system_jobs = [s for s in SystemJob.objects.filter(**filter_args).prefetch_related('instance_group')]
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(**filter_args).prefetch_related('instance_group')]
# Workflow jobs were part of the combined list here (no instance_group prefetch).
workflow_jobs = [w for w in WorkflowJob.objects.filter(**filter_args)]
# Merge every kind of task into one list ordered by creation time (sorted in Python).
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created)
logger.debug(f"{self.prefix} {all_tasks}")
return all_tasks
# --- apparently the new implementation ---
# A single UnifiedJob query replaces the per-model queries; ordering by 'created'
# is now requested from the database via order_by.
# NOTE(review): the old source='file' and job_type='check' filters are gone and
# exclude(launch_type='sync') appears in their place -- confirm it covers the
# same rows. The debug log of the task list is also no longer emitted.
qs = UnifiedJob.objects.filter(**filter_args).exclude(launch_type='sync').order_by('created').prefetch_related('instance_group')
# Drop workflow jobs from the result. The check is on the exact type, so an
# instance of a WorkflowJob subclass would not be dropped.
return [task for task in qs if not type(task) is WorkflowJob]
def record_aggregate_metrics(self, *args):
if not settings.IS_TESTING():
@@ -256,7 +247,18 @@ class WorkflowManager(TaskBase):
# NOTE(review): this hunk is rendered without +/- markers or indentation. Going
# by the hunk header (-256,7 +247,18), the single-line return directly under the
# def looks like the removed side and everything after it the added side --
# confirm against the raw diff before relying on this reading.
@timeit
def get_tasks(self):
# --- apparently the previous implementation: delegate to get_running_workflow_jobs ---
return self.get_running_workflow_jobs()
# --- apparently the new implementation ---
# Load workflow jobs in 'running' and in 'pending' status with two separate queries.
workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')]
workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')]
workflow_to_start = []
# Primary keys of the workflow jobs that are already running.
running_workflow_pk = {wf.pk for wf in workflow_jobs_running}
for wf in workflow_jobs_pending:
# NOTE(review): this compares the pending job's own pk with the pks of running
# jobs. The two lists come from queries on different statuses, so this looks
# like it is always true and allow_simultaneous has no effect. The check that
# this commit removes from TaskManager.process_pending_tasks compared
# unified_job_template_id instead, and also recorded templates started in the
# same pass; this loop never adds to running_workflow_pk. Confirm intent.
if wf.allow_simultaneous or wf.pk not in running_workflow_pk:
# Promote in memory only; the change is written by the bulk_update below.
wf.status = 'running'
workflow_to_start.append(wf)
# Write the new status for all promoted jobs in one call.
# NOTE(review): the branch removed from start_task also called
# send_notification_templates('running') for workflow jobs; no such call is
# visible here -- confirm that 'running' notifications are still sent somewhere.
WorkflowJob.objects.bulk_update(workflow_to_start, ['status'])
# Return the already-running jobs followed by the ones just promoted.
workflow_jobs_running.extend(workflow_to_start)
return workflow_jobs_running
@timeit
def _schedule(self):
@@ -536,18 +538,10 @@ class TaskManager(TaskBase):
task.save()
# TODO: run error handler to fail sub-tasks and send notifications
else:
if type(task) is WorkflowJob:
task.status = 'running'
task.send_notification_templates('running')
logger.debug('Transitioning %s to running status.', task.log_format)
schedule_task_manager()
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
)
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.')
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
@@ -577,7 +571,6 @@ class TaskManager(TaskBase):
@timeit
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = []
for task in pending_tasks:
if self.start_task_limit <= 0:
@@ -596,16 +589,6 @@ class TaskManager(TaskBase):
found_acceptable_queue = False
preferred_instance_groups = task.preferred_instance_groups
if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates:
if not task.allow_simultaneous:
logger.debug("{} is blocked from running, workflow already running".format(task.log_format))
continue
else:
running_workflow_templates.add(task.unified_job_template_id)
self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue
# Determine if there is control capacity for the task
if task.capacity_type == 'control':
control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT