treat canceled nodes as failed when processing wf

* When deciding what jobs to run next, treat canceled as failed.
* Also add tests.
This commit is contained in:
chris meyers 2018-11-15 14:07:11 -05:00 committed by mabashian
parent d6a8ad0b33
commit c1171fe4ff
2 changed files with 62 additions and 1 deletions

View File

@ -71,7 +71,7 @@ class WorkflowDAG(SimpleDAG):
return False
# Node decidedly got a job; check if job is done
if p.job and p.job.status not in ['successful', 'failed', 'error']:
if p.job and p.job.status not in ['successful', 'failed', 'error', 'canceled']:
return False
return True

View File

@ -208,3 +208,64 @@ class TestIsWorkflowDone():
assert g.is_workflow_done() is True
assert g.has_workflow_failed() is True
class TestHasWorkflowFailed():
@pytest.fixture
def WorkflowNodeClass(self):
return WorkflowNodeBase
@pytest.fixture
def workflow_dag_canceled(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(1)]
map(lambda n: g.add_node(n), nodes)
r'''
F0
'''
nodes[0].job = Job(status='canceled')
return (g, nodes)
@pytest.fixture
def workflow_dag_failure(self, workflow_dag_canceled):
(g, nodes) = workflow_dag_canceled
nodes[0].job.status = 'failed'
return (g, nodes)
def test_canceled_should_fail(self, workflow_dag_canceled):
(g, nodes) = workflow_dag_canceled
assert g.has_workflow_failed() is True
def test_failure_should_fail(self, workflow_dag_failure):
(g, nodes) = workflow_dag_failure
assert g.has_workflow_failed() is True
class TestBFSNodesToRun():
@pytest.fixture
def WorkflowNodeClass(self):
return WorkflowNodeDNR
@pytest.fixture
def workflow_dag_canceled(self, wf_node_generator):
g = WorkflowDAG()
nodes = [wf_node_generator() for i in range(4)]
map(lambda n: g.add_node(n), nodes)
r'''
C0
/ | \
F / A| \ S
/ | \
1 2 3
'''
g.add_edge(nodes[0], nodes[1], "failure_nodes")
g.add_edge(nodes[0], nodes[2], "always_nodes")
g.add_edge(nodes[0], nodes[3], "success_nodes")
nodes[0].job = Job(status='canceled')
return (g, nodes)
def test_cancel_still_runs_children(self, workflow_dag_canceled):
(g, nodes) = workflow_dag_canceled
assert set([nodes[1], nodes[2]]) == set(g.bfs_nodes_to_run())