Simplify internal node job cancel mechanism.

This commit is contained in:
Aaron Tan
2016-11-03 18:31:36 -04:00
parent 3c0d60075c
commit 9e244b640b
2 changed files with 3 additions and 16 deletions

View File

@@ -133,7 +133,7 @@ class TaskManager():
if workflow_job.cancel_flag: if workflow_job.cancel_flag:
workflow_job.status = 'canceled' workflow_job.status = 'canceled'
workflow_job.save() workflow_job.save()
dag.bfs_nodes_to_cancel() dag.cancel_node_jobs()
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
elif dag.is_workflow_done(): elif dag.is_workflow_done():
if workflow_job._has_failed(): if workflow_job._has_failed():

View File

@@ -48,11 +48,8 @@ class WorkflowDAG(SimpleDAG):
nodes.extend(children_all) nodes.extend(children_all)
return [n['node_object'] for n in nodes_found] return [n['node_object'] for n in nodes_found]
def bfs_nodes_to_cancel(self): def cancel_node_jobs(self):
root_nodes = self.get_root_nodes() for n in self.nodes:
nodes = root_nodes
for index, n in enumerate(nodes):
obj = n['node_object'] obj = n['node_object']
job = obj.job job = obj.job
@@ -60,16 +57,6 @@ class WorkflowDAG(SimpleDAG):
continue continue
elif job.can_cancel: elif job.can_cancel:
job.cancel() job.cancel()
elif job.status == 'failed':
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status == 'successful':
children_success = self.get_dependencies(obj, 'success_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_success + children_always
nodes.extend(children_all)
def is_workflow_done(self): def is_workflow_done(self):
root_nodes = self.get_root_nodes() root_nodes = self.get_root_nodes()