made marking nodes as DNR more 'eager', added more unit tests, and added convergence check to bfs_nodes_to_run with new changes to the eagerness of DNR marking since it needs it to prevent convergence nodes from running too quickly

This commit is contained in:
Rebeccah 2020-01-29 15:05:57 -05:00
parent 2de37ce5df
commit 4e787cc079
2 changed files with 268 additions and 22 deletions

View File

@ -80,14 +80,14 @@ class WorkflowDAG(SimpleDAG):
node_ids_visited.add(obj.id)
if obj.do_not_run is True:
continue
elif obj.job:
elif obj.job and not obj.all_parents_must_converge:
if obj.job.status in ['failed', 'error', 'canceled']:
nodes.extend(self.get_children(obj, 'failure_nodes') +
self.get_children(obj, 'always_nodes'))
elif obj.job.status == 'successful':
nodes.extend(self.get_children(obj, 'success_nodes') +
self.get_children(obj, 'always_nodes'))
elif obj.unified_job_template is None:
elif obj.unified_job_template is None and not obj.all_parents_must_converge:
nodes.extend(self.get_children(obj, 'failure_nodes') +
self.get_children(obj, 'always_nodes'))
else:
@ -204,6 +204,15 @@ class WorkflowDAG(SimpleDAG):
return False
return True
r'''
determine if the current node is a convergence node by checking if all the
parents are finished then checking to see if all parents meet the needed
path criteria to run the convergence child.
(i.e. parent must fail, parent must succeed, etc. to proceed)
Return a list object
'''
def mark_dnr_nodes(self):
root_nodes = self.get_root_nodes()
nodes_marked_do_not_run = []
@ -212,28 +221,24 @@ class WorkflowDAG(SimpleDAG):
obj = node['node_object']
parent_nodes = [p['node_object'] for p in self.get_parents(obj)]
if not obj.do_not_run and not obj.job and node not in root_nodes:
if self._are_all_nodes_dnr_decided(parent_nodes):
# if the current node is a convergence node and all the
# parents are finished then check to see if all parents
# met the needed criteria to run the convergence child
# (i.e. parent must fail, parent must succeed)
if obj.all_parents_must_converge:
if self._are_relevant_parents_finished(node):
if any(p.do_not_run for p in parent_nodes):
if obj.all_parents_must_converge:
if any(p.do_not_run for p in parent_nodes):
obj.do_not_run = True
nodes_marked_do_not_run.append(node)
continue
for p in parent_nodes:
if p.job and p.job.status in ["successful", "failed"]:
if p.job and p.job.status == "successful":
status = "success_nodes"
elif p.job and p.job.status == "failed":
status = "failure_nodes"
if (p not in [node['node_object'] for node in self.get_parents(obj, status)]
and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]):
obj.do_not_run = True
nodes_marked_do_not_run.append(node)
else:
for p in parent_nodes:
if p.job.status == "successful":
status = "success_nodes"
elif p.job.status == "failed":
status = "failure_nodes"
if (p not in [node['node_object'] for node in self.get_parents(obj, status)]
and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]):
obj.do_not_run = True
nodes_marked_do_not_run.append(node)
break
else:
break
else:
if self._are_all_nodes_dnr_decided(parent_nodes):
if self._should_mark_node_dnr(node, parent_nodes):
obj.do_not_run = True
nodes_marked_do_not_run.append(node)

View File

@ -133,6 +133,247 @@ class TestDNR():
assert 1 == len(do_not_run_nodes)
assert nodes[3] == do_not_run_nodes[0]
class TestAllWorkflowNodes():
# test workflow convergence is functioning as expected
@pytest.fixture
def simple_all_convergence(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(4)]
for n in nodes:
g.add_node(n)
r'''
0
/\
S / \ S
/ \
1 2
\ /
F \ / S
\/
3
'''
g.add_edge(nodes[0], nodes[1], "success_nodes")
g.add_edge(nodes[0], nodes[2], "success_nodes")
g.add_edge(nodes[1], nodes[3], "failure_nodes")
g.add_edge(nodes[2], nodes[3], "success_nodes")
nodes[3].all_parents_must_converge = True
nodes[0].job = Job(status='successful')
nodes[1].job = Job(status='failed')
nodes[2].job = Job(status='successful')
return (g, nodes)
def test_simple_all_convergence(self, simple_all_convergence):
(g, nodes) = simple_all_convergence
dnr_nodes = g.mark_dnr_nodes()
assert 0 == len(dnr_nodes), "no nodes should be marked DNR"
nodes_to_run = g.bfs_nodes_to_run()
assert 1 == len(nodes_to_run), "Node 3, and only node 3, should be chosen to run"
assert nodes[3] == nodes_to_run[0], "Only node 3 should be chosen to run"
@pytest.fixture
def workflow_all_converge_1(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(3)]
for n in nodes:
g.add_node(n)
r'''
0
|\ F
| \
S| 1
| /
|/ A
2
'''
g.add_edge(nodes[0], nodes[1], "failure_nodes")
g.add_edge(nodes[0], nodes[2], "success_nodes")
g.add_edge(nodes[1], nodes[2], "always_nodes")
nodes[2].all_parents_must_converge = True
nodes[0].job = Job(status='successful')
return (g, nodes)
def test_all_converge_edge_case_1(self, workflow_all_converge_1):
(g, nodes) = workflow_all_converge_1
dnr_nodes = g.mark_dnr_nodes()
assert 2 == len(dnr_nodes), "node[1] and node[2] should be marked DNR"
assert nodes[1] == dnr_nodes[0], "Node 1 should be marked DNR"
assert nodes[2] == dnr_nodes[1], "Node 2 should be marked DNR"
nodes_to_run = g.bfs_nodes_to_run()
assert 0 == len(nodes_to_run), "No nodes should be chosen to run"
@pytest.fixture
def workflow_all_converge_2(self, wf_node_generator):
"""The ordering of _1 and this test, _2, is _slightly_ different.
The hope is that topological sorting results in 2 being processed before 3
and/or 3 before 2.
"""
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(3)]
for n in nodes:
g.add_node(n)
r'''
0
|\ S
| \
F| 1
| /
|/ A
2
'''
g.add_edge(nodes[0], nodes[1], "success_nodes")
g.add_edge(nodes[0], nodes[2], "failure_nodes")
g.add_edge(nodes[1], nodes[2], "always_nodes")
nodes[2].all_parents_must_converge = True
nodes[0].job = Job(status='successful')
return (g, nodes)
def test_all_converge_edge_case_2(self, workflow_all_converge_2):
(g, nodes) = workflow_all_converge_2
dnr_nodes = g.mark_dnr_nodes()
assert 1 == len(dnr_nodes), "1 and only 1 node should be marked DNR"
assert nodes[2] == dnr_nodes[0], "Node 3 should be marked DNR"
nodes_to_run = g.bfs_nodes_to_run()
assert 1 == len(nodes_to_run), "Node 2, and only node 2, should be chosen to run"
assert nodes[1] == nodes_to_run[0], "Only node 2 should be chosen to run"
@pytest.fixture
def workflow_all_converge_will_run(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(4)]
for n in nodes:
g.add_node(n)
r'''
0 1 2
S \ F | / S
\ | /
\ | /
\|/
|
3
'''
g.add_edge(nodes[0], nodes[3], "success_nodes")
g.add_edge(nodes[1], nodes[3], "failure_nodes")
g.add_edge(nodes[2], nodes[3], "success_nodes")
nodes[3].all_parents_must_converge = True
nodes[0].job = Job(status='successful')
nodes[1].job = Job(status='failed')
nodes[2].job = Job(status='running')
return (g, nodes)
def test_workflow_all_converge_will_run(self, workflow_all_converge_will_run):
(g, nodes) = workflow_all_converge_will_run
dnr_nodes = g.mark_dnr_nodes()
assert 0 == len(dnr_nodes), "No nodes should get marked DNR"
nodes_to_run = g.bfs_nodes_to_run()
assert 0 == len(nodes_to_run), "No nodes should run yet"
nodes[2].job.status = 'successful'
nodes_to_run = g.bfs_nodes_to_run()
assert 1 == len(nodes_to_run), "1 and only 1 node should want to run"
assert nodes[3] == nodes_to_run[0], "Convergence node should be chosen to run"
@pytest.fixture
def workflow_all_converge_dnr(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(4)]
for n in nodes:
g.add_node(n)
r'''
0 1 2
S \ F | / F
\ | /
\ | /
\|/
|
3
'''
g.add_edge(nodes[0], nodes[3], "success_nodes")
g.add_edge(nodes[1], nodes[3], "failure_nodes")
g.add_edge(nodes[2], nodes[3], "failure_nodes")
nodes[3].all_parents_must_converge = True
nodes[0].job = Job(status='successful')
nodes[1].job = Job(status='running')
nodes[2].job = Job(status='failure')
return (g, nodes)
def test_workflow_all_converge_will_run(self, workflow_all_converge_dnr):
(g, nodes) = workflow_all_converge_dnr
dnr_nodes = g.mark_dnr_nodes()
assert 0 == len(dnr_nodes), "No nodes should get marked DNR"
nodes_to_run = g.bfs_nodes_to_run()
assert 0 == len(nodes_to_run), "No nodes should run yet"
# Another tick of the scheduler
nodes[1].job.status = 'successful'
dnr_nodes = g.mark_dnr_nodes()
assert 1 == len(dnr_nodes), "1 and only 1 node should be marked DNR"
assert nodes[3] == dnr_nodes[0], "Convergence node should be marked DNR"
nodes_to_run = g.bfs_nodes_to_run()
assert 0 == len(nodes_to_run), "Convergence node should NOT be chosen to run because it is DNR"
@pytest.fixture
def workflow_all_converge_deep_dnr_tree(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(7)]
for n in nodes:
g.add_node(n)
r'''
0 1 2
\ | /
S \ S| / F
\ | /
\|/
|
3
/\
S / \ S
/ \
4| | 5
\ /
S \ / S
\/
6
'''
g.add_edge(nodes[0], nodes[3], "success_nodes")
g.add_edge(nodes[1], nodes[3], "success_nodes")
g.add_edge(nodes[2], nodes[3], "failure_nodes")
g.add_edge(nodes[3], nodes[4], "success_nodes")
g.add_edge(nodes[3], nodes[5], "success_nodes")
g.add_edge(nodes[4], nodes[6], "success_nodes")
g.add_edge(nodes[5], nodes[6], "success_nodes")
nodes[3].all_parents_must_converge = True
nodes[4].all_parents_must_converge = True
nodes[5].all_parents_must_converge = True
nodes[6].all_parents_must_converge = True
nodes[0].job = Job(status='successful')
nodes[1].job = Job(status='successful')
nodes[2].job = Job(status='successful')
return (g, nodes)
def test_workflow_all_converge_deep_dnr_tree(self, workflow_all_converge_deep_dnr_tree):
(g, nodes) = workflow_all_converge_deep_dnr_tree
dnr_nodes = g.mark_dnr_nodes()
assert 4 == len(dnr_nodes), "All nodes w/ no jobs should be marked DNR"
assert nodes[3] in dnr_nodes
assert nodes[4] in dnr_nodes
assert nodes[5] in dnr_nodes
assert nodes[6] in dnr_nodes
nodes_to_run = g.bfs_nodes_to_run()
assert 0 == len(nodes_to_run), "All non-run nodes should be DNR and NOT candidates to run"
class TestIsWorkflowDone():
@pytest.fixture