# Patch summary for awx/main/management/commands/run_callback_receiver.py
# (reviewer commentary only; everything from "diff --git" onward is the unmodified patch text).
#
# Hunk 1 (imports): moves `import os` up beside `import logging` and adds `import time`.
# Hunk 2 (imports): additionally imports OperationalError from django.db, and
#   InterfaceError / InternalError from django.db.utils.
# Hunk 3: adds the class attribute `MAX_RETRIES = 2` to CallbackBrokerWorker.
# Hunk 4: replaces the single try/except DatabaseError around the event save with:
#   * a nested helper `_save_event_data()` that calls JobEvent.create_from_data(**body)
#     when 'job_id' is in body, or AdHocCommandEvent.create_from_data(**body) when
#     'ad_hoc_command_id' is in body (neither key present -> the helper does nothing);
#   * `job_identifier`, taken from body['job_id'] or body['ad_hoc_command_id'],
#     defaulting to 'unknown job', used only in log messages;
#   * a loop `while retries <= self.MAX_RETRIES` that calls the helper and breaks on success.
#     - On OperationalError / InterfaceError / InternalError:
#         if retries >= MAX_RETRIES, log, send SIGINT to the parent process
#         (os.kill(os.getppid(), signal.SIGINT)) and return from the enclosing method;
#         otherwise log, close django_connection, sleep `60 * retries` seconds and
#         increment retries. With MAX_RETRIES = 2 that is: first retry after 0 seconds,
#         second retry after 60 seconds, third failure triggers the SIGINT path.
#     - On any other DatabaseError: log with the job identifier and break (no retry).
#   The outer `except Exception as exc:` handler shown as trailing context is unchanged.
#
# NOTE(review): `e` is bound in both new except clauses but never referenced in the lines shown.
# NOTE(review): the retry log message does not include job_identifier, unlike the other two
#   new log messages -- confirm whether that omission is intentional.
# NOTE(review): the enclosing method's `def` line is outside this hunk; the effect of the bare
#   `return` on the worker's processing loop should be confirmed against the full file.
diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 9539a01762..3d311ef3d6 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,13 +3,14 @@ # Python import logging +import os import signal +import time from uuid import UUID from multiprocessing import Process from multiprocessing import Queue as MPQueue from Queue import Empty as QueueEmpty from Queue import Full as QueueFull -import os from kombu import Connection, Exchange, Queue from kombu.mixins import ConsumerMixin @@ -18,7 +19,8 @@ from kombu.mixins import ConsumerMixin from django.conf import settings from django.core.management.base import NoArgsCommand from django.db import connection as django_connection -from django.db import DatabaseError +from django.db import DatabaseError, OperationalError +from django.db.utils import InterfaceError, InternalError from django.core.cache import cache as django_cache # AWX @@ -39,6 +41,9 @@ class WorkerSignalHandler: class CallbackBrokerWorker(ConsumerMixin): + + MAX_RETRIES = 2 + def __init__(self, connection, use_workers=True): self.connection = connection self.worker_queues = [] @@ -133,13 +138,40 @@ class CallbackBrokerWorker(ConsumerMixin): logger.info('Body: {}'.format( highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly')) )) - try: + + def _save_event_data(): if 'job_id' in body: JobEvent.create_from_data(**body) elif 'ad_hoc_command_id' in body: AdHocCommandEvent.create_from_data(**body) - except DatabaseError as e: - logger.error('Database Error Saving Job Event: {}'.format(e)) + + job_identifier = 'unknown job' + if 'job_id' in body: + job_identifier = body['job_id'] + elif 'ad_hoc_command_id' in body: + job_identifier = body['ad_hoc_command_id'] + + retries = 0 + while retries <= self.MAX_RETRIES: + try: + _save_event_data() + break + except (OperationalError, 
InterfaceError, InternalError) as e: + if retries >= self.MAX_RETRIES: + logger.exception('Worker could not re-establish database connectivity, shutting down gracefully: Job {}'.format(job_identifier)) + os.kill(os.getppid(), signal.SIGINT) + return + delay = 60 * retries + logger.exception('Database Error Saving Job Event, retry #{i} in {delay} seconds:'.format( + i=retries + 1, + delay=delay + )) + django_connection.close() + time.sleep(delay) + retries += 1 + except DatabaseError as e: + logger.exception('Database Error Saving Job Event for Job {}'.format(job_identifier)) + break except Exception as exc: import traceback tb = traceback.format_exc()