mirror of
https://github.com/ansible/awx.git
synced 2026-02-26 15:36:04 -03:30
Workflow notification backend.
This commit is contained in:
@@ -28,6 +28,7 @@ from awx.main.scheduler.partial import (
|
|||||||
AdHocCommandDict,
|
AdHocCommandDict,
|
||||||
WorkflowJobDict,
|
WorkflowJobDict,
|
||||||
)
|
)
|
||||||
|
from awx.main.tasks import _send_notification_templates
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
from celery.task.control import inspect
|
from celery.task.control import inspect
|
||||||
@@ -128,6 +129,7 @@ class TaskManager():
|
|||||||
|
|
||||||
# See comment in tasks.py::RunWorkflowJob::run()
|
# See comment in tasks.py::RunWorkflowJob::run()
|
||||||
def process_finished_workflow_jobs(self, workflow_jobs):
|
def process_finished_workflow_jobs(self, workflow_jobs):
|
||||||
|
result = []
|
||||||
for workflow_job in workflow_jobs:
|
for workflow_job in workflow_jobs:
|
||||||
dag = WorkflowDAG(workflow_job)
|
dag = WorkflowDAG(workflow_job)
|
||||||
if workflow_job.cancel_flag:
|
if workflow_job.cancel_flag:
|
||||||
@@ -136,12 +138,14 @@ class TaskManager():
|
|||||||
dag.cancel_node_jobs()
|
dag.cancel_node_jobs()
|
||||||
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
||||||
elif dag.is_workflow_done():
|
elif dag.is_workflow_done():
|
||||||
|
result.append(workflow_job.id)
|
||||||
if workflow_job._has_failed():
|
if workflow_job._has_failed():
|
||||||
workflow_job.status = 'failed'
|
workflow_job.status = 'failed'
|
||||||
else:
|
else:
|
||||||
workflow_job.status = 'successful'
|
workflow_job.status = 'successful'
|
||||||
workflow_job.save()
|
workflow_job.save()
|
||||||
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
|
||||||
|
return result
|
||||||
|
|
||||||
def get_active_tasks(self):
|
def get_active_tasks(self):
|
||||||
inspector = inspect()
|
inspector = inspect()
|
||||||
@@ -332,6 +336,7 @@ class TaskManager():
|
|||||||
self.process_pending_tasks(pending_tasks)
|
self.process_pending_tasks(pending_tasks)
|
||||||
|
|
||||||
def _schedule(self):
|
def _schedule(self):
|
||||||
|
finished_wfjs = []
|
||||||
all_sorted_tasks = self.get_tasks()
|
all_sorted_tasks = self.get_tasks()
|
||||||
if len(all_sorted_tasks) > 0:
|
if len(all_sorted_tasks) > 0:
|
||||||
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
|
||||||
@@ -344,11 +349,12 @@ class TaskManager():
|
|||||||
self.process_inventory_sources(inventory_id_sources)
|
self.process_inventory_sources(inventory_id_sources)
|
||||||
|
|
||||||
running_workflow_tasks = self.get_running_workflow_jobs()
|
running_workflow_tasks = self.get_running_workflow_jobs()
|
||||||
self.process_finished_workflow_jobs(running_workflow_tasks)
|
finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
|
||||||
|
|
||||||
self.spawn_workflow_graph_jobs(running_workflow_tasks)
|
self.spawn_workflow_graph_jobs(running_workflow_tasks)
|
||||||
|
|
||||||
self.process_tasks(all_sorted_tasks)
|
self.process_tasks(all_sorted_tasks)
|
||||||
|
return finished_wfjs
|
||||||
|
|
||||||
def schedule(self):
|
def schedule(self):
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
@@ -358,6 +364,9 @@ class TaskManager():
|
|||||||
except DatabaseError:
|
except DatabaseError:
|
||||||
return
|
return
|
||||||
|
|
||||||
self._schedule()
|
finished_wfjs = self._schedule()
|
||||||
|
|
||||||
|
# Operations whose queries rely on modifications made during the atomic scheduling session
|
||||||
|
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
|
||||||
|
_send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user