Implement recursive spawned job cancel.

This commit is contained in:
Aaron Tan 2016-11-02 15:23:13 -04:00
parent 1a960229d7
commit 3778914aa9
4 changed files with 38 additions and 1 deletions

View File

@ -2869,6 +2869,14 @@ class WorkflowJobCancel(RetrieveAPIView):
is_job_cancel = True
new_in_310 = True
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class SystemJobTemplateList(ListAPIView):
model = SystemJobTemplate

View File

@ -424,3 +424,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow
def start_celery_task(self, opts, error_callback, success_callback):
return None
def cancel(self):
if self.can_cancel:
self.status = 'canceled'
self.save(update_fields=['status'])

View File

@ -130,7 +130,9 @@ class TaskManager():
def process_finished_workflow_jobs(self, workflow_jobs):
for workflow_job in workflow_jobs:
dag = WorkflowDAG(workflow_job)
if dag.is_workflow_done():
if workflow_job.status == 'canceled':
dag.bfs_nodes_to_cancel()
elif dag.is_workflow_done():
if workflow_job._has_failed():
workflow_job.status = 'failed'
else:

View File

@ -48,6 +48,29 @@ class WorkflowDAG(SimpleDAG):
nodes.extend(children_all)
return [n['node_object'] for n in nodes_found]
def bfs_nodes_to_cancel(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if not job:
continue
elif job.can_cancel:
job.cancel()
elif job.status == 'failed':
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status == 'successful':
children_success = self.get_dependencies(obj, 'success_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_success + children_always
nodes.extend(children_all)
def is_workflow_done(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes