mirror of
https://github.com/ansible/awx.git
synced 2026-03-20 18:37:39 -02:30
Merge pull request #1652 from ryanpetrello/fix-500
send job notification templates _after_ all events have been processed
This commit is contained in:
@@ -161,14 +161,35 @@ class CallbackBrokerWorker(ConsumerMixin):
|
|||||||
break
|
break
|
||||||
|
|
||||||
if body.get('event') == 'EOF':
|
if body.get('event') == 'EOF':
|
||||||
# EOF events are sent when stdout for the running task is
|
try:
|
||||||
# closed. don't actually persist them to the database; we
|
logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
|
||||||
# just use them to report `summary` websocket events as an
|
# EOF events are sent when stdout for the running task is
|
||||||
# approximation for when a job is "done"
|
# closed. don't actually persist them to the database; we
|
||||||
emit_channel_notification(
|
# just use them to report `summary` websocket events as an
|
||||||
'jobs-summary',
|
# approximation for when a job is "done"
|
||||||
dict(group_name='jobs', unified_job_id=job_identifier)
|
emit_channel_notification(
|
||||||
)
|
'jobs-summary',
|
||||||
|
dict(group_name='jobs', unified_job_id=job_identifier)
|
||||||
|
)
|
||||||
|
# Additionally, when we've processed all events, we should
|
||||||
|
# have all the data we need to send out success/failure
|
||||||
|
# notification templates
|
||||||
|
uj = UnifiedJob.objects.get(pk=job_identifier)
|
||||||
|
if hasattr(uj, 'send_notification_templates'):
|
||||||
|
retries = 0
|
||||||
|
while retries < 5:
|
||||||
|
if uj.finished:
|
||||||
|
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# wait a few seconds to avoid a race where the
|
||||||
|
# events are persisted _before_ the UJ.status
|
||||||
|
# changes from running -> successful
|
||||||
|
retries += 1
|
||||||
|
time.sleep(1)
|
||||||
|
uj = UnifiedJob.objects.get(pk=job_identifier)
|
||||||
|
except Exception:
|
||||||
|
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
retries = 0
|
retries = 0
|
||||||
|
|||||||
@@ -209,3 +209,27 @@ class JobNotificationMixin(object):
|
|||||||
|
|
||||||
def build_notification_failed_message(self):
|
def build_notification_failed_message(self):
|
||||||
return self._build_notification_message('failed')
|
return self._build_notification_message('failed')
|
||||||
|
|
||||||
|
def send_notification_templates(self, status_str):
|
||||||
|
from awx.main.tasks import send_notifications # avoid circular import
|
||||||
|
if status_str not in ['succeeded', 'failed']:
|
||||||
|
raise ValueError(_("status_str must be either succeeded or failed"))
|
||||||
|
try:
|
||||||
|
notification_templates = self.get_notification_templates()
|
||||||
|
except Exception:
|
||||||
|
logger.warn("No notification template defined for emitting notification")
|
||||||
|
notification_templates = None
|
||||||
|
if notification_templates:
|
||||||
|
if status_str == 'succeeded':
|
||||||
|
notification_template_type = 'success'
|
||||||
|
else:
|
||||||
|
notification_template_type = 'error'
|
||||||
|
all_notification_templates = set(notification_templates.get(notification_template_type, []) + notification_templates.get('any', []))
|
||||||
|
if len(all_notification_templates):
|
||||||
|
try:
|
||||||
|
(notification_subject, notification_body) = getattr(self, 'build_notification_%s_message' % status_str)()
|
||||||
|
except AttributeError:
|
||||||
|
raise NotImplementedError("build_notification_%s_message() does not exist" % status_str)
|
||||||
|
send_notifications.delay([n.generate_notification(notification_subject, notification_body).id
|
||||||
|
for n in all_notification_templates],
|
||||||
|
job_id=self.id)
|
||||||
|
|||||||
@@ -38,7 +38,6 @@ from awx.main.utils import get_type_for_model
|
|||||||
from awx.main.signals import disable_activity_stream
|
from awx.main.signals import disable_activity_stream
|
||||||
|
|
||||||
from awx.main.scheduler.dependency_graph import DependencyGraph
|
from awx.main.scheduler.dependency_graph import DependencyGraph
|
||||||
from awx.main import tasks as awx_tasks
|
|
||||||
from awx.main.utils import decrypt_field
|
from awx.main.utils import decrypt_field
|
||||||
|
|
||||||
# Celery
|
# Celery
|
||||||
@@ -499,7 +498,8 @@ class TaskManager():
|
|||||||
except DatabaseError:
|
except DatabaseError:
|
||||||
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
|
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
|
||||||
continue
|
continue
|
||||||
awx_tasks._send_notification_templates(task, 'failed')
|
if hasattr(task, 'send_notification_templates'):
|
||||||
|
task.send_notification_templates('failed')
|
||||||
task.websocket_emit_status(new_status)
|
task.websocket_emit_status(new_status)
|
||||||
logger.error("{}Task {} has no record in celery. Marking as failed".format(
|
logger.error("{}Task {} has no record in celery. Marking as failed".format(
|
||||||
'Isolated ' if isolated else '', task.log_format))
|
'Isolated ' if isolated else '', task.log_format))
|
||||||
@@ -630,4 +630,4 @@ class TaskManager():
|
|||||||
|
|
||||||
# Operations whose queries rely on modifications made during the atomic scheduling session
|
# Operations whose queries rely on modifications made during the atomic scheduling session
|
||||||
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
|
for wfj in WorkflowJob.objects.filter(id__in=finished_wfjs):
|
||||||
awx_tasks._send_notification_templates(wfj, 'succeeded' if wfj.status == 'successful' else 'failed')
|
wfj.send_notification_templates('succeeded' if wfj.status == 'successful' else 'failed')
|
||||||
|
|||||||
@@ -430,30 +430,6 @@ def awx_periodic_scheduler(self):
|
|||||||
state.save()
|
state.save()
|
||||||
|
|
||||||
|
|
||||||
def _send_notification_templates(instance, status_str):
|
|
||||||
if status_str not in ['succeeded', 'failed']:
|
|
||||||
raise ValueError(_("status_str must be either succeeded or failed"))
|
|
||||||
try:
|
|
||||||
notification_templates = instance.get_notification_templates()
|
|
||||||
except Exception:
|
|
||||||
logger.warn("No notification template defined for emitting notification")
|
|
||||||
notification_templates = None
|
|
||||||
if notification_templates:
|
|
||||||
if status_str == 'succeeded':
|
|
||||||
notification_template_type = 'success'
|
|
||||||
else:
|
|
||||||
notification_template_type = 'error'
|
|
||||||
all_notification_templates = set(notification_templates.get(notification_template_type, []) + notification_templates.get('any', []))
|
|
||||||
if len(all_notification_templates):
|
|
||||||
try:
|
|
||||||
(notification_subject, notification_body) = getattr(instance, 'build_notification_%s_message' % status_str)()
|
|
||||||
except AttributeError:
|
|
||||||
raise NotImplementedError("build_notification_%s_message() does not exist" % status_str)
|
|
||||||
send_notifications.delay([n.generate_notification(notification_subject, notification_body).id
|
|
||||||
for n in all_notification_templates],
|
|
||||||
job_id=instance.id)
|
|
||||||
|
|
||||||
|
|
||||||
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
|
@shared_task(bind=True, queue='tower', base=LogErrorsTask)
|
||||||
def handle_work_success(self, result, task_actual):
|
def handle_work_success(self, result, task_actual):
|
||||||
try:
|
try:
|
||||||
@@ -464,8 +440,6 @@ def handle_work_success(self, result, task_actual):
|
|||||||
if not instance:
|
if not instance:
|
||||||
return
|
return
|
||||||
|
|
||||||
_send_notification_templates(instance, 'succeeded')
|
|
||||||
|
|
||||||
from awx.main.scheduler.tasks import run_job_complete
|
from awx.main.scheduler.tasks import run_job_complete
|
||||||
run_job_complete.delay(instance.id)
|
run_job_complete.delay(instance.id)
|
||||||
|
|
||||||
@@ -501,9 +475,6 @@ def handle_work_error(task_id, *args, **kwargs):
|
|||||||
instance.save()
|
instance.save()
|
||||||
instance.websocket_emit_status("failed")
|
instance.websocket_emit_status("failed")
|
||||||
|
|
||||||
if first_instance:
|
|
||||||
_send_notification_templates(first_instance, 'failed')
|
|
||||||
|
|
||||||
# We only send 1 job complete message since all the job completion message
|
# We only send 1 job complete message since all the job completion message
|
||||||
# handling does is trigger the scheduler. If we extend the functionality of
|
# handling does is trigger the scheduler. If we extend the functionality of
|
||||||
# what the job complete message handler does then we may want to send a
|
# what the job complete message handler does then we may want to send a
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ from awx.main.models import (
|
|||||||
Instance,
|
Instance,
|
||||||
WorkflowJob,
|
WorkflowJob,
|
||||||
)
|
)
|
||||||
|
from awx.main.models.notifications import JobNotificationMixin
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -323,7 +324,7 @@ class TestReaper():
|
|||||||
})
|
})
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@mock.patch('awx.main.tasks._send_notification_templates')
|
@mock.patch.object(JobNotificationMixin, 'send_notification_templates')
|
||||||
@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
|
@mock.patch.object(TaskManager, 'get_active_tasks', lambda self: ([], []))
|
||||||
def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, waiting_tasks, mocker):
|
def test_cleanup_inconsistent_task(self, notify, active_tasks, considered_jobs, reapable_jobs, running_tasks, waiting_tasks, mocker):
|
||||||
tm = TaskManager()
|
tm = TaskManager()
|
||||||
@@ -338,7 +339,7 @@ class TestReaper():
|
|||||||
j.save.assert_not_called()
|
j.save.assert_not_called()
|
||||||
|
|
||||||
assert notify.call_count == 4
|
assert notify.call_count == 4
|
||||||
notify.assert_has_calls([mock.call(j, 'failed') for j in reapable_jobs], any_order=True)
|
notify.assert_has_calls([mock.call('failed') for j in reapable_jobs], any_order=True)
|
||||||
|
|
||||||
for j in reapable_jobs:
|
for j in reapable_jobs:
|
||||||
j.websocket_emit_status.assert_called_once_with('failed')
|
j.websocket_emit_status.assert_called_once_with('failed')
|
||||||
|
|||||||
Reference in New Issue
Block a user