detect workflow nodes without job templates

* Fail the workflow job run when encountering a Workflow Job Node with
no related job template.
This commit is contained in:
chris meyers 2018-11-16 13:37:11 -05:00 committed by mabashian
parent 72263c5c7b
commit 00d71cea50
3 changed files with 48 additions and 41 deletions

View File

@ -127,6 +127,9 @@ class WorkflowDAG(SimpleDAG):
failed_nodes = []
for node in self.nodes:
obj = node['node_object']
if obj.unified_job_template is None:
return True
if obj.job and obj.job.status in ['failed', 'canceled', 'error']:
failed_nodes.append(node)
for node in failed_nodes:
@ -179,6 +182,21 @@ class WorkflowDAG(SimpleDAG):
return False
return True
r'''
Useful in a failure scenario. Marks every node that could still have run a
job, but has not, as do_not_run=True.
Returns the list of workflow node objects that were newly marked.
'''
def _mark_all_remaining_nodes_dnr(self):
    marked = []
    for entry in self.nodes:
        workflow_node = entry['node_object']
        # Skip nodes that already ran a job or are already marked DNR.
        if workflow_node.job or workflow_node.do_not_run is not False:
            continue
        workflow_node.do_not_run = True
        marked.append(workflow_node)
    return marked
def mark_dnr_nodes(self):
root_nodes = self.get_root_nodes()
nodes = copy.copy(root_nodes)
@ -191,6 +209,14 @@ class WorkflowDAG(SimpleDAG):
continue
node_ids_visited.add(obj.id)
'''
Special case. On a workflow job template relaunch it's possible for
the job template associated with the job to have been deleted. If
this is the case, fail the workflow job and mark it done.
'''
if obj.unified_job_template is None:
return self._mark_all_remaining_nodes_dnr()
if obj.do_not_run is False and not obj.job and n not in root_nodes:
parent_nodes = filter(lambda n: not n.do_not_run,
[p['node_object'] for p in self.get_dependents(obj)])

View File

@ -64,7 +64,9 @@ class TestWorkflowDAGFunctional(TransactionTestCase):
def test_workflow_done(self):
    """Workflow with a failed root is done but not failed once the
    unreachable downstream nodes are marked do-not-run."""
    wfj = self.workflow_job(states=['failed', None, None, 'successful', None])
    dag = WorkflowDAG(workflow_job=wfj)
    # The three nodes hanging off the failed node should be marked DNR.
    assert 3 == len(dag.mark_dnr_nodes())
    is_done = dag.is_workflow_done()
    has_failed = dag.has_workflow_failed()
    self.assertTrue(is_done)
    self.assertFalse(has_failed)
@ -73,28 +75,36 @@ class TestWorkflowDAGFunctional(TransactionTestCase):
jt.delete()
relaunched = wfj.create_relaunch_workflow_job()
dag = WorkflowDAG(workflow_job=relaunched)
is_done, has_failed = dag.is_workflow_done()
dag.mark_dnr_nodes()
is_done = dag.is_workflow_done()
has_failed = dag.has_workflow_failed()
self.assertTrue(is_done)
self.assertTrue(has_failed)
def test_workflow_fails_for_no_error_handler(self):
    """A failed node with no error-handling path fails the workflow."""
    wfj = self.workflow_job(states=['successful', 'failed', None, None, None])
    dag = WorkflowDAG(workflow_job=wfj)
    dag.mark_dnr_nodes()
    is_done = dag.is_workflow_done()
    has_failed = dag.has_workflow_failed()
    self.assertTrue(is_done)
    self.assertTrue(has_failed)
def test_workflow_fails_leaf(self):
    """A failed leaf node fails the whole workflow."""
    wfj = self.workflow_job(states=['successful', 'successful', 'failed', None, None])
    dag = WorkflowDAG(workflow_job=wfj)
    dag.mark_dnr_nodes()
    is_done = dag.is_workflow_done()
    has_failed = dag.has_workflow_failed()
    self.assertTrue(is_done)
    self.assertTrue(has_failed)
def test_workflow_not_finished(self):
    """A workflow with a node still in 'new' state is neither done nor failed."""
    wfj = self.workflow_job(states=['new', None, None, None, None])
    dag = WorkflowDAG(workflow_job=wfj)
    dag.mark_dnr_nodes()
    is_done = dag.is_workflow_done()
    has_failed = dag.has_workflow_failed()
    self.assertFalse(is_done)
    self.assertFalse(has_failed)

View File

@ -9,33 +9,18 @@ class Job():
self.status = status
class WorkflowNode(object):
    """Test double for a workflow job template node.

    Flattens the former WorkflowNodeBase / WorkflowNodeDNR / WorkflowNodeUJT
    hierarchy (diff residue left two conflicting class headers here) into one
    class carrying all four attributes the DAG code reads.
    """
    def __init__(self, id=None, job=None, do_not_run=False, unified_job_template=None):
        # Generate a unique id when the caller does not supply one.
        self.id = id if id else uuid.uuid4()
        self.job = job
        self.do_not_run = do_not_run
        self.unified_job_template = unified_job_template
@pytest.fixture
def WorkflowNodeClass():
return WorkflowNodeBase
@pytest.fixture
def wf_node_generator(mocker):
    """Return a factory that builds a WorkflowNode from keyword args.

    Diff residue left two conflicting def/return lines here; keep the
    current signature (no WorkflowNodeClass fixture parameter).
    """
    def fn(**kwargs):
        return WorkflowNode(**kwargs)
    return fn
@ -94,12 +79,10 @@ class TestWorkflowDAG():
class TestDNR():
@pytest.fixture
def WorkflowNodeClass(self):
return WorkflowNodeDNR
def test_mark_dnr_nodes(self, workflow_dag_1):
(g, nodes) = workflow_dag_1
for n in nodes:
n.unified_job_template = object()
r'''
S0
@ -142,10 +125,6 @@ class TestDNR():
class TestIsWorkflowDone():
@pytest.fixture
def WorkflowNodeClass(self):
return WorkflowNodeUJT
@pytest.fixture
def workflow_dag_2(self, workflow_dag_1):
(g, nodes) = workflow_dag_1
@ -211,10 +190,6 @@ class TestIsWorkflowDone():
class TestHasWorkflowFailed():
@pytest.fixture
def WorkflowNodeClass(self):
return WorkflowNodeBase
@pytest.fixture
def workflow_dag_canceled(self, wf_node_generator):
g = WorkflowDAG()
@ -244,14 +219,10 @@ class TestHasWorkflowFailed():
class TestBFSNodesToRun():
@pytest.fixture
def WorkflowNodeClass(self):
return WorkflowNodeDNR
@pytest.fixture
def workflow_dag_canceled(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(4)]
nodes = [wf_node_generator(unified_job_template=object()) for i in range(4)]
map(lambda n: g.add_node(n), nodes)
r'''
C0