From cdbc5127a6e2e2ca35e39b4336ed50357fdb315e Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Tue, 8 Nov 2016 11:12:49 -0500 Subject: [PATCH] Workflow notification backend. --- awx/main/scheduler/__init__.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 8f6e1b9617..060c2d0bce 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -28,6 +28,7 @@ from awx.main.scheduler.partial import ( AdHocCommandDict, WorkflowJobDict, ) +from awx.main.tasks import _send_notification_templates # Celery from celery.task.control import inspect @@ -128,6 +129,7 @@ class TaskManager(): # See comment in tasks.py::RunWorkflowJob::run() def process_finished_workflow_jobs(self, workflow_jobs): + result = [] for workflow_job in workflow_jobs: dag = WorkflowDAG(workflow_job) if workflow_job.cancel_flag: @@ -136,12 +138,14 @@ class TaskManager(): dag.cancel_node_jobs() connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) elif dag.is_workflow_done(): + result.append(workflow_job.id) if workflow_job._has_failed(): workflow_job.status = 'failed' else: workflow_job.status = 'successful' workflow_job.save() connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) + return result def get_active_tasks(self): inspector = inspect() @@ -332,6 +336,7 @@ class TaskManager(): self.process_pending_tasks(pending_tasks) def _schedule(self): + finished_wfjs = [] all_sorted_tasks = self.get_tasks() if len(all_sorted_tasks) > 0: latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks) @@ -344,11 +349,12 @@ class TaskManager(): self.process_inventory_sources(inventory_id_sources) running_workflow_tasks = self.get_running_workflow_jobs() - self.process_finished_workflow_jobs(running_workflow_tasks) + finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks) self.spawn_workflow_graph_jobs(running_workflow_tasks) self.process_tasks(all_sorted_tasks) + return finished_wfjs def schedule(self): with transaction.atomic(): @@ -358,6 +364,9 @@ class TaskManager(): except DatabaseError: return - self._schedule() + finished_wfjs = self._schedule() + # Operations whose queries rely on modifications made during the atomic scheduling session + for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs): + _send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')