handle missing unified job template in workflow

* A workflow node without a unified_job_template is treated as a failed
job when deciding which path to execute.
* Remove the optimization that marked remaining nodes as do-not-run (dnr),
since it made the algorithm incorrect.
chris meyers 2018-11-18 11:25:03 -05:00 committed by mabashian
parent 5b459e3c5d
commit f30f52a0a8
2 changed files with 63 additions and 49 deletions
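
A node whose unified_job_template relationship has been deleted can never launch a job, so the traversal treats it like a failed job and follows its failure/always children. A minimal sketch of that routing rule with throwaway classes (FakeNode, FakeJob, and child_edges_to_follow are illustrative names, not AWX's API):

class FakeJob:
    def __init__(self, status):
        self.status = status

class FakeNode:
    def __init__(self, unified_job_template=None, job=None):
        self.unified_job_template = unified_job_template
        self.job = job

def child_edges_to_follow(node):
    """Routing for a node that is already decided (ran a job, or can never run)."""
    if node.unified_job_template is None:
        # Template relationship deleted: treated like a failure.
        return ['failure_nodes', 'always_nodes']
    if node.job.status == 'successful':
        return ['success_nodes', 'always_nodes']
    return ['failure_nodes', 'always_nodes']  # failed / canceled / error

assert child_edges_to_follow(FakeNode()) == ['failure_nodes', 'always_nodes']
assert child_edges_to_follow(FakeNode('jt', FakeJob('successful'))) == ['success_nodes', 'always_nodes']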

@@ -66,6 +66,10 @@ class WorkflowDAG(SimpleDAG):
if p.do_not_run is True:
continue
# job template relationship deleted, don't run the node and take the failure path
if p.do_not_run is False and not p.job and p.unified_job_template is None:
return True
# Node might run a job
if p.do_not_run is False and not p.job:
return False
@@ -86,7 +90,7 @@ class WorkflowDAG(SimpleDAG):
continue
node_ids_visited.add(obj.id)
if obj.do_not_run is True:
if obj.do_not_run is True and obj.unified_job_template:
continue
if obj.job:
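
The tightened condition above means a do-not-run node is skipped outright only when it still has a unified job template; a node that was marked do-not-run because its template is gone falls through, so that (per the elif added in the next hunk) its failure/always children can still be expanded. A compact restatement of the skip test, with an illustrative node object assumed to have the same attributes:

def can_skip_node(node):
    # Skip only do-not-run nodes that still have a template; template-less
    # nodes must fall through so their failure/always children get visited.
    return node.do_not_run and node.unified_job_template is not None
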
@@ -96,6 +100,9 @@ class WorkflowDAG(SimpleDAG):
elif obj.job.status == 'successful':
nodes.extend(self.get_dependencies(obj, 'success_nodes') +
self.get_dependencies(obj, 'always_nodes'))
elif obj.unified_job_template is None:
nodes.extend(self.get_dependencies(obj, 'failure_nodes') +
self.get_dependencies(obj, 'always_nodes'))
else:
if self._are_relevant_parents_finished(n):
nodes_found.append(n)
@@ -127,16 +134,18 @@ class WorkflowDAG(SimpleDAG):
failed_nodes = []
for node in self.nodes:
obj = node['node_object']
if obj.unified_job_template is None:
return True, "Workflow job node {} related unified job template missing".format(obj.id)
if obj.job and obj.job.status in ['failed', 'canceled', 'error']:
failed_nodes.append(node)
elif obj.do_not_run is True and obj.unified_job_template is None:
failed_nodes.append(node)
for node in failed_nodes:
obj = node['node_object']
if (len(self.get_dependencies(obj, 'failure_nodes')) +
len(self.get_dependencies(obj, 'always_nodes'))) == 0:
return True, "Workflow job node {} has a status of '{}' without an error handler path".format(obj.id, obj.job.status)
if obj.unified_job_template is None:
return True, "Workflow job node {} related unified job template missing and is without an error handle path".format(obj.id)
else:
return True, "Workflow job node {} has a status of '{}' without an error handler path".format(obj.id, obj.job.status)
return False, None
r'''
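
has_workflow_failed returns a (failed, reason) pair, and the new branch above produces a dedicated reason when the unified job template is missing. A hedged sketch of how a caller might consume that contract; fail_workflow_if_needed, workflow_job, and dag are placeholders, not the actual task-manager code:

def fail_workflow_if_needed(workflow_job, dag):
    # Placeholder caller illustrating the (failed, reason) contract only.
    has_failed, reason = dag.has_workflow_failed()
    if has_failed:
        workflow_job.status = 'failed'
        workflow_job.job_explanation = reason  # assumes the usual UnifiedJob field
        workflow_job.save(update_fields=['status', 'job_explanation'])
    return has_failed
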
@@ -151,6 +160,8 @@ class WorkflowDAG(SimpleDAG):
'''
def _are_all_nodes_dnr_decided(self, workflow_nodes):
for n in workflow_nodes:
if n.unified_job_template is None and n.do_not_run is False:
return False
if n.do_not_run is False and not n.job:
return False
return True
@@ -178,50 +189,30 @@ class WorkflowDAG(SimpleDAG):
return False
else:
return False
elif p.unified_job_template is None:
if node in (self.get_dependencies(p, 'failure_nodes') +
self.get_dependencies(p, 'always_nodes')):
return False
else:
return False
return True
r'''
Useful in a failure scenario. Will mark all nodes that might have run a job
and haven't already run a job as do_not_run=True
Return an array of workflow nodes that were marked do_not_run = True
'''
def _mark_all_remaining_nodes_dnr(self):
objs = []
for node in self.nodes:
obj = node['node_object']
if obj.do_not_run is False and not obj.job:
obj.do_not_run = True
objs.append(obj)
return objs
def mark_dnr_nodes(self):
root_nodes = self.get_root_nodes()
nodes = copy.copy(root_nodes)
nodes_marked_do_not_run = []
node_ids_visited = set()
for index, n in enumerate(nodes):
obj = n['node_object']
if obj.id in node_ids_visited:
continue
node_ids_visited.add(obj.id)
'''
Special case. On a workflow job template relaunch it's possible for
the job template associated with the job to have been deleted. If
this is the case, fail the workflow job and mark it done.
'''
if obj.unified_job_template is None:
return self._mark_all_remaining_nodes_dnr()
if obj.do_not_run is False and not obj.job and n not in root_nodes:
if obj.do_not_run is False and not obj.job:
parent_nodes = filter(lambda n: not n.do_not_run,
[p['node_object'] for p in self.get_dependents(obj)])
if self._are_all_nodes_dnr_decided(parent_nodes):
if self._should_mark_node_dnr(n, parent_nodes):
if obj.unified_job_template is None:
obj.do_not_run = True
nodes_marked_do_not_run.append(n)
elif n not in root_nodes and self._should_mark_node_dnr(n, parent_nodes):
obj.do_not_run = True
nodes_marked_do_not_run.append(n)
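
The rewritten loop above is also why the bulk _mark_all_remaining_nodes_dnr shortcut had to go: a node whose template is missing must be marked do-not-run individually, so that its failure/always children stay eligible to run instead of the whole remainder of the workflow being written off. Reduced to the per-node decision, as a sketch that assumes the caller has already established that the relevant parents are decided (is_root and should_mark_dnr stand in for the checks in the diff):

def decide_do_not_run(node, parents, is_root, should_mark_dnr):
    """Return True if an unstarted node should be marked do_not_run."""
    if node.unified_job_template is None:
        # Template deleted: the node can never run, so mark it dnr and let
        # traversal continue into its failure/always children.
        return True
    if not is_root and should_mark_dnr(node, parents):
        return True
    return False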

@@ -11,7 +11,7 @@ class Job():
class WorkflowNode(object):
def __init__(self, id=None, job=None, do_not_run=False, unified_job_template=None):
self.id = id if id else uuid.uuid4()
self.id = id if id is not None else uuid.uuid4()
self.job = job
self.do_not_run = do_not_run
self.unified_job_template = unified_job_template
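
The one-line change above matters because the fixture below starts handing out integer ids beginning at 0, and 0 is falsy in Python, so "id if id else uuid.uuid4()" would quietly replace a legitimate id of 0 with a random UUID. A self-contained illustration (the function names are made up for the example):

import uuid

def pick_id_buggy(id=None):
    return id if id else uuid.uuid4()  # any falsy id (0, '') is discarded

def pick_id_fixed(id=None):
    return id if id is not None else uuid.uuid4()

assert isinstance(pick_id_buggy(0), uuid.UUID)  # 0 is silently replaced
assert pick_id_fixed(0) == 0                    # 0 is kept
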
@@ -19,8 +19,12 @@ class WorkflowNode(object):
@pytest.fixture
def wf_node_generator(mocker):
pytest.count = 0
def fn(**kwargs):
return WorkflowNode(**kwargs)
wfn = WorkflowNode(id=pytest.count, **kwargs)
pytest.count += 1
return wfn
return fn
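
With the fixture change above, generated nodes get small, deterministic integer ids (0, 1, 2, ... within a test) instead of random UUIDs, which is presumably easier to read when a failure-message assertion breaks; the diff itself does not state the motivation. A quick sketch of what a test sees (only wf_node_generator comes from this file; the test body is illustrative):

def test_ids_are_sequential(wf_node_generator):
    first = wf_node_generator()
    second = wf_node_generator(do_not_run=True)
    assert (first.id, second.id) == (0, 1)
    assert second.do_not_run is True
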
@@ -177,19 +181,6 @@ class TestIsWorkflowDone():
nodes[2].job = Job(status='failed')
return (g, nodes)
def test_is_workflow_done(self, workflow_dag_2):
g = workflow_dag_2[0]
assert g.is_workflow_done() is False
def test_is_workflow_done_failed(self, workflow_dag_failed):
(g, nodes) = workflow_dag_failed
assert g.is_workflow_done() is True
assert g.has_workflow_failed() == (True, "Workflow job node {} has a status of 'failed' without an error handler path".format(nodes[2].id))
class TestHasWorkflowFailed():
@pytest.fixture
def workflow_dag_canceled(self, wf_node_generator):
g = WorkflowDAG()
@@ -207,6 +198,38 @@ class TestHasWorkflowFailed():
nodes[0].job.status = 'failed'
return (g, nodes)
def test_done(self, workflow_dag_2):
g = workflow_dag_2[0]
assert g.is_workflow_done() is False
def test_workflow_done_and_failed(self, workflow_dag_failed):
(g, nodes) = workflow_dag_failed
assert g.is_workflow_done() is True
assert g.has_workflow_failed() == (True, "Workflow job node {} has a status of 'failed' without an error handler path".format(nodes[2].id))
def test_is_workflow_done_no_unified_job_tempalte_end(self, workflow_dag_failed):
(g, nodes) = workflow_dag_failed
nodes[2].unified_job_template = None
assert g.is_workflow_done() is True
assert g.has_workflow_failed() == \
(True, "Workflow job node {} related unified job template missing"
" and is without an error handle path".format(nodes[2].id))
def test_is_workflow_done_no_unified_job_tempalte_begin(self, workflow_dag_1):
(g, nodes) = workflow_dag_1
nodes[0].unified_job_template = None
g.mark_dnr_nodes()
assert g.is_workflow_done() is True
assert g.has_workflow_failed() == \
(True, "Workflow job node {} related unified job template missing"
" and is without an error handle path".format(nodes[0].id))
def test_canceled_should_fail(self, workflow_dag_canceled):
(g, nodes) = workflow_dag_canceled