update done and fail detection for workflow

* Instead of traversing the workflow graph to determine if a workflow is
done or has failed; instead, loop through all the nodes in the graph and
grab only the relevant nodes.
This commit is contained in:
chris meyers
2018-11-13 16:13:12 -05:00
committed by mabashian
parent ae0d0db62c
commit 6529c1bb46
2 changed files with 23 additions and 41 deletions

View File

@@ -115,45 +115,26 @@ class WorkflowDAG(SimpleDAG):
return cancel_finished
def is_workflow_done(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
is_failed = False
for node in self.nodes:
obj = node['node_object']
if obj.do_not_run is False and not obj.job:
return False
elif obj.job and obj.job.status not in ['successful', 'failed', 'canceled', 'error']:
return False
return True
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if obj.unified_job_template is None:
is_failed = True
continue
elif obj.do_not_run is False and not job:
return False, False
elif obj.do_not_run is True:
continue
children_success = self.get_dependencies(obj, 'success_nodes')
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
if not is_failed and job.status != 'successful':
children_all = children_success + children_failed + children_always
for child in children_all:
if child['node_object'].job:
break
else:
is_failed = True if children_all else job.status in ['failed', 'canceled', 'error']
if job.status == 'canceled':
continue
elif job.status in ['error', 'failed']:
nodes.extend(children_failed + children_always)
elif job.status == 'successful':
nodes.extend(children_success + children_always)
else:
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
return False, False
return True, is_failed
def has_workflow_failed(self):
failed_nodes = []
for node in self.nodes:
obj = node['node_object']
if obj.job and obj.job.status in ['failed', 'anceled', 'error']:
failed_nodes.append(node)
for node in failed_nodes:
obj = node['node_object']
if (len(self.get_dependencies(obj, 'failure_nodes')) +
len(self.get_dependencies(obj, 'always_nodes'))) == 0:
return True
return False
r'''
Determine if all nodes have been decided on being marked do_not_run.
@@ -188,7 +169,7 @@ class WorkflowDAG(SimpleDAG):
if node in (self.get_dependencies(p, 'success_nodes') +
self.get_dependencies(p, 'always_nodes')):
return False
elif p.job.status in ['failed', 'error']:
elif p.job.status in ['failed', 'error', 'canceled']:
if node in (self.get_dependencies(p, 'failure_nodes') +
self.get_dependencies(p, 'always_nodes')):
return False
@@ -222,4 +203,3 @@ class WorkflowDAG(SimpleDAG):
self.get_dependencies(obj, 'failure_nodes') +
self.get_dependencies(obj, 'always_nodes'))
return [n['node_object'] for n in nodes_marked_do_not_run]