From febf051748b7af93a4dc2a4b74d3a10fc32710ae Mon Sep 17 00:00:00 2001
From: chris meyers
Date: Mon, 19 Nov 2018 16:13:20 -0500
Subject: [PATCH] do not mark ujt None nodes dnr

* Leave do_not_run = False on workflow nodes whose related unified job
  template is missing. If we mark them True, we can no longer
  differentiate between a path that was deliberately not taken and a
  node that cannot run because it has no valid related unified job
  template.
---
 awx/main/scheduler/dag_workflow.py                 | 102 +++++++++---------
 .../tests/unit/scheduler/test_dag_workflow.py      |  36 ++++---
 2 files changed, 69 insertions(+), 69 deletions(-)
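The sketch below illustrates, in plain Python, the distinction this change
preserves. `Node` is a hypothetical stand-in for the real WorkflowJobNode
model, not AWX code; only the two attributes at issue are modeled:

    # Hypothetical stand-in for WorkflowJobNode: just the two flags at issue.
    class Node:
        def __init__(self, unified_job_template=None, do_not_run=False):
            self.unified_job_template = unified_job_template
            self.do_not_run = do_not_run

    skipped = Node(unified_job_template=object(), do_not_run=True)  # branch deliberately not taken
    broken = Node()                                                 # related UJT deleted; cannot run

    # With this change the two states remain distinguishable:
    assert skipped.do_not_run is True and skipped.unified_job_template is not None
    assert broken.do_not_run is False and broken.unified_job_template is None

mark_dnr_nodes() now leaves the second state untouched, and
has_workflow_failed() reports it explicitly.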
diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py
index 41aba609b7..9c3cb3cc4e 100644
--- a/awx/main/scheduler/dag_workflow.py
+++ b/awx/main/scheduler/dag_workflow.py
@@ -1,4 +1,7 @@
+from django.utils.translation import ugettext_lazy as _
+from django.utils.encoding import smart_text
+
 # Python
 from awx.main.models import (
     WorkflowJobTemplateNode,
@@ -50,61 +53,37 @@ class WorkflowDAG(SimpleDAG):
         for edge in always_nodes:
             self.add_edge(wfn_by_id[edge[0]], wfn_by_id[edge[1]], 'always_nodes')
 
-    r'''
-    Determine if all, relevant, parents node are finished.
-    Relevant parents are parents that are marked do_not_run False.
-
-    :param node: a node entry from SimpleDag.nodes (i.e. a dict with property ['node_object']
-
-    Return a boolean
-    '''
     def _are_relevant_parents_finished(self, node):
         obj = node['node_object']
         parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
         for p in parent_nodes:
             if p.do_not_run is True:
                 continue
-
-            # job template relationship deleted, don't run the node and take the failure path
-            if p.do_not_run is False and not p.job and p.unified_job_template is None:
-                return True
-
+            elif p.unified_job_template is None:
+                continue
             # Node might run a job
-            if p.do_not_run is False and not p.job:
+            elif not p.job:
                 return False
-
             # Node decidedly got a job; check if job is done
-            if p.job and p.job.status not in ['successful', 'failed', 'error', 'canceled']:
+            elif p.job and p.job.status not in ['successful', 'failed', 'error', 'canceled']:
                 return False
         return True
 
     def bfs_nodes_to_run(self):
-        nodes = self.get_root_nodes()
         nodes_found = []
-        node_ids_visited = set()
-
-        for index, n in enumerate(nodes):
-            obj = n['node_object']
-            if obj.id in node_ids_visited:
+        for node in self.sort_nodes_topological():
+            obj = node['node_object']
+            if obj.do_not_run is True:
                 continue
-            node_ids_visited.add(obj.id)
-
-            if obj.do_not_run is True and obj.unified_job_template:
+            elif obj.job:
                 continue
-
-            if obj.job:
-                if obj.job.status in ['failed', 'error', 'canceled']:
-                    nodes.extend(self.get_dependencies(obj, 'failure_nodes') +
-                                 self.get_dependencies(obj, 'always_nodes'))
-                elif obj.job.status == 'successful':
-                    nodes.extend(self.get_dependencies(obj, 'success_nodes') +
-                                 self.get_dependencies(obj, 'always_nodes'))
             elif obj.unified_job_template is None:
-                nodes.extend(self.get_dependencies(obj, 'failure_nodes') +
-                             self.get_dependencies(obj, 'always_nodes'))
-            else:
-                if self._are_relevant_parents_finished(n):
-                    nodes_found.append(n)
+                continue
+
+            if self._are_relevant_parents_finished(node):
+                nodes_found.append(node)
+
         return [n['node_object'] for n in nodes_found]
 
     def cancel_node_jobs(self):
@@ -123,7 +102,7 @@ class WorkflowDAG(SimpleDAG):
     def is_workflow_done(self):
         for node in self.nodes:
             obj = node['node_object']
-            if obj.do_not_run is False and not obj.job:
+            if obj.do_not_run is False and not obj.job and obj.unified_job_template:
                 return False
             elif obj.job and obj.job.status not in ['successful', 'failed', 'canceled', 'error']:
                 return False
@@ -131,20 +110,40 @@ class WorkflowDAG(SimpleDAG):
     def has_workflow_failed(self):
         failed_nodes = []
+        res = False
+        failed_path_nodes_id_status = []
+        failed_unified_job_template_node_ids = []
+
         for node in self.nodes:
             obj = node['node_object']
-            if obj.job and obj.job.status in ['failed', 'canceled', 'error']:
+            if obj.do_not_run is False and obj.unified_job_template is None:
                 failed_nodes.append(node)
-            elif obj.do_not_run is True and obj.unified_job_template is None:
+            elif obj.job and obj.job.status in ['failed', 'canceled', 'error']:
                 failed_nodes.append(node)
+
         for node in failed_nodes:
             obj = node['node_object']
             if (len(self.get_dependencies(obj, 'failure_nodes')) +
                     len(self.get_dependencies(obj, 'always_nodes'))) == 0:
                 if obj.unified_job_template is None:
-                    return True, "Workflow job node {} related unified job template missing and is without an error handle path".format(obj.id)
+                    res = True
+                    failed_unified_job_template_node_ids.append(str(obj.id))
                 else:
-                    return True, "Workflow job node {} has a status of '{}' without an error handler path".format(obj.id, obj.job.status)
+                    res = True
+                    failed_path_nodes_id_status.append((str(obj.id), obj.job.status))
+
+        if res is True:
+            s = _("No error handle path for workflow job node(s) [{node_status}] workflow job "
+                  "node(s) missing unified job template and error handle path [{no_ufjt}].")
+            parms = {
+                'node_status': '',
+                'no_ufjt': '',
+            }
+            if len(failed_path_nodes_id_status) > 0:
+                parms['node_status'] = ",".join(["({},{})".format(id, status) for id, status in failed_path_nodes_id_status])
+            if len(failed_unified_job_template_node_ids) > 0:
+                parms['no_ufjt'] = ",".join(failed_unified_job_template_node_ids)
+            return True, smart_text(s.format(**parms))
         return False, None
 
     r'''
@@ -159,9 +158,7 @@ class WorkflowDAG(SimpleDAG):
     '''
    def _are_all_nodes_dnr_decided(self, workflow_nodes):
        for n in workflow_nodes:
-            if n.unified_job_template is None and n.do_not_run is False:
-                return False
-            if n.do_not_run is False and not n.job:
+            if n.do_not_run is False and not n.job and n.unified_job_template:
                 return False
         return True
 
@@ -177,7 +174,9 @@
     '''
     def _should_mark_node_dnr(self, node, parent_nodes):
         for p in parent_nodes:
-            if p.job:
+            if p.do_not_run is True:
+                pass
+            elif p.job:
                 if p.job.status == 'successful':
                     if node in (self.get_dependencies(p, 'success_nodes') +
                                 self.get_dependencies(p, 'always_nodes')):
@@ -188,12 +187,10 @@ class WorkflowDAG(SimpleDAG):
                         return False
                 else:
                     return False
-            elif p.unified_job_template is None:
+            elif p.do_not_run is False and p.unified_job_template is None:
                 if node in (self.get_dependencies(p, 'failure_nodes') +
                             self.get_dependencies(p, 'always_nodes')):
                     return False
-            elif p.do_not_run is True:
-                pass
             else:
                 return False
         return True
@@ -205,13 +202,10 @@ class WorkflowDAG(SimpleDAG):
 
         for node in self.sort_nodes_topological():
             obj = node['node_object']
-            if obj.do_not_run is False and not obj.job:
+            if obj.do_not_run is False and not obj.job and node not in root_nodes:
                 parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
                 if self._are_all_nodes_dnr_decided(parent_nodes):
-                    if obj.unified_job_template is None:
-                        obj.do_not_run = True
-                        nodes_marked_do_not_run.append(node)
-                    elif node not in root_nodes and self._should_mark_node_dnr(node, parent_nodes):
+                    if self._should_mark_node_dnr(node, parent_nodes):
                         obj.do_not_run = True
                         nodes_marked_do_not_run.append(node)
diff --git a/awx/main/tests/unit/scheduler/test_dag_workflow.py b/awx/main/tests/unit/scheduler/test_dag_workflow.py
index de436bf15f..81b304e264 100644
--- a/awx/main/tests/unit/scheduler/test_dag_workflow.py
+++ b/awx/main/tests/unit/scheduler/test_dag_workflow.py
@@ -1,6 +1,9 @@
 import pytest
 import uuid
 
+from django.utils.translation import ugettext_lazy as _
+from django.utils.encoding import smart_text
+
 from awx.main.scheduler.dag_workflow import WorkflowDAG
 
 
@@ -22,7 +25,7 @@ def wf_node_generator(mocker):
     pytest.count = 0
 
     def fn(**kwargs):
-        wfn = WorkflowNode(id=pytest.count, **kwargs)
+        wfn = WorkflowNode(id=pytest.count, unified_job_template=object(), **kwargs)
         pytest.count += 1
         return wfn
     return fn
@@ -31,7 +34,7 @@ def wf_node_generator(mocker):
 @pytest.fixture
 def workflow_dag_1(wf_node_generator):
     g = WorkflowDAG()
-    nodes = [wf_node_generator(unified_job_template=object()) for i in range(4)]
+    nodes = [wf_node_generator() for i in range(4)]
     map(lambda n: g.add_node(n), nodes)
 
     r'''
@@ -85,8 +88,6 @@ class TestWorkflowDAG():
 class TestDNR():
     def test_mark_dnr_nodes(self, workflow_dag_1):
         (g, nodes) = workflow_dag_1
-        for n in nodes:
-            n.unified_job_template = object()
 
         r'''
         S0
@@ -132,8 +133,6 @@ class TestIsWorkflowDone():
     @pytest.fixture
     def workflow_dag_2(self, workflow_dag_1):
         (g, nodes) = workflow_dag_1
-        for n in nodes:
-            n.unified_job_template = uuid.uuid4()
 
         r'''
         S0
         /\
@@ -184,7 +183,7 @@ class TestIsWorkflowDone():
     @pytest.fixture
     def workflow_dag_canceled(self, wf_node_generator):
         g = WorkflowDAG()
-        nodes = [wf_node_generator(unified_job_template=object()) for i in range(1)]
+        nodes = [wf_node_generator() for i in range(1)]
         map(lambda n: g.add_node(n), nodes)
         r'''
         F0
@@ -207,7 +206,9 @@
         (g, nodes) = workflow_dag_failed
 
         assert g.is_workflow_done() is True
-        assert g.has_workflow_failed() == (True, "Workflow job node {} has a status of 'failed' without an error handler path".format(nodes[2].id))
+        assert g.has_workflow_failed() == \
+            (True, smart_text(_("No error handle path for workflow job node(s) [({},{})] workflow job node(s)"
+                                " missing unified job template and error handle path [].").format(nodes[2].id, nodes[2].job.status)))
 
     def test_is_workflow_done_no_unified_job_tempalte_end(self, workflow_dag_failed):
         (g, nodes) = workflow_dag_failed
@@ -216,8 +217,8 @@
 
         assert g.is_workflow_done() is True
         assert g.has_workflow_failed() == \
-            (True, "Workflow job node {} related unified job template missing"
-                   " and is without an error handle path".format(nodes[2].id))
+            (True, smart_text(_("No error handle path for workflow job node(s) [] workflow job node(s) missing"
+                                " unified job template and error handle path [{}].").format(nodes[2].id)))
 
     def test_is_workflow_done_no_unified_job_tempalte_begin(self, workflow_dag_1):
         (g, nodes) = workflow_dag_1
@@ -227,25 +228,29 @@
 
         assert g.is_workflow_done() is True
         assert g.has_workflow_failed() == \
-            (True, "Workflow job node {} related unified job template missing"
-                   " and is without an error handle path".format(nodes[0].id))
+            (True, smart_text(_("No error handle path for workflow job node(s) [] workflow job node(s) missing"
+                                " unified job template and error handle path [{}].").format(nodes[0].id)))
 
     def test_canceled_should_fail(self, workflow_dag_canceled):
         (g, nodes) = workflow_dag_canceled
 
-        assert g.has_workflow_failed() == (True, "Workflow job node {} has a status of 'canceled' without an error handler path".format(nodes[0].id))
+        assert g.has_workflow_failed() == \
+            (True, smart_text(_("No error handle path for workflow job node(s) [({},{})] workflow job node(s)"
+                                " missing unified job template and error handle path [].").format(nodes[0].id, nodes[0].job.status)))
 
     def test_failure_should_fail(self, workflow_dag_failure):
         (g, nodes) = workflow_dag_failure
 
-        assert g.has_workflow_failed() == (True, "Workflow job node {} has a status of 'failed' without an error handler path".format(nodes[0].id))
+        assert g.has_workflow_failed() == \
+            (True, smart_text(_("No error handle path for workflow job node(s) [({},{})] workflow job node(s)"
+                                " missing unified job template and error handle path [].").format(nodes[0].id, nodes[0].job.status)))
 
 
 class TestBFSNodesToRun():
     @pytest.fixture
     def workflow_dag_canceled(self, wf_node_generator):
         g = WorkflowDAG()
-        nodes = [wf_node_generator(unified_job_template=object()) for i in range(4)]
+        nodes = [wf_node_generator() for i in range(4)]
         map(lambda n: g.add_node(n), nodes)
         r'''
         C0
@@ -262,5 +267,6 @@
 
     def test_cancel_still_runs_children(self, workflow_dag_canceled):
         (g, nodes) = workflow_dag_canceled
+        g.mark_dnr_nodes()
 
         assert set([nodes[1], nodes[2]]) == set(g.bfs_nodes_to_run())
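For reference, the aggregated message the tests assert can be reproduced
standalone. A minimal sketch: the format string is copied from
has_workflow_failed() above, while the node ids and statuses are made-up
illustration, not AWX fixtures:

    # Format string copied from has_workflow_failed(); inputs are hypothetical.
    s = ("No error handle path for workflow job node(s) [{node_status}] workflow job "
         "node(s) missing unified job template and error handle path [{no_ufjt}].")

    failed_path_nodes_id_status = [('3', 'failed'), ('5', 'canceled')]  # hypothetical (id, status) pairs
    failed_unified_job_template_node_ids = ['7']                        # hypothetical node id

    parms = {'node_status': '', 'no_ufjt': ''}
    if failed_path_nodes_id_status:
        parms['node_status'] = ",".join("({},{})".format(i, st) for i, st in failed_path_nodes_id_status)
    if failed_unified_job_template_node_ids:
        parms['no_ufjt'] = ",".join(failed_unified_job_template_node_ids)

    print(s.format(**parms))
    # No error handle path for workflow job node(s) [(3,failed),(5,canceled)] workflow job
    # node(s) missing unified job template and error handle path [7].

Both failure classes end up in one message, so a workflow that has nodes in
each state reports them together instead of returning on the first one found.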