all parents should finish before start child

This commit is contained in:
chris meyers 2018-10-17 10:16:40 -04:00 committed by mabashian
parent 77661c6032
commit 914892c3ac

View File

@ -24,32 +24,50 @@ class WorkflowDAG(SimpleDAG):
for related_node in related_nodes:
self.add_edge(workflow_node, related_node, node_type)
'''
Determine if all, relevant, parents node are finished.
Relevant parents are parents that are marked do_not_run False.
:param node: a node entry from SimpleDag.nodes (i.e. a dict with property ['node_object']
Return a boolean
'''
def are_relevant_parents_finished(self, node):
obj = node['node_object']
parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
for p in parent_nodes:
if p.do_not_run is True:
continue
# Node might run a job
if p.do_not_run is False and not p.job:
return False
# Node decidedly got a job; check if job is done
if p.job and p.job.status not in ['successful', 'failed']:
return False
return True
def bfs_nodes_to_run(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
nodes = self.get_root_nodes()
nodes_found = []
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if not job and obj.do_not_run is False:
nodes_found.append(n)
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job and job.status not in ['failed', 'successful']:
if obj.do_not_run is True:
continue
elif job and job.status == 'failed':
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job and job.status == 'successful':
children_success = self.get_dependencies(obj, 'success_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_success + children_always
nodes.extend(children_all)
if obj.job:
if obj.job.status == 'failed':
nodes.extend(self.get_dependencies(obj, 'failure_nodes') +
self.get_dependencies(obj, 'always_nodes'))
elif obj.job.status == 'successful':
nodes.extend(self.get_dependencies(obj, 'success_nodes') +
self.get_dependencies(obj, 'always_nodes'))
else:
if self.are_relevant_parents_finished(n):
nodes_found.append(n)
return [n['node_object'] for n in nodes_found]
def cancel_node_jobs(self):