diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 0bbc28d97a..a06c620c9a 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -69,6 +69,25 @@ class WorkflowDAG(SimpleDAG): return False return True + def _all_parents_met_convergence_criteria(self, node): + # This function takes any node and checks that all it's parents have met their criteria to run the child. + # This returns a boolean and is really only useful if the node is an ALL convergence node and is + # intended to be used in conjuction with the node property `all_parents_must_converge` + obj = node['node_object'] + parent_nodes = [p['node_object'] for p in self.get_parents(obj)] + for p in parent_nodes: + #node has a status + if p.job and p.job.status in ["successful", "failed"]: + if p.job and p.job.status == "successful": + status = "success_nodes" + elif p.job and p.job.status == "failed": + status = "failure_nodes" + #check that the nodes status matches either a pathway of the same status or is an always path. + if (p not in [node['node_object'] for node in self.get_parents(obj, status)] + and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]): + return False + return True + def bfs_nodes_to_run(self): nodes = self.get_root_nodes() nodes_found = [] @@ -80,19 +99,25 @@ class WorkflowDAG(SimpleDAG): node_ids_visited.add(obj.id) if obj.do_not_run is True: continue - elif obj.job and not obj.all_parents_must_converge: + elif obj.job: if obj.job.status in ['failed', 'error', 'canceled']: nodes.extend(self.get_children(obj, 'failure_nodes') + self.get_children(obj, 'always_nodes')) elif obj.job.status == 'successful': nodes.extend(self.get_children(obj, 'success_nodes') + self.get_children(obj, 'always_nodes')) - elif obj.unified_job_template is None and not obj.all_parents_must_converge: + elif obj.unified_job_template is None: nodes.extend(self.get_children(obj, 'failure_nodes') + self.get_children(obj, 'always_nodes')) else: - if self._are_relevant_parents_finished(n): + # This catches root nodes or ANY convergence nodes + if not obj.all_parents_must_converge and self._are_relevant_parents_finished(n): nodes_found.append(n) + # This catches ALL convergence nodes + elif obj.all_parents_must_converge and self._are_relevant_parents_finished(n): + if self._all_parents_met_convergence_criteria(n): + nodes_found.append(n) + return [n['node_object'] for n in nodes_found] def cancel_node_jobs(self): @@ -222,21 +247,9 @@ class WorkflowDAG(SimpleDAG): parent_nodes = [p['node_object'] for p in self.get_parents(obj)] if not obj.do_not_run and not obj.job and node not in root_nodes: if obj.all_parents_must_converge: - if any(p.do_not_run for p in parent_nodes): + if any(p.do_not_run for p in parent_nodes) or not self._all_parents_met_convergence_criteria(node): obj.do_not_run = True nodes_marked_do_not_run.append(node) - continue - for p in parent_nodes: - if p.job and p.job.status in ["successful", "failed"]: - if p.job and p.job.status == "successful": - status = "success_nodes" - elif p.job and p.job.status == "failed": - status = "failure_nodes" - if (p not in [node['node_object'] for node in self.get_parents(obj, status)] - and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]): - obj.do_not_run = True - nodes_marked_do_not_run.append(node) - break else: if self._are_all_nodes_dnr_decided(parent_nodes): if self._should_mark_node_dnr(node, parent_nodes):