From 29d91da1d279137997a3ea2383eb442645a400f2 Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Fri, 1 Jul 2022 13:50:38 -0400
Subject: [PATCH] we can do all the work in one loop

more than saving the loop, we save building the WorkflowDag twice which
makes LOTS of queries!!!

Also, do a bulk update on the WorkflowJobNodes instead of saving in a loop
:fear:
---
 awx/main/scheduler/task_manager.py | 170 +++++++++++++----------------
 1 file changed, 78 insertions(+), 92 deletions(-)

diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index b85bfb6982..6970518df1 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -30,6 +30,7 @@ from awx.main.models import (
     UnifiedJob,
     WorkflowApproval,
     WorkflowJob,
+    WorkflowJobNode,
     WorkflowJobTemplate,
 )
 from awx.main.scheduler.dag_workflow import WorkflowDAG
@@ -132,69 +133,6 @@ class WorkflowManager(TaskBase):
 
     @timeit
     def spawn_workflow_graph_jobs(self, workflow_jobs):
-        logger.debug(f"=== {workflow_jobs}")
-        for workflow_job in workflow_jobs:
-            if workflow_job.cancel_flag:
-                logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
-                continue
-            dag = WorkflowDAG(workflow_job)
-            spawn_nodes = dag.bfs_nodes_to_run()
-            if spawn_nodes:
-                logger.debug('Spawning jobs for %s', workflow_job.log_format)
-            else:
-                logger.debug('No nodes to spawn for %s', workflow_job.log_format)
-            for spawn_node in spawn_nodes:
-                if spawn_node.unified_job_template is None:
-                    continue
-                kv = spawn_node.get_job_kwargs()
-                job = spawn_node.unified_job_template.create_unified_job(**kv)
-                spawn_node.job = job
-                spawn_node.save()
-                logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
-                can_start = True
-                if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
-                    workflow_ancestors = job.get_ancestor_workflows()
-                    if spawn_node.unified_job_template in set(workflow_ancestors):
-                        can_start = False
-                        logger.info(
-                            'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
-                                job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
-                            )
-                        )
-                        display_list = [spawn_node.unified_job_template] + workflow_ancestors
-                        job.job_explanation = gettext_noop(
-                            "Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})"
-                        ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list]))
-                    else:
-                        logger.debug(
-                            'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
-                                job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
-                            )
-                        )
-                if not job._resources_sufficient_for_launch():
-                    can_start = False
-                    job.job_explanation = gettext_noop(
-                        "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory"
-                    )
-                if can_start:
-                    if workflow_job.start_args:
-                        start_args = json.loads(decrypt_field(workflow_job, 'start_args'))
-                    else:
-                        start_args = {}
-                    can_start = job.signal_start(**start_args)
-                    if not can_start:
-                        job.job_explanation = gettext_noop(
-                            "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials"
-                        )
-                if not can_start:
-                    job.status = 'failed'
-                    job.save(update_fields=['status', 'job_explanation'])
-                    job.websocket_emit_status('failed')
-
-                # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
-                # emit_websocket_notification('/socket.io/jobs', '', dict(id=))
-
-    def process_finished_workflow_jobs(self, workflow_jobs):
         result = []
         for workflow_job in workflow_jobs:
             dag = WorkflowDAG(workflow_job)
@@ -211,31 +149,90 @@ class WorkflowManager(TaskBase):
                     status_changed = True
             else:
                 workflow_nodes = dag.mark_dnr_nodes()
-                for n in workflow_nodes:
-                    n.save(update_fields=['do_not_run'])
+                WorkflowJobNode.objects.bulk_update(workflow_nodes, ['do_not_run'])
+                # If workflow is now done, we do special things to mark it as done.
                 is_done = dag.is_workflow_done()
-                if not is_done:
-                    continue
-                has_failed, reason = dag.has_workflow_failed()
-                logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
-                result.append(workflow_job.id)
-                new_status = 'failed' if has_failed else 'successful'
-                logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status))
-                update_fields = ['status', 'start_args']
-                workflow_job.status = new_status
-                if reason:
-                    logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}')
-                    workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed")
-                    update_fields.append('job_explanation')
-                workflow_job.start_args = ''  # blank field to remove encrypted passwords
-                workflow_job.save(update_fields=update_fields)
-                status_changed = True
+                if is_done:
+                    has_failed, reason = dag.has_workflow_failed()
+                    logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
+                    result.append(workflow_job.id)
+                    new_status = 'failed' if has_failed else 'successful'
+                    logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status))
+                    update_fields = ['status', 'start_args']
+                    workflow_job.status = new_status
+                    if reason:
+                        logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}')
+                        workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed")
+                        update_fields.append('job_explanation')
+                    workflow_job.start_args = ''  # blank field to remove encrypted passwords
+                    workflow_job.save(update_fields=update_fields)
+                    status_changed = True
+
             if status_changed:
                 if workflow_job.spawned_by_workflow:
                     schedule_task_manager()
                 workflow_job.websocket_emit_status(workflow_job.status)
                 # Operations whose queries rely on modifications made during the atomic scheduling session
                 workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
+
+            if workflow_job.status == 'running':
+                spawn_nodes = dag.bfs_nodes_to_run()
+                if spawn_nodes:
+                    logger.debug('Spawning jobs for %s', workflow_job.log_format)
+                else:
+                    logger.debug('No nodes to spawn for %s', workflow_job.log_format)
+                for spawn_node in spawn_nodes:
+                    if spawn_node.unified_job_template is None:
+                        continue
+                    kv = spawn_node.get_job_kwargs()
+                    job = spawn_node.unified_job_template.create_unified_job(**kv)
+                    spawn_node.job = job
+                    spawn_node.save()
+                    logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
+                    can_start = True
+                    if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
+                        workflow_ancestors = job.get_ancestor_workflows()
+                        if spawn_node.unified_job_template in set(workflow_ancestors):
+                            can_start = False
+                            logger.info(
+                                'Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
+                                    job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
+                                )
+                            )
+                            display_list = [spawn_node.unified_job_template] + workflow_ancestors
+                            job.job_explanation = gettext_noop(
+                                "Workflow Job spawned from workflow could not start because it "
+                                "would result in recursion (spawn order, most recent first: {})"
+                            ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list]))
+                        else:
+                            logger.debug(
+                                'Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format(
+                                    job.id, spawn_node.unified_job_template.pk, [wa.pk for wa in workflow_ancestors]
+                                )
+                            )
+                    if not job._resources_sufficient_for_launch():
+                        can_start = False
+                        job.job_explanation = gettext_noop(
+                            "Job spawned from workflow could not start because it " "was missing a related resource such as project or inventory"
+                        )
+                    if can_start:
+                        if workflow_job.start_args:
+                            start_args = json.loads(decrypt_field(workflow_job, 'start_args'))
+                        else:
+                            start_args = {}
+                        can_start = job.signal_start(**start_args)
+                        if not can_start:
+                            job.job_explanation = gettext_noop(
+                                "Job spawned from workflow could not start because it " "was not in the right state or required manual credentials"
+                            )
+                    if not can_start:
+                        job.status = 'failed'
+                        job.save(update_fields=['status', 'job_explanation'])
+                        job.websocket_emit_status('failed')
+
+                    # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
+                    # emit_websocket_notification('/socket.io/jobs', '', dict(id=))
+
         return result
 
     def timeout_approval_node(self):
@@ -265,18 +262,7 @@ class WorkflowManager(TaskBase):
     def _schedule(self):
         running_workflow_tasks = self.get_tasks()
         if len(running_workflow_tasks) > 0:
-            self.process_finished_workflow_jobs(running_workflow_tasks)
-
-            previously_running_workflow_tasks = running_workflow_tasks
-            running_workflow_tasks = []
-            for workflow_job in previously_running_workflow_tasks:
-                if workflow_job.status == 'running':
-                    running_workflow_tasks.append(workflow_job)
-                else:
-                    logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
-
             self.spawn_workflow_graph_jobs(running_workflow_tasks)
-
             self.timeout_approval_node()
 
 
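Note on the change above: the patch replaces a per-node save() loop with one Django
bulk_update() call, and folds job spawning into the same per-workflow loop so the
WorkflowDAG is only built once per workflow job. Below is a minimal sketch of the
bulk-update pattern, kept outside the patch. It assumes an initialized AWX/Django
environment; WorkflowJobNode is the model the patch imports, while the helper name
mark_nodes_do_not_run is made up for illustration (in the patch the DAG itself marks
the nodes via dag.mark_dnr_nodes()).

    from awx.main.models import WorkflowJobNode

    def mark_nodes_do_not_run(nodes):
        """Flag nodes as do-not-run in memory, then persist them in bulk."""
        for node in nodes:
            node.do_not_run = True  # no query yet; only the in-memory objects change

        # Before the patch, the equivalent code issued one UPDATE per node via
        # node.save(update_fields=['do_not_run']). bulk_update() batches the same
        # writes into far fewer queries.
        WorkflowJobNode.objects.bulk_update(nodes, ['do_not_run'])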