mirror of
https://github.com/ansible/awx.git
synced 2026-03-20 02:17:37 -02:30
update done and fail detection for workflow
* Instead of traversing the workflow graph to determine if a workflow is done or has failed; instead, loop through all the nodes in the graph and grab only the relevant nodes.
This commit is contained in:
@@ -115,45 +115,26 @@ class WorkflowDAG(SimpleDAG):
|
|||||||
return cancel_finished
|
return cancel_finished
|
||||||
|
|
||||||
def is_workflow_done(self):
|
def is_workflow_done(self):
|
||||||
root_nodes = self.get_root_nodes()
|
for node in self.nodes:
|
||||||
nodes = root_nodes
|
obj = node['node_object']
|
||||||
is_failed = False
|
if obj.do_not_run is False and not obj.job:
|
||||||
|
return False
|
||||||
|
elif obj.job and obj.job.status not in ['successful', 'failed', 'canceled', 'error']:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
for index, n in enumerate(nodes):
|
def has_workflow_failed(self):
|
||||||
obj = n['node_object']
|
failed_nodes = []
|
||||||
job = obj.job
|
for node in self.nodes:
|
||||||
|
obj = node['node_object']
|
||||||
if obj.unified_job_template is None:
|
if obj.job and obj.job.status in ['failed', 'anceled', 'error']:
|
||||||
is_failed = True
|
failed_nodes.append(node)
|
||||||
continue
|
for node in failed_nodes:
|
||||||
elif obj.do_not_run is False and not job:
|
obj = node['node_object']
|
||||||
return False, False
|
if (len(self.get_dependencies(obj, 'failure_nodes')) +
|
||||||
elif obj.do_not_run is True:
|
len(self.get_dependencies(obj, 'always_nodes'))) == 0:
|
||||||
continue
|
return True
|
||||||
|
return False
|
||||||
children_success = self.get_dependencies(obj, 'success_nodes')
|
|
||||||
children_failed = self.get_dependencies(obj, 'failure_nodes')
|
|
||||||
children_always = self.get_dependencies(obj, 'always_nodes')
|
|
||||||
if not is_failed and job.status != 'successful':
|
|
||||||
children_all = children_success + children_failed + children_always
|
|
||||||
for child in children_all:
|
|
||||||
if child['node_object'].job:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
is_failed = True if children_all else job.status in ['failed', 'canceled', 'error']
|
|
||||||
|
|
||||||
if job.status == 'canceled':
|
|
||||||
continue
|
|
||||||
elif job.status in ['error', 'failed']:
|
|
||||||
nodes.extend(children_failed + children_always)
|
|
||||||
elif job.status == 'successful':
|
|
||||||
nodes.extend(children_success + children_always)
|
|
||||||
else:
|
|
||||||
# Job is about to run or is running. Hold our horses and wait for
|
|
||||||
# the job to finish. We can't proceed down the graph path until we
|
|
||||||
# have the job result.
|
|
||||||
return False, False
|
|
||||||
return True, is_failed
|
|
||||||
|
|
||||||
r'''
|
r'''
|
||||||
Determine if all nodes have been decided on being marked do_not_run.
|
Determine if all nodes have been decided on being marked do_not_run.
|
||||||
@@ -188,7 +169,7 @@ class WorkflowDAG(SimpleDAG):
|
|||||||
if node in (self.get_dependencies(p, 'success_nodes') +
|
if node in (self.get_dependencies(p, 'success_nodes') +
|
||||||
self.get_dependencies(p, 'always_nodes')):
|
self.get_dependencies(p, 'always_nodes')):
|
||||||
return False
|
return False
|
||||||
elif p.job.status in ['failed', 'error']:
|
elif p.job.status in ['failed', 'error', 'canceled']:
|
||||||
if node in (self.get_dependencies(p, 'failure_nodes') +
|
if node in (self.get_dependencies(p, 'failure_nodes') +
|
||||||
self.get_dependencies(p, 'always_nodes')):
|
self.get_dependencies(p, 'always_nodes')):
|
||||||
return False
|
return False
|
||||||
@@ -222,4 +203,3 @@ class WorkflowDAG(SimpleDAG):
|
|||||||
self.get_dependencies(obj, 'failure_nodes') +
|
self.get_dependencies(obj, 'failure_nodes') +
|
||||||
self.get_dependencies(obj, 'always_nodes'))
|
self.get_dependencies(obj, 'always_nodes'))
|
||||||
return [n['node_object'] for n in nodes_marked_do_not_run]
|
return [n['node_object'] for n in nodes_marked_do_not_run]
|
||||||
|
|
||||||
|
|||||||
@@ -163,6 +163,7 @@ class TaskManager():
|
|||||||
dag = WorkflowDAG(workflow_job)
|
dag = WorkflowDAG(workflow_job)
|
||||||
status_changed = False
|
status_changed = False
|
||||||
if workflow_job.cancel_flag:
|
if workflow_job.cancel_flag:
|
||||||
|
workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True)
|
||||||
logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
|
logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
|
||||||
cancel_finished = dag.cancel_node_jobs()
|
cancel_finished = dag.cancel_node_jobs()
|
||||||
if cancel_finished:
|
if cancel_finished:
|
||||||
@@ -172,11 +173,12 @@ class TaskManager():
|
|||||||
workflow_job.save(update_fields=['status', 'start_args'])
|
workflow_job.save(update_fields=['status', 'start_args'])
|
||||||
status_changed = True
|
status_changed = True
|
||||||
else:
|
else:
|
||||||
is_done, has_failed = dag.is_workflow_done()
|
|
||||||
workflow_nodes = dag.mark_dnr_nodes()
|
workflow_nodes = dag.mark_dnr_nodes()
|
||||||
map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes)
|
map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes)
|
||||||
|
is_done = dag.is_workflow_done()
|
||||||
if not is_done:
|
if not is_done:
|
||||||
continue
|
continue
|
||||||
|
has_failed = dag.has_workflow_failed()
|
||||||
logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
|
logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
|
||||||
result.append(workflow_job.id)
|
result.append(workflow_job.id)
|
||||||
new_status = 'failed' if has_failed else 'successful'
|
new_status = 'failed' if has_failed else 'successful'
|
||||||
|
|||||||
Reference in New Issue
Block a user