From d40a5dec8f821f9f34bd52248a083f4d25d32a92 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Fri, 13 Mar 2020 17:36:20 -0400 Subject: [PATCH] change when we send job notifications to avoid a race condition success/failure notifications for *playbooks* include summary data about the hosts in based on the contents of the playbook_on_stats event the current implementation suffers from a number of race conditions that sometimes can cause that data to be missing or incomplete; this change makes it so that for *playbooks* we build (and send) the notification in response to the playbook_on_stats event, not the EOF event --- awx/main/dispatch/worker/callback.py | 25 +++++++++++-------------- awx/main/models/events.py | 10 +++++++++- awx/main/tasks.py | 17 +++++++++++++++++ 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 739f1538ea..a38e145de2 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -15,7 +15,9 @@ from django.db.utils import InterfaceError, InternalError, IntegrityError from awx.main.consumers import emit_channel_notification from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, - InventoryUpdateEvent, SystemJobEvent, UnifiedJob) + InventoryUpdateEvent, SystemJobEvent, UnifiedJob, + Job) +from awx.main.tasks import handle_success_and_failure_notifications from awx.main.models.events import emit_event_detail from .base import BaseWorker @@ -137,19 +139,14 @@ class CallbackBrokerWorker(BaseWorker): # have all the data we need to send out success/failure # notification templates uj = UnifiedJob.objects.get(pk=job_identifier) - if hasattr(uj, 'send_notification_templates'): - retries = 0 - while retries < 5: - if uj.finished: - uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') - break - else: - # wait a few seconds to avoid a race where the - # events are persisted _before_ the UJ.status - # changes from running -> successful - retries += 1 - time.sleep(1) - uj = UnifiedJob.objects.get(pk=job_identifier) + + if isinstance(uj, Job): + # *actual playbooks* send their success/failure + # notifications in response to the playbook_on_stats + # event handling code in main.models.events + pass + elif hasattr(uj, 'send_notification_templates'): + handle_success_and_failure_notifications.apply_async([uj.id]) except Exception: logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier)) return diff --git a/awx/main/models/events.py b/awx/main/models/events.py index f34e98de51..eef46bebe3 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -4,7 +4,7 @@ import datetime import logging from collections import defaultdict -from django.db import models, DatabaseError +from django.db import models, DatabaseError, connection from django.utils.dateparse import parse_datetime from django.utils.text import Truncator from django.utils.timezone import utc @@ -356,6 +356,14 @@ class BasePlaybookEvent(CreatedModifiedModel): job_id=self.job_id, uuid__in=failed ).update(failed=True) + # send success/failure notifications when we've finished handling the playbook_on_stats event + from awx.main.tasks import handle_success_and_failure_notifications # circular import + + def _send_notifications(): + handle_success_and_failure_notifications.apply_async([self.job.id]) + connection.on_commit(_send_notifications) + + for field in ('playbook', 'play', 'task', 'role'): value = force_text(event_data.get(field, '')).strip() if value != getattr(self, field): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index da7035b85f..a66cdb7293 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -602,6 +602,23 @@ def handle_work_error(task_id, *args, **kwargs): pass +@task(queue=get_local_queuename) +def handle_success_and_failure_notifications(job_id): + uj = UnifiedJob.objects.get(pk=job_id) + retries = 0 + while retries < 5: + if uj.finished: + uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') + break + else: + # wait a few seconds to avoid a race where the + # events are persisted _before_ the UJ.status + # changes from running -> successful + retries += 1 + time.sleep(1) + uj = UnifiedJob.objects.get(pk=job_id) + + @task(queue=get_local_queuename) def update_inventory_computed_fields(inventory_id): '''