From 00d71cea50a8e3c98515c92d285389a01e255e49 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 16 Nov 2018 13:37:11 -0500 Subject: [PATCH] detect workflow nodes without job templates * Fail workflow job run when encountering a Workflow Job Nodes with no related job templates. --- awx/main/scheduler/dag_workflow.py | 26 +++++++++++ .../tests/functional/models/test_workflow.py | 20 ++++++--- .../tests/unit/scheduler/test_dag_workflow.py | 43 +++---------------- 3 files changed, 48 insertions(+), 41 deletions(-) diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 854fb16dc7..67834d6149 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -127,6 +127,9 @@ class WorkflowDAG(SimpleDAG): failed_nodes = [] for node in self.nodes: obj = node['node_object'] + + if obj.unified_job_template is None: + return True if obj.job and obj.job.status in ['failed', 'canceled', 'error']: failed_nodes.append(node) for node in failed_nodes: @@ -179,6 +182,21 @@ class WorkflowDAG(SimpleDAG): return False return True + r''' + Useful in a failure scenario. Will mark all nodes that might have run a job + and haven't already run a job as do_not_run=True + + Return an array of workflow nodes that were marked do_not_run = True + ''' + def _mark_all_remaining_nodes_dnr(self): + objs = [] + for node in self.nodes: + obj = node['node_object'] + if obj.do_not_run is False and not obj.job: + obj.do_not_run = True + objs.append(obj) + return objs + def mark_dnr_nodes(self): root_nodes = self.get_root_nodes() nodes = copy.copy(root_nodes) @@ -191,6 +209,14 @@ class WorkflowDAG(SimpleDAG): continue node_ids_visited.add(obj.id) + ''' + Special case. On a workflow job template relaunch it's possible for + the job template associated with the job to have been deleted. If + this is the case, fail the workflow job and mark it done. + ''' + if obj.unified_job_template is None: + return self._mark_all_remaining_nodes_dnr() + if obj.do_not_run is False and not obj.job and n not in root_nodes: parent_nodes = filter(lambda n: not n.do_not_run, [p['node_object'] for p in self.get_dependents(obj)]) diff --git a/awx/main/tests/functional/models/test_workflow.py b/awx/main/tests/functional/models/test_workflow.py index 87ddc8747c..17fa705c47 100644 --- a/awx/main/tests/functional/models/test_workflow.py +++ b/awx/main/tests/functional/models/test_workflow.py @@ -64,7 +64,9 @@ class TestWorkflowDAGFunctional(TransactionTestCase): def test_workflow_done(self): wfj = self.workflow_job(states=['failed', None, None, 'successful', None]) dag = WorkflowDAG(workflow_job=wfj) - is_done, has_failed = dag.is_workflow_done() + assert 3 == len(dag.mark_dnr_nodes()) + is_done = dag.is_workflow_done() + has_failed = dag.has_workflow_failed() self.assertTrue(is_done) self.assertFalse(has_failed) @@ -73,28 +75,36 @@ class TestWorkflowDAGFunctional(TransactionTestCase): jt.delete() relaunched = wfj.create_relaunch_workflow_job() dag = WorkflowDAG(workflow_job=relaunched) - is_done, has_failed = dag.is_workflow_done() + dag.mark_dnr_nodes() + is_done = dag.is_workflow_done() + has_failed = dag.has_workflow_failed() self.assertTrue(is_done) self.assertTrue(has_failed) def test_workflow_fails_for_no_error_handler(self): wfj = self.workflow_job(states=['successful', 'failed', None, None, None]) dag = WorkflowDAG(workflow_job=wfj) - is_done, has_failed = dag.is_workflow_done() + dag.mark_dnr_nodes() + is_done = dag.is_workflow_done() + has_failed = dag.has_workflow_failed() self.assertTrue(is_done) self.assertTrue(has_failed) def test_workflow_fails_leaf(self): wfj = self.workflow_job(states=['successful', 'successful', 'failed', None, None]) dag = WorkflowDAG(workflow_job=wfj) - is_done, has_failed = dag.is_workflow_done() + dag.mark_dnr_nodes() + is_done = dag.is_workflow_done() + has_failed = dag.has_workflow_failed() self.assertTrue(is_done) self.assertTrue(has_failed) def test_workflow_not_finished(self): wfj = self.workflow_job(states=['new', None, None, None, None]) dag = WorkflowDAG(workflow_job=wfj) - is_done, has_failed = dag.is_workflow_done() + dag.mark_dnr_nodes() + is_done = dag.is_workflow_done() + has_failed = dag.has_workflow_failed() self.assertFalse(is_done) self.assertFalse(has_failed) diff --git a/awx/main/tests/unit/scheduler/test_dag_workflow.py b/awx/main/tests/unit/scheduler/test_dag_workflow.py index 4ab8c8e592..e126456d4e 100644 --- a/awx/main/tests/unit/scheduler/test_dag_workflow.py +++ b/awx/main/tests/unit/scheduler/test_dag_workflow.py @@ -9,33 +9,18 @@ class Job(): self.status = status -class WorkflowNodeBase(object): - def __init__(self, id=None, job=None): +class WorkflowNode(object): + def __init__(self, id=None, job=None, do_not_run=False, unified_job_template=None): self.id = id if id else uuid.uuid4() self.job = job - - -class WorkflowNodeDNR(WorkflowNodeBase): - def __init__(self, do_not_run=False, **kwargs): - super(WorkflowNodeDNR, self).__init__(**kwargs) self.do_not_run = do_not_run - - -class WorkflowNodeUJT(WorkflowNodeDNR): - def __init__(self, unified_job_template=None, **kwargs): - super(WorkflowNodeUJT, self).__init__(**kwargs) self.unified_job_template = unified_job_template @pytest.fixture -def WorkflowNodeClass(): - return WorkflowNodeBase - - -@pytest.fixture -def wf_node_generator(mocker, WorkflowNodeClass): +def wf_node_generator(mocker): def fn(**kwargs): - return WorkflowNodeClass(**kwargs) + return WorkflowNode(**kwargs) return fn @@ -94,12 +79,10 @@ class TestWorkflowDAG(): class TestDNR(): - @pytest.fixture - def WorkflowNodeClass(self): - return WorkflowNodeDNR - def test_mark_dnr_nodes(self, workflow_dag_1): (g, nodes) = workflow_dag_1 + for n in nodes: + n.unified_job_template = object() r''' S0 @@ -142,10 +125,6 @@ class TestDNR(): class TestIsWorkflowDone(): - @pytest.fixture - def WorkflowNodeClass(self): - return WorkflowNodeUJT - @pytest.fixture def workflow_dag_2(self, workflow_dag_1): (g, nodes) = workflow_dag_1 @@ -211,10 +190,6 @@ class TestIsWorkflowDone(): class TestHasWorkflowFailed(): - @pytest.fixture - def WorkflowNodeClass(self): - return WorkflowNodeBase - @pytest.fixture def workflow_dag_canceled(self, wf_node_generator): g = WorkflowDAG() @@ -244,14 +219,10 @@ class TestHasWorkflowFailed(): class TestBFSNodesToRun(): - @pytest.fixture - def WorkflowNodeClass(self): - return WorkflowNodeDNR - @pytest.fixture def workflow_dag_canceled(self, wf_node_generator): g = WorkflowDAG() - nodes = [wf_node_generator() for i in range(4)] + nodes = [wf_node_generator(unified_job_template=object()) for i in range(4)] map(lambda n: g.add_node(n), nodes) r''' C0