Merge pull request #3848 from jangsutsr/3528_workflow_job_cancel_endpoint

Implement workflow job cancel endpoint
This commit is contained in:
Aaron Tan 2016-11-08 12:16:39 -05:00 committed by GitHub
commit 05790c5b5c
6 changed files with 43 additions and 6 deletions

View File

@ -2232,17 +2232,22 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
#res['notifications'] = reverse('api:system_job_notifications_list', args=(obj.pk,))
res['workflow_nodes'] = reverse('api:workflow_job_workflow_nodes_list', args=(obj.pk,))
res['labels'] = reverse('api:workflow_job_label_list', args=(obj.pk,))
# TODO: Cancel job
'''
if obj.can_cancel or True:
res['cancel'] = reverse('api:workflow_job_cancel', args=(obj.pk,))
'''
return res
# TODO:
class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer):
pass
class WorkflowJobCancelSerializer(WorkflowJobSerializer):
can_cancel = serializers.BooleanField(read_only=True)
class Meta:
fields = ('can_cancel',)
class WorkflowNodeBaseSerializer(BaseSerializer):
job_type = serializers.CharField(allow_blank=True, allow_null=True, required=False, default=None)
job_tags = serializers.CharField(allow_blank=True, allow_null=True, required=False, default=None)

View File

@ -273,7 +273,7 @@ workflow_job_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/$', 'workflow_job_detail'),
url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_workflow_nodes_list'),
url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_label_list'),
# url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_cancel'),
url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_cancel'),
#url(r'^(?P<pk>[0-9]+)/notifications/$', 'workflow_job_notifications_list'),
)

View File

@ -72,6 +72,7 @@ from awx.api.serializers import * # noqa
from awx.api.metadata import RoleMetadata
from awx.main.consumers import emit_channel_notification
from awx.main.models.unified_jobs import ACTIVE_STATES
from awx.main.scheduler.tasks import run_job_complete
logger = logging.getLogger('awx.api.views')
@ -2862,6 +2863,23 @@ class WorkflowJobWorkflowNodesList(SubListAPIView):
parent_key = 'workflow_job'
new_in_310 = True
class WorkflowJobCancel(RetrieveAPIView):
model = WorkflowJob
serializer_class = WorkflowJobCancelSerializer
is_job_cancel = True
new_in_310 = True
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.can_cancel:
obj.cancel()
#TODO: Figure out whether an immediate schedule is needed.
run_job_complete.delay(obj.id)
return Response(status=status.HTTP_202_ACCEPTED)
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class SystemJobTemplateList(ListAPIView):
model = SystemJobTemplate

View File

@ -423,4 +423,3 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow
'''
def start_celery_task(self, opts, error_callback, success_callback):
return None

View File

@ -130,7 +130,12 @@ class TaskManager():
def process_finished_workflow_jobs(self, workflow_jobs):
for workflow_job in workflow_jobs:
dag = WorkflowDAG(workflow_job)
if dag.is_workflow_done():
if workflow_job.cancel_flag:
workflow_job.status = 'canceled'
workflow_job.save()
dag.cancel_node_jobs()
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
elif dag.is_workflow_done():
if workflow_job._has_failed():
workflow_job.status = 'failed'
else:

View File

@ -48,6 +48,16 @@ class WorkflowDAG(SimpleDAG):
nodes.extend(children_all)
return [n['node_object'] for n in nodes_found]
def cancel_node_jobs(self):
for n in self.nodes:
obj = n['node_object']
job = obj.job
if not job:
continue
elif job.can_cancel:
job.cancel()
def is_workflow_done(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes