Improve the callback worker's ability to deal with an idle/disconnected DB

If database connectivity is lost, callback workers currently raise an
uncaught exception and hang; this can cause the entire process to stop
handling callback events.

see: https://github.com/ansible/ansible-tower/issues/7660
This commit is contained in:
Ryan Petrello 2017-10-27 13:26:22 -04:00
parent 4c4cbaef9f
commit 8d2ab3de42
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777

View File

@ -3,13 +3,14 @@
# Python
import logging
import os
import signal
import time
from uuid import UUID
from multiprocessing import Process
from multiprocessing import Queue as MPQueue
from Queue import Empty as QueueEmpty
from Queue import Full as QueueFull
import os
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
@ -18,7 +19,8 @@ from kombu.mixins import ConsumerMixin
from django.conf import settings
from django.core.management.base import NoArgsCommand
from django.db import connection as django_connection
from django.db import DatabaseError
from django.db import DatabaseError, OperationalError
from django.db.utils import InterfaceError, InternalError
from django.core.cache import cache as django_cache
# AWX
@ -39,6 +41,9 @@ class WorkerSignalHandler:
class CallbackBrokerWorker(ConsumerMixin):
MAX_RETRIES = 2
def __init__(self, connection, use_workers=True):
self.connection = connection
self.worker_queues = []
@ -133,13 +138,40 @@ class CallbackBrokerWorker(ConsumerMixin):
logger.info('Body: {}'.format(
highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly'))
))
try:
def _save_event_data():
if 'job_id' in body:
JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body)
except DatabaseError as e:
logger.error('Database Error Saving Job Event: {}'.format(e))
job_identifier = 'unknown job'
if 'job_id' in body:
job_identifier = body['job_id']
elif 'ad_hoc_command_id' in body:
job_identifier = body['ad_hoc_command_id']
retries = 0
while retries <= self.MAX_RETRIES:
try:
_save_event_data()
break
except (OperationalError, InterfaceError, InternalError) as e:
if retries >= self.MAX_RETRIES:
logger.exception('Worker could not re-establish database connectivity, shutting down gracefully: Job {}'.format(job_identifier))
os.kill(os.getppid(), signal.SIGINT)
return
delay = 60 * retries
logger.exception('Database Error Saving Job Event, retry #{i} in {delay} seconds:'.format(
i=retries + 1,
delay=delay
))
django_connection.close()
time.sleep(delay)
retries += 1
except DatabaseError as e:
logger.exception('Database Error Saving Job Event for Job {}'.format(job_identifier))
break
except Exception as exc:
import traceback
tb = traceback.format_exc()