From 4c35adad6c4396aa25136f9016b6b7687c69b931 Mon Sep 17 00:00:00 2001 From: Rebeccah Date: Mon, 6 Jan 2020 16:36:39 -0500 Subject: [PATCH] added logic to include workflow convergence nodes to nodes to run or not run based on their parents successful statuses --- awx/main/scheduler/dag_workflow.py | 64 ++++++++++++++++++------------ 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index e4b926c853..de07751d3b 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -55,7 +55,7 @@ class WorkflowDAG(SimpleDAG): def _are_relevant_parents_finished(self, node): obj = node['node_object'] - parent_nodes = [p['node_object'] for p in self.get_dependents(obj)] + parent_nodes = [p['node_object'] for p in self.get_parents(obj)] for p in parent_nodes: if p.do_not_run is True: continue @@ -73,7 +73,6 @@ class WorkflowDAG(SimpleDAG): nodes = self.get_root_nodes() nodes_found = [] node_ids_visited = set() - for index, n in enumerate(nodes): obj = n['node_object'] if obj.id in node_ids_visited: @@ -82,21 +81,34 @@ class WorkflowDAG(SimpleDAG): if obj.do_not_run is True: continue - - if obj.job: - if obj.job.status in ['failed', 'error', 'canceled']: - nodes.extend(self.get_dependencies(obj, 'failure_nodes') + - self.get_dependencies(obj, 'always_nodes')) - elif obj.job.status == 'successful': - nodes.extend(self.get_dependencies(obj, 'success_nodes') + - self.get_dependencies(obj, 'always_nodes')) - elif obj.unified_job_template is None: - nodes.extend(self.get_dependencies(obj, 'failure_nodes') + - self.get_dependencies(obj, 'always_nodes')) else: - if self._are_relevant_parents_finished(n): - nodes_found.append(n) - #BECCAH TODO somewhere around here add in ANY and ALL logic + if obj.all_parents_must_converge is True: + if self._are_relevant_parents_finished(n): + # if the current node is a convergence node and all the + # relevant parents are finished then the node should run + parent_nodes = [p['node_object'] for p in self.get_parents(obj)] + successful_convergence = True + for p in parent_nodes: + if obj not in self.get_children(p, p.job.status): + # if the child list doesn't include the obj, then the parent didn't + # meet the criteria needed to run the child, meaning it's a DNR + successful_convergence = False + if successful_convergence == True: + nodes_found.append(n) + elif obj.all_parents_must_converge is False: + if obj.job: + if obj.job.status in ['failed', 'error', 'canceled']: + nodes.extend(self.get_children(obj, 'failure_nodes') + + self.get_children(obj, 'always_nodes')) + elif obj.job.status == 'successful': + nodes.extend(self.get_children(obj, 'success_nodes') + + self.get_children(obj, 'always_nodes')) + elif obj.unified_job_template is None: + nodes.extend(self.get_children(obj, 'failure_nodes') + + self.get_children(obj, 'always_nodes')) + else: + if self._are_relevant_parents_finished(n) is True: + nodes_found.append(n) return [n['node_object'] for n in nodes_found] def cancel_node_jobs(self): @@ -136,8 +148,8 @@ class WorkflowDAG(SimpleDAG): for node in failed_nodes: obj = node['node_object'] - if (len(self.get_dependencies(obj, 'failure_nodes')) + - len(self.get_dependencies(obj, 'always_nodes'))) == 0: + if (len(self.get_children(obj, 'failure_nodes')) + + len(self.get_children(obj, 'always_nodes'))) == 0: if obj.unified_job_template is None: res = True failed_unified_job_template_node_ids.append(str(obj.id)) @@ -192,18 +204,18 @@ class WorkflowDAG(SimpleDAG): pass elif p.job: if p.job.status == 'successful': - if node in (self.get_dependencies(p, 'success_nodes') + - self.get_dependencies(p, 'always_nodes')): + if node in (self.get_children(p, 'success_nodes') + + self.get_children(p, 'always_nodes')): return False elif p.job.status in ['failed', 'error', 'canceled']: - if node in (self.get_dependencies(p, 'failure_nodes') + - self.get_dependencies(p, 'always_nodes')): + if node in (self.get_children(p, 'failure_nodes') + + self.get_children(p, 'always_nodes')): return False else: return False elif p.do_not_run is False and p.unified_job_template is None: - if node in (self.get_dependencies(p, 'failure_nodes') + - self.get_dependencies(p, 'always_nodes')): + if node in (self.get_children(p, 'failure_nodes') + + self.get_children(p, 'always_nodes')): return False else: return False @@ -217,10 +229,10 @@ class WorkflowDAG(SimpleDAG): obj = node['node_object'] if obj.do_not_run is False and not obj.job and node not in root_nodes: - parent_nodes = [p['node_object'] for p in self.get_dependents(obj)] + parent_nodes = [p['node_object'] for p in self.get_parents(obj)] if self._are_all_nodes_dnr_decided(parent_nodes): if self._should_mark_node_dnr(node, parent_nodes): obj.do_not_run = True nodes_marked_do_not_run.append(node) - return [n['node_object'] for n in nodes_marked_do_not_run] + return [n['node_object'] for n in nodes_marked_do_not_run] \ No newline at end of file