change when we send job notifications to avoid a race condition

success/failure notifications for *playbooks* include summary data about
the hosts in based on the contents of the playbook_on_stats event

the current implementation suffers from a number of race conditions that
sometimes can cause that data to be missing or incomplete; this change
makes it so that for *playbooks* we build (and send) the notification in
response to the playbook_on_stats event, not the EOF event
This commit is contained in:
Ryan Petrello
2020-03-13 17:36:20 -04:00
parent a725778b17
commit d40a5dec8f
3 changed files with 37 additions and 15 deletions

View File

@@ -15,7 +15,9 @@ from django.db.utils import InterfaceError, InternalError, IntegrityError
from awx.main.consumers import emit_channel_notification from awx.main.consumers import emit_channel_notification
from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent,
InventoryUpdateEvent, SystemJobEvent, UnifiedJob) InventoryUpdateEvent, SystemJobEvent, UnifiedJob,
Job)
from awx.main.tasks import handle_success_and_failure_notifications
from awx.main.models.events import emit_event_detail from awx.main.models.events import emit_event_detail
from .base import BaseWorker from .base import BaseWorker
@@ -137,19 +139,14 @@ class CallbackBrokerWorker(BaseWorker):
# have all the data we need to send out success/failure # have all the data we need to send out success/failure
# notification templates # notification templates
uj = UnifiedJob.objects.get(pk=job_identifier) uj = UnifiedJob.objects.get(pk=job_identifier)
if hasattr(uj, 'send_notification_templates'):
retries = 0 if isinstance(uj, Job):
while retries < 5: # *actual playbooks* send their success/failure
if uj.finished: # notifications in response to the playbook_on_stats
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') # event handling code in main.models.events
break pass
else: elif hasattr(uj, 'send_notification_templates'):
# wait a few seconds to avoid a race where the handle_success_and_failure_notifications.apply_async([uj.id])
# events are persisted _before_ the UJ.status
# changes from running -> successful
retries += 1
time.sleep(1)
uj = UnifiedJob.objects.get(pk=job_identifier)
except Exception: except Exception:
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier)) logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
return return

View File

@@ -4,7 +4,7 @@ import datetime
import logging import logging
from collections import defaultdict from collections import defaultdict
from django.db import models, DatabaseError from django.db import models, DatabaseError, connection
from django.utils.dateparse import parse_datetime from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator from django.utils.text import Truncator
from django.utils.timezone import utc from django.utils.timezone import utc
@@ -356,6 +356,14 @@ class BasePlaybookEvent(CreatedModifiedModel):
job_id=self.job_id, uuid__in=failed job_id=self.job_id, uuid__in=failed
).update(failed=True) ).update(failed=True)
# send success/failure notifications when we've finished handling the playbook_on_stats event
from awx.main.tasks import handle_success_and_failure_notifications # circular import
def _send_notifications():
handle_success_and_failure_notifications.apply_async([self.job.id])
connection.on_commit(_send_notifications)
for field in ('playbook', 'play', 'task', 'role'): for field in ('playbook', 'play', 'task', 'role'):
value = force_text(event_data.get(field, '')).strip() value = force_text(event_data.get(field, '')).strip()
if value != getattr(self, field): if value != getattr(self, field):

View File

@@ -602,6 +602,23 @@ def handle_work_error(task_id, *args, **kwargs):
pass pass
@task(queue=get_local_queuename)
def handle_success_and_failure_notifications(job_id):
uj = UnifiedJob.objects.get(pk=job_id)
retries = 0
while retries < 5:
if uj.finished:
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
break
else:
# wait a few seconds to avoid a race where the
# events are persisted _before_ the UJ.status
# changes from running -> successful
retries += 1
time.sleep(1)
uj = UnifiedJob.objects.get(pk=job_id)
@task(queue=get_local_queuename) @task(queue=get_local_queuename)
def update_inventory_computed_fields(inventory_id): def update_inventory_computed_fields(inventory_id):
''' '''