Merge pull request #3915 from jangsutsr/3516_implement_workflow_notifications

Implement workflow notifications
This commit is contained in:
Aaron Tan 2016-11-10 11:05:02 -05:00 committed by GitHub
commit ece29737ac
5 changed files with 80 additions and 14 deletions

View File

@ -2201,10 +2201,9 @@ class WorkflowJobTemplateSerializer(LabelsListMixin, UnifiedJobTemplateSerialize
launch = reverse('api:workflow_job_template_launch', args=(obj.pk,)),
workflow_nodes = reverse('api:workflow_job_template_workflow_nodes_list', args=(obj.pk,)),
labels = reverse('api:workflow_job_template_label_list', args=(obj.pk,)),
# TODO: Implement notifications
#notification_templates_any = reverse('api:system_job_template_notification_templates_any_list', args=(obj.pk,)),
#notification_templates_success = reverse('api:system_job_template_notification_templates_success_list', args=(obj.pk,)),
#notification_templates_error = reverse('api:system_job_template_notification_templates_error_list', args=(obj.pk,)),
notification_templates_any = reverse('api:workflow_job_template_notification_templates_any_list', args=(obj.pk,)),
notification_templates_success = reverse('api:workflow_job_template_notification_templates_success_list', args=(obj.pk,)),
notification_templates_error = reverse('api:workflow_job_template_notification_templates_error_list', args=(obj.pk,)),
survey_spec = reverse('api:workflow_job_template_survey_spec', args=(obj.pk,)),
))
return res
@ -2228,8 +2227,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
if obj.workflow_job_template:
res['workflow_job_template'] = reverse('api:workflow_job_template_detail',
args=(obj.workflow_job_template.pk,))
# TODO:
#res['notifications'] = reverse('api:system_job_notifications_list', args=(obj.pk,))
res['notifications'] = reverse('api:workflow_job_notifications_list', args=(obj.pk,))
res['workflow_nodes'] = reverse('api:workflow_job_workflow_nodes_list', args=(obj.pk,))
res['labels'] = reverse('api:workflow_job_label_list', args=(obj.pk,))
if obj.can_cancel or True:

View File

@ -265,6 +265,9 @@ workflow_job_template_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/schedules/$', 'workflow_job_template_schedules_list'),
url(r'^(?P<pk>[0-9]+)/survey_spec/$', 'workflow_job_template_survey_spec'),
url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_template_workflow_nodes_list'),
url(r'^(?P<pk>[0-9]+)/notification_templates_any/$', 'workflow_job_template_notification_templates_any_list'),
url(r'^(?P<pk>[0-9]+)/notification_templates_error/$', 'workflow_job_template_notification_templates_error_list'),
url(r'^(?P<pk>[0-9]+)/notification_templates_success/$', 'workflow_job_template_notification_templates_success_list'),
url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_template_label_list'),
# url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_template_cancel'),
)
@ -275,7 +278,7 @@ workflow_job_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/workflow_nodes/$', 'workflow_job_workflow_nodes_list'),
url(r'^(?P<pk>[0-9]+)/labels/$', 'workflow_job_label_list'),
url(r'^(?P<pk>[0-9]+)/cancel/$', 'workflow_job_cancel'),
#url(r'^(?P<pk>[0-9]+)/notifications/$', 'workflow_job_notifications_list'),
url(r'^(?P<pk>[0-9]+)/notifications/$', 'workflow_job_notifications_list'),
)

View File

@ -2857,6 +2857,29 @@ class WorkflowJobTemplateSchedulesList(SubListCreateAttachDetachAPIView):
parent_model = WorkflowJobTemplate
relationship = 'schedules'
parent_key = 'unified_job_template'
class WorkflowJobTemplateNotificationTemplatesAnyList(SubListCreateAttachDetachAPIView):
model = NotificationTemplate
serializer_class = NotificationTemplateSerializer
parent_model = WorkflowJobTemplate
relationship = 'notification_templates_any'
new_in_310 = True
class WorkflowJobTemplateNotificationTemplatesErrorList(SubListCreateAttachDetachAPIView):
model = NotificationTemplate
serializer_class = NotificationTemplateSerializer
parent_model = WorkflowJobTemplate
relationship = 'notification_templates_error'
new_in_310 = True
class WorkflowJobTemplateNotificationTemplatesSuccessList(SubListCreateAttachDetachAPIView):
model = NotificationTemplate
serializer_class = NotificationTemplateSerializer
parent_model = WorkflowJobTemplate
relationship = 'notification_templates_success'
new_in_310 = True
# TODO:
@ -2900,6 +2923,14 @@ class WorkflowJobCancel(RetrieveAPIView):
else:
return self.http_method_not_allowed(request, *args, **kwargs)
class WorkflowJobNotificationsList(SubListAPIView):
model = Notification
serializer_class = NotificationSerializer
parent_model = WorkflowJob
relationship = 'notifications'
new_in_310 = True
class SystemJobTemplateList(ListAPIView):
model = SystemJobTemplate

View File

@ -13,7 +13,10 @@ from jsonfield import JSONField
# AWX
from awx.main.models import UnifiedJobTemplate, UnifiedJob
from awx.main.models.notifications import JobNotificationMixin
from awx.main.models.notifications import (
NotificationTemplate,
JobNotificationMixin
)
from awx.main.models.base import BaseModel, CreatedModifiedModel, VarsDictProperty
from awx.main.models.rbac import (
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
@ -306,7 +309,18 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl
# TODO: don't allow running of job template if same workflow template running
return False
# TODO: Notifications
@property
def notification_templates(self):
base_notification_templates = NotificationTemplate.objects.all()
error_notification_templates = list(base_notification_templates
.filter(unifiedjobtemplate_notification_templates_for_errors__in=[self]))
success_notification_templates = list(base_notification_templates
.filter(unifiedjobtemplate_notification_templates_for_success__in=[self]))
any_notification_templates = list(base_notification_templates
.filter(unifiedjobtemplate_notification_templates_for_any__in=[self]))
return dict(error=list(error_notification_templates),
success=list(success_notification_templates),
any=list(any_notification_templates))
# TODO: Surveys
#def create_job(self, **kwargs):
@ -429,6 +443,19 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
def get_absolute_url(self):
return reverse('api:workflow_job_detail', args=(self.pk,))
def notification_data(self):
result = super(WorkflowJob, self).notification_data()
str_arr = ['Workflow job summary:', '']
for node in self.workflow_job_nodes.all().select_related('job'):
if node.job is None:
node_job_description = 'no job.'
else:
node_job_description = ('job #{0}, "{1}", which finished with status {2}.'
.format(node.job.id, node.job.name, node.job.status))
str_arr.append("- node #{0} spawns {1}".format(node.id, node_job_description))
result['body'] = '\n'.join(str_arr)
return result
# TODO: Ask UI if this is needed ?
#def get_ui_url(self):
# return urlparse.urljoin(tower_settings.TOWER_URL_BASE, "/#/workflow_jobs/{}".format(self.pk))
@ -437,11 +464,9 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
def task_impact(self):
return 0
# TODO: workflow job notifications
def get_notification_templates(self):
return []
return self.workflow_job_template.notification_templates
# TODO: workflow job notifications
def get_notification_friendly_name(self):
return "Workflow Job"

View File

@ -28,6 +28,7 @@ from awx.main.scheduler.partial import (
AdHocCommandDict,
WorkflowJobDict,
)
from awx.main.tasks import _send_notification_templates
# Celery
from celery.task.control import inspect
@ -128,6 +129,7 @@ class TaskManager():
# See comment in tasks.py::RunWorkflowJob::run()
def process_finished_workflow_jobs(self, workflow_jobs):
result = []
for workflow_job in workflow_jobs:
dag = WorkflowDAG(workflow_job)
if workflow_job.cancel_flag:
@ -136,12 +138,14 @@ class TaskManager():
dag.cancel_node_jobs()
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
elif dag.is_workflow_done():
result.append(workflow_job.id)
if workflow_job._has_failed():
workflow_job.status = 'failed'
else:
workflow_job.status = 'successful'
workflow_job.save()
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
return result
def get_active_tasks(self):
inspector = inspect()
@ -332,6 +336,7 @@ class TaskManager():
self.process_pending_tasks(pending_tasks)
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()
if len(all_sorted_tasks) > 0:
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
@ -344,11 +349,12 @@ class TaskManager():
self.process_inventory_sources(inventory_id_sources)
running_workflow_tasks = self.get_running_workflow_jobs()
self.process_finished_workflow_jobs(running_workflow_tasks)
finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
self.spawn_workflow_graph_jobs(running_workflow_tasks)
self.process_tasks(all_sorted_tasks)
return finished_wfjs
def schedule(self):
with transaction.atomic():
@ -358,6 +364,9 @@ class TaskManager():
except DatabaseError:
return
self._schedule()
finished_wfjs = self._schedule()
# Operations whose queries rely on modifications made during the atomic scheduling session
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
_send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')