diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py
index 0b7aeb7140..55316410b5 100644
--- a/awx/main/scheduler/dag_workflow.py
+++ b/awx/main/scheduler/dag_workflow.py
@@ -66,6 +66,10 @@ class WorkflowDAG(SimpleDAG):
             if p.do_not_run is True:
                 continue
 
+            # job template relationship deleted, don't run the node and take the failure path
+            if p.do_not_run is False and not p.job and p.unified_job_template is None:
+                return True
+
             # Node might run a job
             if p.do_not_run is False and not p.job:
                 return False
@@ -86,7 +90,7 @@ class WorkflowDAG(SimpleDAG):
                 continue
             node_ids_visited.add(obj.id)
 
-            if obj.do_not_run is True:
+            if obj.do_not_run is True and obj.unified_job_template:
                 continue
 
             if obj.job:
@@ -96,6 +100,9 @@ class WorkflowDAG(SimpleDAG):
                 elif obj.job.status == 'successful':
                     nodes.extend(self.get_dependencies(obj, 'success_nodes') +
                                  self.get_dependencies(obj, 'always_nodes'))
+            elif obj.unified_job_template is None:
+                nodes.extend(self.get_dependencies(obj, 'failure_nodes') +
+                             self.get_dependencies(obj, 'always_nodes'))
             else:
                 if self._are_relevant_parents_finished(n):
                     nodes_found.append(n)
@@ -127,16 +134,18 @@ class WorkflowDAG(SimpleDAG):
         failed_nodes = []
         for node in self.nodes:
             obj = node['node_object']
-
-            if obj.unified_job_template is None:
-                return True, "Workflow job node {} related unified job template missing".format(obj.id)
             if obj.job and obj.job.status in ['failed', 'canceled', 'error']:
                 failed_nodes.append(node)
+            elif obj.do_not_run is True and obj.unified_job_template is None:
+                failed_nodes.append(node)
         for node in failed_nodes:
             obj = node['node_object']
             if (len(self.get_dependencies(obj, 'failure_nodes')) +
                     len(self.get_dependencies(obj, 'always_nodes'))) == 0:
-                return True, "Workflow job node {} has a status of '{}' without an error handler path".format(obj.id, obj.job.status)
+                if obj.unified_job_template is None:
+                    return True, "Workflow job node {} related unified job template missing and is without an error handler path".format(obj.id)
+                else:
+                    return True, "Workflow job node {} has a status of '{}' without an error handler path".format(obj.id, obj.job.status)
         return False, None
 
     r'''
@@ -151,6 +160,8 @@ class WorkflowDAG(SimpleDAG):
     '''
     def _are_all_nodes_dnr_decided(self, workflow_nodes):
         for n in workflow_nodes:
+            if n.unified_job_template is None and n.do_not_run is False:
+                return False
             if n.do_not_run is False and not n.job:
                 return False
         return True
@@ -178,50 +189,30 @@ class WorkflowDAG(SimpleDAG):
                         return False
                 else:
                     return False
+            elif p.unified_job_template is None:
+                if node in (self.get_dependencies(p, 'failure_nodes') +
+                            self.get_dependencies(p, 'always_nodes')):
+                    return False
             else:
                 return False
         return True
 
-    r'''
-    Useful in a failure scenario. Will mark all nodes that might have run a job
-    and haven't already run a job as do_not_run=True
-
-    Return an array of workflow nodes that were marked do_not_run = True
-    '''
-    def _mark_all_remaining_nodes_dnr(self):
-        objs = []
-        for node in self.nodes:
-            obj = node['node_object']
-            if obj.do_not_run is False and not obj.job:
-                obj.do_not_run = True
-                objs.append(obj)
-        return objs
-
     def mark_dnr_nodes(self):
         root_nodes = self.get_root_nodes()
         nodes = copy.copy(root_nodes)
         nodes_marked_do_not_run = []
-        node_ids_visited = set()
 
         for index, n in enumerate(nodes):
             obj = n['node_object']
-            if obj.id in node_ids_visited:
-                continue
-            node_ids_visited.add(obj.id)
 
-            '''
-            Special case. On a workflow job template relaunch it's possible for
-            the job template associated with the job to have been deleted. If
-            this is the case, fail the workflow job and mark it done.
-            '''
-            if obj.unified_job_template is None:
-                return self._mark_all_remaining_nodes_dnr()
-
-            if obj.do_not_run is False and not obj.job and n not in root_nodes:
+            if obj.do_not_run is False and not obj.job:
                 parent_nodes = filter(lambda n: not n.do_not_run,
                                       [p['node_object'] for p in self.get_dependents(obj)])
                 if self._are_all_nodes_dnr_decided(parent_nodes):
-                    if self._should_mark_node_dnr(n, parent_nodes):
+                    if obj.unified_job_template is None:
+                        obj.do_not_run = True
+                        nodes_marked_do_not_run.append(n)
+                    elif n not in root_nodes and self._should_mark_node_dnr(n, parent_nodes):
                         obj.do_not_run = True
                         nodes_marked_do_not_run.append(n)
diff --git a/awx/main/tests/unit/scheduler/test_dag_workflow.py b/awx/main/tests/unit/scheduler/test_dag_workflow.py
index 03a02d748c..de436bf15f 100644
--- a/awx/main/tests/unit/scheduler/test_dag_workflow.py
+++ b/awx/main/tests/unit/scheduler/test_dag_workflow.py
@@ -11,7 +11,7 @@ class Job():
 
 class WorkflowNode(object):
     def __init__(self, id=None, job=None, do_not_run=False, unified_job_template=None):
-        self.id = id if id else uuid.uuid4()
+        self.id = id if id is not None else uuid.uuid4()
         self.job = job
         self.do_not_run = do_not_run
         self.unified_job_template = unified_job_template
@@ -19,8 +19,12 @@
 
 @pytest.fixture
 def wf_node_generator(mocker):
+    pytest.count = 0
+
     def fn(**kwargs):
-        return WorkflowNode(**kwargs)
+        wfn = WorkflowNode(id=pytest.count, **kwargs)
+        pytest.count += 1
+        return wfn
     return fn
@@ -177,19 +181,6 @@ class TestIsWorkflowDone():
         nodes[2].job = Job(status='failed')
         return (g, nodes)
 
-    def test_is_workflow_done(self, workflow_dag_2):
-        g = workflow_dag_2[0]
-
-        assert g.is_workflow_done() is False
-
-    def test_is_workflow_done_failed(self, workflow_dag_failed):
-        (g, nodes) = workflow_dag_failed
-
-        assert g.is_workflow_done() is True
-        assert g.has_workflow_failed() == (True, "Workflow job node {} has a status of 'failed' without an error handler path".format(nodes[2].id))
-
-
-class TestHasWorkflowFailed():
     @pytest.fixture
     def workflow_dag_canceled(self, wf_node_generator):
         g = WorkflowDAG()
         nodes = [wf_node_generator() for i in range(1)]
         map(lambda n: g.add_node(n), nodes)
 
         nodes[0].job = Job(status='canceled')
         return (g, nodes)
 
     @pytest.fixture
     def workflow_dag_failed(self, workflow_dag_canceled):
         (g, nodes) = workflow_dag_canceled
         nodes[0].job.status = 'failed'
         return (g, nodes)
 
@@ -207,6 +198,38 @@
+    def test_done(self, workflow_dag_2):
+        g = workflow_dag_2[0]
+
+        assert g.is_workflow_done() is False
+
+    def test_workflow_done_and_failed(self, workflow_dag_failed):
+        (g, nodes) = workflow_dag_failed
+
+        assert g.is_workflow_done() is True
+        assert g.has_workflow_failed() == (True, "Workflow job node {} has a status of 'failed' without an error handler path".format(nodes[2].id))
+
+    def test_is_workflow_done_no_unified_job_template_end(self, workflow_dag_failed):
+        (g, nodes) = workflow_dag_failed
+
+        nodes[2].unified_job_template = None
+
+        assert g.is_workflow_done() is True
+        assert g.has_workflow_failed() == \
+            (True, "Workflow job node {} related unified job template missing"
+                   " and is without an error handler path".format(nodes[2].id))
+
+    def test_is_workflow_done_no_unified_job_template_begin(self, workflow_dag_1):
+        (g, nodes) = workflow_dag_1
+
+        nodes[0].unified_job_template = None
+        g.mark_dnr_nodes()
+
+        assert g.is_workflow_done() is True
+        assert g.has_workflow_failed() == \
+            (True, "Workflow job node {} related unified job template missing"
+                   " and is without an error handler path".format(nodes[0].id))
+
     def test_canceled_should_fail(self, workflow_dag_canceled):
         (g, nodes) = workflow_dag_canceled
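
Reviewer note: the new tests cover a deleted unified job template at the start and at the end of a workflow, but the interaction with an explicit error handler path is only exercised through the failure messages. Below is a minimal, self-contained sketch (not part of the patch) of one more check that could be run against this branch. It assumes the SimpleDAG add_node/add_edge(from_obj, to_obj, label) API that test_dag_workflow.py already uses; the test name and the inline WorkflowNode stand-in are hypothetical, not the PR's.

from awx.main.scheduler.dag_workflow import WorkflowDAG


class WorkflowNode(object):
    # Lightweight stand-in mirroring the one at the top of
    # test_dag_workflow.py; this is not the real ORM model.
    def __init__(self, id=None, job=None, do_not_run=False, unified_job_template=None):
        self.id = id
        self.job = job
        self.do_not_run = do_not_run
        self.unified_job_template = unified_job_template


def test_deleted_job_template_takes_failure_path():
    g = WorkflowDAG()
    nodes = [WorkflowNode(id=i, unified_job_template=object()) for i in range(2)]
    for n in nodes:
        g.add_node(n)
    g.add_edge(nodes[0], nodes[1], 'failure_nodes')

    # Simulate the related job template being deleted before the root node ran.
    nodes[0].unified_job_template = None
    g.mark_dnr_nodes()

    # The root is decided (marked do_not_run) rather than left blocking the run...
    assert nodes[0].do_not_run is True
    # ...and because a failure handler path exists, the workflow is not failed.
    assert g.has_workflow_failed() == (False, None)

The explicit for loop around add_node is deliberate: the map(lambda n: g.add_node(n), nodes) idiom in the existing fixtures only adds the nodes on Python 2, where map is eager.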