Jake McDermott found behavior that revealed a logical bug: it would have caused issues later with ALL convergence nodes run in sequential order via the API (though not the UI), and it was already causing root nodes to spawn repeatedly. To fix this, I refactored the parent-criteria check out of the DNR-marking code into its own function that checks whether a node's parents have met their convergence criteria, and leveraged that in bfs_nodes_to_run so that root nodes and convergence nodes are differentiated but both correctly processed, and so that the children of convergence nodes are properly traversed by the function.

Rebeccah 2020-02-04 17:02:30 -05:00
parent 4e787cc079
commit 63ae2cac38


@@ -69,6 +69,25 @@ class WorkflowDAG(SimpleDAG):
             return False
         return True

+    def _all_parents_met_convergence_criteria(self, node):
+        # This takes any node and checks that all of its parents have met their criteria to run the child.
+        # It returns a boolean and is really only useful if the node is an ALL convergence node; it is
+        # intended to be used in conjunction with the node property `all_parents_must_converge`.
+        obj = node['node_object']
+        parent_nodes = [p['node_object'] for p in self.get_parents(obj)]
+        for p in parent_nodes:
+            # the parent node has a finished status
+            if p.job and p.job.status in ["successful", "failed"]:
+                if p.job and p.job.status == "successful":
+                    status = "success_nodes"
+                elif p.job and p.job.status == "failed":
+                    status = "failure_nodes"
+                # check that the node's status matches either a pathway of the same status or an always path.
+                if (p not in [node['node_object'] for node in self.get_parents(obj, status)]
+                        and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]):
+                    return False
+        return True
+
     def bfs_nodes_to_run(self):
         nodes = self.get_root_nodes()
         nodes_found = []
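A minimal, self-contained sketch of what the new helper decides may help; the dict-based graph, field names, and function name below are assumptions for illustration, not the AWX data structures:

# Hypothetical standalone model of the ALL-convergence check: a finished
# parent must have exited along an edge (success/failure/always) that
# actually leads to the child, otherwise the child's criteria are not met.
def all_parents_met_criteria(child, parent_status, edge_labels):
    # parent_status: {parent_id: "successful" | "failed" | None (not finished)}
    # edge_labels:   {(parent_id, child_id): set of edge labels}
    for pid, status in parent_status.items():
        if status not in ("successful", "failed"):
            continue  # an unfinished parent is not, by itself, a violation
        wanted = "success_nodes" if status == "successful" else "failure_nodes"
        labels = edge_labels.get((pid, child), set())
        if wanted not in labels and "always_nodes" not in labels:
            return False
    return True

# A succeeded on a success edge and B failed on a failure edge, so the ALL
# convergence child C has met its criteria:
print(all_parents_met_criteria(
    "C",
    {"A": "successful", "B": "failed"},
    {("A", "C"): {"success_nodes"}, ("B", "C"): {"failure_nodes"}},
))  # True

Note that an unfinished parent does not fail the check by itself; only a finished parent whose exit status does not lead to the child does.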
@@ -80,19 +99,25 @@ class WorkflowDAG(SimpleDAG):
             node_ids_visited.add(obj.id)
             if obj.do_not_run is True:
                 continue
-            elif obj.job and not obj.all_parents_must_converge:
+            elif obj.job:
                 if obj.job.status in ['failed', 'error', 'canceled']:
                     nodes.extend(self.get_children(obj, 'failure_nodes') +
                                  self.get_children(obj, 'always_nodes'))
                 elif obj.job.status == 'successful':
                     nodes.extend(self.get_children(obj, 'success_nodes') +
                                  self.get_children(obj, 'always_nodes'))
-            elif obj.unified_job_template is None and not obj.all_parents_must_converge:
+            elif obj.unified_job_template is None:
                 nodes.extend(self.get_children(obj, 'failure_nodes') +
                              self.get_children(obj, 'always_nodes'))
             else:
-                if self._are_relevant_parents_finished(n):
+                # This catches root nodes or ANY convergence nodes
+                if not obj.all_parents_must_converge and self._are_relevant_parents_finished(n):
                     nodes_found.append(n)
+                # This catches ALL convergence nodes
+                elif obj.all_parents_must_converge and self._are_relevant_parents_finished(n):
+                    if self._all_parents_met_convergence_criteria(n):
+                        nodes_found.append(n)
         return [n['node_object'] for n in nodes_found]

     def cancel_node_jobs(self):
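The new else-branch can be condensed as follows; this is a hypothetical restatement, with the booleans that the real helpers compute passed in by the caller:

# Both kinds of node wait for their relevant parents to finish; an ALL
# convergence node must additionally have every parent exit along an edge
# that leads to it. Names here mirror the diff but are illustrative only.
def should_run(all_parents_must_converge,
               relevant_parents_finished,
               all_parents_met_criteria):
    if not relevant_parents_finished:
        return False                  # still waiting on an unfinished parent
    if not all_parents_must_converge:
        return True                   # root node or ANY convergence node
    return all_parents_met_criteria   # ALL convergence node

# Parent A succeeded on a success edge; parent B failed but only a success
# edge leads from B to the child: ANY would run the child, ALL would not.
print(should_run(False, True, False))  # True  (ANY)
print(should_run(True,  True, False))  # False (ALL)

This is what lets root nodes and ANY convergence nodes keep their old behavior while ALL convergence nodes wait for every parent, even when parents finish one at a time via the API.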
@@ -222,21 +247,9 @@ class WorkflowDAG(SimpleDAG):
             parent_nodes = [p['node_object'] for p in self.get_parents(obj)]
             if not obj.do_not_run and not obj.job and node not in root_nodes:
                 if obj.all_parents_must_converge:
-                    if any(p.do_not_run for p in parent_nodes):
+                    if any(p.do_not_run for p in parent_nodes) or not self._all_parents_met_convergence_criteria(node):
                         obj.do_not_run = True
                         nodes_marked_do_not_run.append(node)
                         continue
-                    for p in parent_nodes:
-                        if p.job and p.job.status in ["successful", "failed"]:
-                            if p.job and p.job.status == "successful":
-                                status = "success_nodes"
-                            elif p.job and p.job.status == "failed":
-                                status = "failure_nodes"
-                            if (p not in [node['node_object'] for node in self.get_parents(obj, status)]
-                                    and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]):
-                                obj.do_not_run = True
-                                nodes_marked_do_not_run.append(node)
-                                break
                 else:
-                    if self._are_all_nodes_dnr_decided(parent_nodes):
+                    if self._should_mark_node_dnr(node, parent_nodes):
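With the parent-status loop moved into the helper, the ALL-convergence branch of the DNR pass reduces to a single condition. A hypothetical condensed form, with illustrative stand-ins for the AWX node models:

from types import SimpleNamespace

# An ALL convergence node is marked do-not-run as soon as any parent is DNR
# or its parents' convergence criteria are not met; names are illustrative.
def mark_all_convergence_dnr(obj, parent_nodes, criteria_met, marked):
    if any(p.do_not_run for p in parent_nodes) or not criteria_met:
        obj.do_not_run = True
        marked.append(obj)

# One parent was already marked DNR, so the convergence node is marked too,
# and its own children can then be skipped on the next pass.
child = SimpleNamespace(do_not_run=False)
parents = [SimpleNamespace(do_not_run=True), SimpleNamespace(do_not_run=False)]
marked = []
mark_all_convergence_dnr(child, parents, criteria_met=True, marked=marked)
print(child.do_not_run)  # True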