refactor some callback receiver code

the bigint migration removed the foreign key constraints for:

- host_id
- job_id (and projectupdate_id, etc...)

because of this, we no longer need to check explicitly for a host_id
IntegrityError (without the foreign key constraint, it can't occur)

additionally, it's now possible to insert an event with a mismatched
job_id (for example, you can start a long-running job and delete the
job record in the background using the ORM or psql); doing so results
in DoesNotExist errors in the code that handles the playbook_on_stats
events, so that code now catches ObjectDoesNotExist and skips the
job-dependent work when the job record is gone
This commit is contained in:
Ryan Petrello
2020-09-25 08:56:25 -04:00
parent acc0ba570e
commit baad765179
2 changed files with 35 additions and 33 deletions

View File

@@ -11,7 +11,7 @@ import traceback
from django.conf import settings
from django.utils.timezone import now as tz_now
from django.db import DatabaseError, OperationalError, connection as django_connection
from django.db.utils import InterfaceError, InternalError, IntegrityError
from django.db.utils import InterfaceError, InternalError
import psutil
@@ -120,20 +120,12 @@ class CallbackBrokerWorker(BaseWorker):
e.modified = now
try:
cls.objects.bulk_create(events)
except Exception as exc:
except Exception:
# if an exception occurs, we should re-attempt to save the
# events one-by-one, because something in the list is
# broken/stale (e.g., an IntegrityError on a specific event)
# broken/stale
for e in events:
try:
if (
isinstance(exc, IntegrityError) and
getattr(e, 'host_id', '')
):
# this is one potential IntegrityError we can
# work around - if the host disappears before
# the event can be processed
e.host_id = None
e.save()
except Exception:
logger.exception('Database Error Saving Job Event')

View File

@@ -5,6 +5,7 @@ import logging
from collections import defaultdict
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import models, DatabaseError, connection
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
@@ -349,31 +350,36 @@ class BasePlaybookEvent(CreatedModifiedModel):
pass
if isinstance(self, JobEvent):
hostnames = self._hostnames()
self._update_host_summary_from_stats(set(hostnames))
if self.job.inventory:
try:
self.job.inventory.update_computed_fields()
except DatabaseError:
logger.exception('Computed fields database error saving event {}'.format(self.pk))
try:
job = self.job
except ObjectDoesNotExist:
job = None
if job:
hostnames = self._hostnames()
self._update_host_summary_from_stats(set(hostnames))
if job.inventory:
try:
job.inventory.update_computed_fields()
except DatabaseError:
logger.exception('Computed fields database error saving event {}'.format(self.pk))
# find parent links and propagate changed=T and failed=T
changed = self.job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
failed = self.job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
# find parent links and propagate changed=T and failed=T
changed = job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
failed = job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=changed
).update(changed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=failed
).update(failed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=changed
).update(changed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=failed
).update(failed=True)
# send success/failure notifications when we've finished handling the playbook_on_stats event
from awx.main.tasks import handle_success_and_failure_notifications # circular import
# send success/failure notifications when we've finished handling the playbook_on_stats event
from awx.main.tasks import handle_success_and_failure_notifications # circular import
def _send_notifications():
handle_success_and_failure_notifications.apply_async([self.job.id])
connection.on_commit(_send_notifications)
def _send_notifications():
handle_success_and_failure_notifications.apply_async([job.id])
connection.on_commit(_send_notifications)
for field in ('playbook', 'play', 'task', 'role'):
@@ -497,7 +503,11 @@ class JobEvent(BasePlaybookEvent):
def _update_host_summary_from_stats(self, hostnames):
with ignore_inventory_computed_fields():
if not self.job or not self.job.inventory:
try:
if not self.job or not self.job.inventory:
logger.info('Event {} missing job or inventory, host summaries not updated'.format(self.pk))
return
except ObjectDoesNotExist:
logger.info('Event {} missing job or inventory, host summaries not updated'.format(self.pk))
return
job = self.job