we can do all the work in one loop

more than saving the loop, we save building the WorkflowDag twice which
makes LOTS of queries!!!

Also, do a bulk update on the WorkflowJobNodes instead of saving in a
loop :fear:
This commit is contained in:
Elijah DeLee
2022-07-01 13:50:38 -04:00
committed by Seth Foster
parent ad08eafb9a
commit 29d91da1d2

View File

@@ -30,6 +30,7 @@ from awx.main.models import (
UnifiedJob, UnifiedJob,
WorkflowApproval, WorkflowApproval,
WorkflowJob, WorkflowJob,
WorkflowJobNode,
WorkflowJobTemplate, WorkflowJobTemplate,
) )
from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.scheduler.dag_workflow import WorkflowDAG
@@ -132,12 +133,49 @@ class WorkflowManager(TaskBase):
@timeit @timeit
def spawn_workflow_graph_jobs(self, workflow_jobs): def spawn_workflow_graph_jobs(self, workflow_jobs):
logger.debug(f"=== {workflow_jobs}") result = []
for workflow_job in workflow_jobs: for workflow_job in workflow_jobs:
if workflow_job.cancel_flag:
logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
continue
dag = WorkflowDAG(workflow_job) dag = WorkflowDAG(workflow_job)
status_changed = False
if workflow_job.cancel_flag:
workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True)
logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
cancel_finished = dag.cancel_node_jobs()
if cancel_finished:
logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format)
workflow_job.status = 'canceled'
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=['status', 'start_args'])
status_changed = True
else:
workflow_nodes = dag.mark_dnr_nodes()
WorkflowJobNode.objects.bulk_update(workflow_nodes, ['do_not_run'])
# If workflow is now done, we do special things to mark it as done.
is_done = dag.is_workflow_done()
if is_done:
has_failed, reason = dag.has_workflow_failed()
logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
result.append(workflow_job.id)
new_status = 'failed' if has_failed else 'successful'
logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status))
update_fields = ['status', 'start_args']
workflow_job.status = new_status
if reason:
logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}')
workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed")
update_fields.append('job_explanation')
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=update_fields)
status_changed = True
if status_changed:
if workflow_job.spawned_by_workflow:
schedule_task_manager()
workflow_job.websocket_emit_status(workflow_job.status)
# Operations whose queries rely on modifications made during the atomic scheduling session
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
if workflow_job.status == 'running':
spawn_nodes = dag.bfs_nodes_to_run() spawn_nodes = dag.bfs_nodes_to_run()
if spawn_nodes: if spawn_nodes:
logger.debug('Spawning jobs for %s', workflow_job.log_format) logger.debug('Spawning jobs for %s', workflow_job.log_format)
@@ -163,7 +201,8 @@ class WorkflowManager(TaskBase):
) )
display_list = [spawn_node.unified_job_template] + workflow_ancestors display_list = [spawn_node.unified_job_template] + workflow_ancestors
job.job_explanation = gettext_noop( job.job_explanation = gettext_noop(
"Workflow Job spawned from workflow could not start because it " "would result in recursion (spawn order, most recent first: {})" "Workflow Job spawned from workflow could not start because it "
"would result in recursion (spawn order, most recent first: {})"
).format(', '.join(['<{}>'.format(tmp) for tmp in display_list])) ).format(', '.join(['<{}>'.format(tmp) for tmp in display_list]))
else: else:
logger.debug( logger.debug(
@@ -194,48 +233,6 @@ class WorkflowManager(TaskBase):
# TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ? # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
# emit_websocket_notification('/socket.io/jobs', '', dict(id=)) # emit_websocket_notification('/socket.io/jobs', '', dict(id=))
def process_finished_workflow_jobs(self, workflow_jobs):
result = []
for workflow_job in workflow_jobs:
dag = WorkflowDAG(workflow_job)
status_changed = False
if workflow_job.cancel_flag:
workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True)
logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
cancel_finished = dag.cancel_node_jobs()
if cancel_finished:
logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format)
workflow_job.status = 'canceled'
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=['status', 'start_args'])
status_changed = True
else:
workflow_nodes = dag.mark_dnr_nodes()
for n in workflow_nodes:
n.save(update_fields=['do_not_run'])
is_done = dag.is_workflow_done()
if not is_done:
continue
has_failed, reason = dag.has_workflow_failed()
logger.debug('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
result.append(workflow_job.id)
new_status = 'failed' if has_failed else 'successful'
logger.debug("Transitioning {} to {} status.".format(workflow_job.log_format, new_status))
update_fields = ['status', 'start_args']
workflow_job.status = new_status
if reason:
logger.info(f'Workflow job {workflow_job.id} failed due to reason: {reason}')
workflow_job.job_explanation = gettext_noop("No error handling paths found, marking workflow as failed")
update_fields.append('job_explanation')
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=update_fields)
status_changed = True
if status_changed:
if workflow_job.spawned_by_workflow:
schedule_task_manager()
workflow_job.websocket_emit_status(workflow_job.status)
# Operations whose queries rely on modifications made during the atomic scheduling session
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
return result return result
def timeout_approval_node(self): def timeout_approval_node(self):
@@ -265,18 +262,7 @@ class WorkflowManager(TaskBase):
def _schedule(self): def _schedule(self):
running_workflow_tasks = self.get_tasks() running_workflow_tasks = self.get_tasks()
if len(running_workflow_tasks) > 0: if len(running_workflow_tasks) > 0:
self.process_finished_workflow_jobs(running_workflow_tasks)
previously_running_workflow_tasks = running_workflow_tasks
running_workflow_tasks = []
for workflow_job in previously_running_workflow_tasks:
if workflow_job.status == 'running':
running_workflow_tasks.append(workflow_job)
else:
logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
self.spawn_workflow_graph_jobs(running_workflow_tasks) self.spawn_workflow_graph_jobs(running_workflow_tasks)
self.timeout_approval_node() self.timeout_approval_node()