From 1a960229d7b9440f74c105d82a4f0c0438594466 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Tue, 1 Nov 2016 16:35:04 -0400 Subject: [PATCH 1/5] Basic architecture added. --- awx/api/serializers.py | 11 ++++++++--- awx/api/urls.py | 2 +- awx/api/views.py | 7 +++++++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 4ae7caf54a..08073a7bc0 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2232,17 +2232,22 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): #res['notifications'] = reverse('api:system_job_notifications_list', args=(obj.pk,)) res['workflow_nodes'] = reverse('api:workflow_job_workflow_nodes_list', args=(obj.pk,)) res['labels'] = reverse('api:workflow_job_label_list', args=(obj.pk,)) - # TODO: Cancel job - ''' if obj.can_cancel or True: res['cancel'] = reverse('api:workflow_job_cancel', args=(obj.pk,)) - ''' return res # TODO: class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer): pass +class WorkflowJobCancelSerializer(WorkflowJobSerializer): + + can_cancel = serializers.BooleanField(read_only=True) + + class Meta: + fields = ('can_cancel',) + + class WorkflowNodeBaseSerializer(BaseSerializer): job_type = serializers.CharField(allow_blank=True, allow_null=True, required=False, default=None) job_tags = serializers.CharField(allow_blank=True, allow_null=True, required=False, default=None) diff --git a/awx/api/urls.py b/awx/api/urls.py index f760506309..bdb9063c39 100644 --- a/awx/api/urls.py +++ b/awx/api/urls.py @@ -273,7 +273,7 @@ workflow_job_urls = patterns('awx.api.views', url(r'^(?P[0-9]+)/$', 'workflow_job_detail'), url(r'^(?P[0-9]+)/workflow_nodes/$', 'workflow_job_workflow_nodes_list'), url(r'^(?P[0-9]+)/labels/$', 'workflow_job_label_list'), -# url(r'^(?P[0-9]+)/cancel/$', 'workflow_job_cancel'), + url(r'^(?P[0-9]+)/cancel/$', 'workflow_job_cancel'), #url(r'^(?P[0-9]+)/notifications/$', 'workflow_job_notifications_list'), ) diff --git a/awx/api/views.py b/awx/api/views.py index d11e394793..ac0d49136d 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2862,6 +2862,13 @@ class WorkflowJobWorkflowNodesList(SubListAPIView): parent_key = 'workflow_job' new_in_310 = True +class WorkflowJobCancel(RetrieveAPIView): + + model = WorkflowJob + serializer_class = WorkflowJobCancelSerializer + is_job_cancel = True + new_in_310 = True + class SystemJobTemplateList(ListAPIView): model = SystemJobTemplate From 3778914aa9a570520ab56c7eac3d53efe7b25528 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Wed, 2 Nov 2016 15:23:13 -0400 Subject: [PATCH 2/5] Implement recursive spawned job cancel. --- awx/api/views.py | 8 ++++++++ awx/main/models/workflow.py | 4 ++++ awx/main/scheduler/__init__.py | 4 +++- awx/main/scheduler/dag_workflow.py | 23 +++++++++++++++++++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/awx/api/views.py b/awx/api/views.py index ac0d49136d..f1047b6bfd 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2869,6 +2869,14 @@ class WorkflowJobCancel(RetrieveAPIView): is_job_cancel = True new_in_310 = True + def post(self, request, *args, **kwargs): + obj = self.get_object() + if obj.can_cancel: + obj.cancel() + return Response(status=status.HTTP_202_ACCEPTED) + else: + return self.http_method_not_allowed(request, *args, **kwargs) + class SystemJobTemplateList(ListAPIView): model = SystemJobTemplate diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 158efabeb4..892209ea50 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -424,3 +424,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow def start_celery_task(self, opts, error_callback, success_callback): return None + def cancel(self): + if self.can_cancel: + self.status = 'canceled' + self.save(update_fields=['status']) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 7c7e9e5ab3..aa0e223420 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -130,7 +130,9 @@ class TaskManager(): def process_finished_workflow_jobs(self, workflow_jobs): for workflow_job in workflow_jobs: dag = WorkflowDAG(workflow_job) - if dag.is_workflow_done(): + if workflow_job.status == 'canceled': + dag.bfs_nodes_to_cancel() + elif dag.is_workflow_done(): if workflow_job._has_failed(): workflow_job.status = 'failed' else: diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 7be2c70489..742c337dc0 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -48,6 +48,29 @@ class WorkflowDAG(SimpleDAG): nodes.extend(children_all) return [n['node_object'] for n in nodes_found] + def bfs_nodes_to_cancel(self): + root_nodes = self.get_root_nodes() + nodes = root_nodes + + for index, n in enumerate(nodes): + obj = n['node_object'] + job = obj.job + + if not job: + continue + elif job.can_cancel: + job.cancel() + elif job.status == 'failed': + children_failed = self.get_dependencies(obj, 'failure_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_failed + children_always + nodes.extend(children_all) + elif job.status == 'successful': + children_success = self.get_dependencies(obj, 'success_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_success + children_always + nodes.extend(children_all) + def is_workflow_done(self): root_nodes = self.get_root_nodes() nodes = root_nodes From 3c0d60075c10049b03354fdcdf121a258298fe2c Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Wed, 2 Nov 2016 17:36:23 -0400 Subject: [PATCH 3/5] Fix bug by using cancel_flag. --- awx/api/views.py | 3 +++ awx/main/models/workflow.py | 5 ----- awx/main/scheduler/__init__.py | 5 ++++- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/awx/api/views.py b/awx/api/views.py index f1047b6bfd..b85d8ab903 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -72,6 +72,7 @@ from awx.api.serializers import * # noqa from awx.api.metadata import RoleMetadata from awx.main.consumers import emit_channel_notification from awx.main.models.unified_jobs import ACTIVE_STATES +#from awx.main.scheduler.tasks import run_job_complete logger = logging.getLogger('awx.api.views') @@ -2873,6 +2874,8 @@ class WorkflowJobCancel(RetrieveAPIView): obj = self.get_object() if obj.can_cancel: obj.cancel() + #TODO: Figure out whether an immediate schedule is needed. + #run_job_complete.delay(obj.id) return Response(status=status.HTTP_202_ACCEPTED) else: return self.http_method_not_allowed(request, *args, **kwargs) diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 892209ea50..ff457fd207 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -423,8 +423,3 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow ''' def start_celery_task(self, opts, error_callback, success_callback): return None - - def cancel(self): - if self.can_cancel: - self.status = 'canceled' - self.save(update_fields=['status']) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index aa0e223420..6237c6eeb7 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -130,8 +130,11 @@ class TaskManager(): def process_finished_workflow_jobs(self, workflow_jobs): for workflow_job in workflow_jobs: dag = WorkflowDAG(workflow_job) - if workflow_job.status == 'canceled': + if workflow_job.cancel_flag: + workflow_job.status = 'canceled' + workflow_job.save() dag.bfs_nodes_to_cancel() + connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) elif dag.is_workflow_done(): if workflow_job._has_failed(): workflow_job.status = 'failed' From 9e244b640b43dcc87ccef9e0d60356ae2bcd56c2 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Thu, 3 Nov 2016 18:31:36 -0400 Subject: [PATCH 4/5] Simplify internal node job cancel mechanism. --- awx/main/scheduler/__init__.py | 2 +- awx/main/scheduler/dag_workflow.py | 17 ++--------------- 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 6237c6eeb7..24b4a07edb 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -133,7 +133,7 @@ class TaskManager(): if workflow_job.cancel_flag: workflow_job.status = 'canceled' workflow_job.save() - dag.bfs_nodes_to_cancel() + dag.cancel_node_jobs() connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) elif dag.is_workflow_done(): if workflow_job._has_failed(): diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 742c337dc0..1995434f48 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -48,11 +48,8 @@ class WorkflowDAG(SimpleDAG): nodes.extend(children_all) return [n['node_object'] for n in nodes_found] - def bfs_nodes_to_cancel(self): - root_nodes = self.get_root_nodes() - nodes = root_nodes - - for index, n in enumerate(nodes): + def cancel_node_jobs(self): + for n in self.nodes: obj = n['node_object'] job = obj.job @@ -60,16 +57,6 @@ class WorkflowDAG(SimpleDAG): continue elif job.can_cancel: job.cancel() - elif job.status == 'failed': - children_failed = self.get_dependencies(obj, 'failure_nodes') - children_always = self.get_dependencies(obj, 'always_nodes') - children_all = children_failed + children_always - nodes.extend(children_all) - elif job.status == 'successful': - children_success = self.get_dependencies(obj, 'success_nodes') - children_always = self.get_dependencies(obj, 'always_nodes') - children_all = children_success + children_always - nodes.extend(children_all) def is_workflow_done(self): root_nodes = self.get_root_nodes() From a8a3835308623b450ea122a52f00a7d2e643e26f Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Tue, 8 Nov 2016 11:54:06 -0500 Subject: [PATCH 5/5] Apply immediate reschedule. --- awx/api/views.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awx/api/views.py b/awx/api/views.py index b85d8ab903..cc0cb8e100 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -72,7 +72,7 @@ from awx.api.serializers import * # noqa from awx.api.metadata import RoleMetadata from awx.main.consumers import emit_channel_notification from awx.main.models.unified_jobs import ACTIVE_STATES -#from awx.main.scheduler.tasks import run_job_complete +from awx.main.scheduler.tasks import run_job_complete logger = logging.getLogger('awx.api.views') @@ -2875,7 +2875,7 @@ class WorkflowJobCancel(RetrieveAPIView): if obj.can_cancel: obj.cancel() #TODO: Figure out whether an immediate schedule is needed. - #run_job_complete.delay(obj.id) + run_job_complete.delay(obj.id) return Response(status=status.HTTP_202_ACCEPTED) else: return self.http_method_not_allowed(request, *args, **kwargs)