From 4e787cc0798cda1d79a35e1d1446cee975ccd3da Mon Sep 17 00:00:00 2001 From: Rebeccah Date: Wed, 29 Jan 2020 15:05:57 -0500 Subject: [PATCH] made marking nodes as DNR more 'eager', added more unit tests, and added convergence check to bfs_nodes_to_run with new changes to the eagerness of DNR marking since it needs it to prevent convergence nodes from running too quickly --- awx/main/scheduler/dag_workflow.py | 49 ++-- .../tests/unit/scheduler/test_dag_workflow.py | 241 ++++++++++++++++++ 2 files changed, 268 insertions(+), 22 deletions(-) diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 6153ffcbdc..0bbc28d97a 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -80,14 +80,14 @@ class WorkflowDAG(SimpleDAG): node_ids_visited.add(obj.id) if obj.do_not_run is True: continue - elif obj.job: + elif obj.job and not obj.all_parents_must_converge: if obj.job.status in ['failed', 'error', 'canceled']: nodes.extend(self.get_children(obj, 'failure_nodes') + self.get_children(obj, 'always_nodes')) elif obj.job.status == 'successful': nodes.extend(self.get_children(obj, 'success_nodes') + self.get_children(obj, 'always_nodes')) - elif obj.unified_job_template is None: + elif obj.unified_job_template is None and not obj.all_parents_must_converge: nodes.extend(self.get_children(obj, 'failure_nodes') + self.get_children(obj, 'always_nodes')) else: @@ -204,6 +204,15 @@ class WorkflowDAG(SimpleDAG): return False return True + + r''' + determine if the current node is a convergence node by checking if all the + parents are finished then checking to see if all parents meet the needed + path criteria to run the convergence child. + (i.e. parent must fail, parent must succeed, etc. 
to proceed) + + Return a list object + ''' def mark_dnr_nodes(self): root_nodes = self.get_root_nodes() nodes_marked_do_not_run = [] @@ -212,28 +221,24 @@ class WorkflowDAG(SimpleDAG): obj = node['node_object'] parent_nodes = [p['node_object'] for p in self.get_parents(obj)] if not obj.do_not_run and not obj.job and node not in root_nodes: - if self._are_all_nodes_dnr_decided(parent_nodes): - # if the current node is a convergence node and all the - # parents are finished then check to see if all parents - # met the needed criteria to run the convergence child - # (i.e. parent must fail, parent must succeed) - if obj.all_parents_must_converge: - if self._are_relevant_parents_finished(node): - if any(p.do_not_run for p in parent_nodes): + if obj.all_parents_must_converge: + if any(p.do_not_run for p in parent_nodes): + obj.do_not_run = True + nodes_marked_do_not_run.append(node) + continue + for p in parent_nodes: + if p.job and p.job.status in ["successful", "failed"]: + if p.job and p.job.status == "successful": + status = "success_nodes" + elif p.job and p.job.status == "failed": + status = "failure_nodes" + if (p not in [node['node_object'] for node in self.get_parents(obj, status)] + and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]): obj.do_not_run = True nodes_marked_do_not_run.append(node) - else: - for p in parent_nodes: - if p.job.status == "successful": - status = "success_nodes" - elif p.job.status == "failed": - status = "failure_nodes" - if (p not in [node['node_object'] for node in self.get_parents(obj, status)] - and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]): - obj.do_not_run = True - nodes_marked_do_not_run.append(node) - break - else: + break + else: + if self._are_all_nodes_dnr_decided(parent_nodes): if self._should_mark_node_dnr(node, parent_nodes): obj.do_not_run = True nodes_marked_do_not_run.append(node) diff --git a/awx/main/tests/unit/scheduler/test_dag_workflow.py 
b/awx/main/tests/unit/scheduler/test_dag_workflow.py index 6a1d81386d..459f18111e 100644 --- a/awx/main/tests/unit/scheduler/test_dag_workflow.py +++ b/awx/main/tests/unit/scheduler/test_dag_workflow.py @@ -133,6 +133,247 @@ class TestDNR(): assert 1 == len(do_not_run_nodes) assert nodes[3] == do_not_run_nodes[0] +class TestAllWorkflowNodes(): + # test workflow convergence is functioning as expected + @pytest.fixture + def simple_all_convergence(self, wf_node_generator): + g = WorkflowDAG() + nodes = [wf_node_generator() for i in range(4)] + for n in nodes: + g.add_node(n) + + r''' + 0 + /\ + S / \ S + / \ + 1 2 + \ / + F \ / S + \/ + 3 + + ''' + g.add_edge(nodes[0], nodes[1], "success_nodes") + g.add_edge(nodes[0], nodes[2], "success_nodes") + g.add_edge(nodes[1], nodes[3], "failure_nodes") + g.add_edge(nodes[2], nodes[3], "success_nodes") + nodes[3].all_parents_must_converge = True + nodes[0].job = Job(status='successful') + nodes[1].job = Job(status='failed') + nodes[2].job = Job(status='successful') + return (g, nodes) + + def test_simple_all_convergence(self, simple_all_convergence): + (g, nodes) = simple_all_convergence + dnr_nodes = g.mark_dnr_nodes() + assert 0 == len(dnr_nodes), "no nodes should be marked DNR" + + nodes_to_run = g.bfs_nodes_to_run() + assert 1 == len(nodes_to_run), "Node 3, and only node 3, should be chosen to run" + assert nodes[3] == nodes_to_run[0], "Only node 3 should be chosen to run" + + @pytest.fixture + def workflow_all_converge_1(self, wf_node_generator): + g = WorkflowDAG() + nodes = [wf_node_generator() for i in range(3)] + for n in nodes: + g.add_node(n) + r''' + 0 + |\ F + | \ + S| 1 + | / + |/ A + 2 + ''' + g.add_edge(nodes[0], nodes[1], "failure_nodes") + g.add_edge(nodes[0], nodes[2], "success_nodes") + g.add_edge(nodes[1], nodes[2], "always_nodes") + nodes[2].all_parents_must_converge = True + nodes[0].job = Job(status='successful') + return (g, nodes) + + def test_all_converge_edge_case_1(self, workflow_all_converge_1): + 
(g, nodes) = workflow_all_converge_1 + dnr_nodes = g.mark_dnr_nodes() + assert 2 == len(dnr_nodes), "node[1] and node[2] should be marked DNR" + assert nodes[1] == dnr_nodes[0], "Node 1 should be marked DNR" + assert nodes[2] == dnr_nodes[1], "Node 2 should be marked DNR" + + nodes_to_run = g.bfs_nodes_to_run() + assert 0 == len(nodes_to_run), "No nodes should be chosen to run" + + @pytest.fixture + def workflow_all_converge_2(self, wf_node_generator): + """The ordering of _1 and this test, _2, is _slightly_ different. + The hope is that topological sorting results in 2 being processed before 3 + and/or 3 before 2. + """ + g = WorkflowDAG() + nodes = [wf_node_generator() for i in range(3)] + for n in nodes: + g.add_node(n) + r''' + 0 + |\ S + | \ + F| 1 + | / + |/ A + 2 + ''' + g.add_edge(nodes[0], nodes[1], "success_nodes") + g.add_edge(nodes[0], nodes[2], "failure_nodes") + g.add_edge(nodes[1], nodes[2], "always_nodes") + nodes[2].all_parents_must_converge = True + nodes[0].job = Job(status='successful') + return (g, nodes) + + def test_all_converge_edge_case_2(self, workflow_all_converge_2): + (g, nodes) = workflow_all_converge_2 + dnr_nodes = g.mark_dnr_nodes() + assert 1 == len(dnr_nodes), "1 and only 1 node should be marked DNR" + assert nodes[2] == dnr_nodes[0], "Node 3 should be marked DNR" + + nodes_to_run = g.bfs_nodes_to_run() + assert 1 == len(nodes_to_run), "Node 2, and only node 2, should be chosen to run" + assert nodes[1] == nodes_to_run[0], "Only node 2 should be chosen to run" + + @pytest.fixture + def workflow_all_converge_will_run(self, wf_node_generator): + g = WorkflowDAG() + nodes = [wf_node_generator() for i in range(4)] + for n in nodes: + g.add_node(n) + r''' + 0 1 2 + S \ F | / S + \ | / + \ | / + \|/ + | + 3 + ''' + g.add_edge(nodes[0], nodes[3], "success_nodes") + g.add_edge(nodes[1], nodes[3], "failure_nodes") + g.add_edge(nodes[2], nodes[3], "success_nodes") + nodes[3].all_parents_must_converge = True + + nodes[0].job = 
Job(status='successful')
+        nodes[1].job = Job(status='failed')
+        nodes[2].job = Job(status='running')
+        return (g, nodes)
+
+    def test_workflow_all_converge_will_run(self, workflow_all_converge_will_run):
+        (g, nodes) = workflow_all_converge_will_run
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 0 == len(dnr_nodes), "No nodes should get marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "No nodes should run yet"
+
+        nodes[2].job.status = 'successful'
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 1 == len(nodes_to_run), "1 and only 1 node should want to run"
+        assert nodes[3] == nodes_to_run[0], "Convergence node should be chosen to run"
+
+    @pytest.fixture
+    def workflow_all_converge_dnr(self, wf_node_generator):
+        g = WorkflowDAG()
+        nodes = [wf_node_generator() for i in range(4)]
+        for n in nodes:
+            g.add_node(n)
+        r'''
+           0    1   2
+         S  \ F |  / F
+             \  | /
+              \ | /
+               \|/
+                |
+                3
+        '''
+        g.add_edge(nodes[0], nodes[3], "success_nodes")
+        g.add_edge(nodes[1], nodes[3], "failure_nodes")
+        g.add_edge(nodes[2], nodes[3], "failure_nodes")
+        nodes[3].all_parents_must_converge = True
+
+        nodes[0].job = Job(status='successful')
+        nodes[1].job = Job(status='running')
+        nodes[2].job = Job(status='failed')
+        return (g, nodes)
+
+    def test_workflow_all_converge_dnr(self, workflow_all_converge_dnr):
+        (g, nodes) = workflow_all_converge_dnr
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 0 == len(dnr_nodes), "No nodes should get marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "No nodes should run yet"
+
+        # Another tick of the scheduler
+        nodes[1].job.status = 'successful'
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 1 == len(dnr_nodes), "1 and only 1 node should be marked DNR"
+        assert nodes[3] == dnr_nodes[0], "Convergence node should be marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "Convergence node should NOT be chosen to run because it is DNR"
+
+    @pytest.fixture
+    def 
workflow_all_converge_deep_dnr_tree(self, wf_node_generator): + g = WorkflowDAG() + nodes = [wf_node_generator() for i in range(7)] + for n in nodes: + g.add_node(n) + r''' + 0 1 2 + \ | / + S \ S| / F + \ | / + \|/ + | + 3 + /\ + S / \ S + / \ + 4| | 5 + \ / + S \ / S + \/ + 6 + ''' + g.add_edge(nodes[0], nodes[3], "success_nodes") + g.add_edge(nodes[1], nodes[3], "success_nodes") + g.add_edge(nodes[2], nodes[3], "failure_nodes") + g.add_edge(nodes[3], nodes[4], "success_nodes") + g.add_edge(nodes[3], nodes[5], "success_nodes") + g.add_edge(nodes[4], nodes[6], "success_nodes") + g.add_edge(nodes[5], nodes[6], "success_nodes") + nodes[3].all_parents_must_converge = True + nodes[4].all_parents_must_converge = True + nodes[5].all_parents_must_converge = True + nodes[6].all_parents_must_converge = True + + nodes[0].job = Job(status='successful') + nodes[1].job = Job(status='successful') + nodes[2].job = Job(status='successful') + return (g, nodes) + + def test_workflow_all_converge_deep_dnr_tree(self, workflow_all_converge_deep_dnr_tree): + (g, nodes) = workflow_all_converge_deep_dnr_tree + dnr_nodes = g.mark_dnr_nodes() + + assert 4 == len(dnr_nodes), "All nodes w/ no jobs should be marked DNR" + assert nodes[3] in dnr_nodes + assert nodes[4] in dnr_nodes + assert nodes[5] in dnr_nodes + assert nodes[6] in dnr_nodes + + nodes_to_run = g.bfs_nodes_to_run() + assert 0 == len(nodes_to_run), "All non-run nodes should be DNR and NOT candidates to run" + class TestIsWorkflowDone(): @pytest.fixture