Merge pull request #561 from ryanpetrello/idle-hands

improve the callback worker's ability to deal with idle/disconnected DB
This commit is contained in:
Ryan Petrello 2017-10-31 10:05:29 -04:00 committed by GitHub
commit 3568be84c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,13 +3,14 @@
# Python
import logging
import os
import signal
import time
from uuid import UUID
from multiprocessing import Process
from multiprocessing import Queue as MPQueue
from Queue import Empty as QueueEmpty
from Queue import Full as QueueFull
import os
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
@ -18,7 +19,8 @@ from kombu.mixins import ConsumerMixin
from django.conf import settings
from django.core.management.base import NoArgsCommand
from django.db import connection as django_connection
from django.db import DatabaseError
from django.db import DatabaseError, OperationalError
from django.db.utils import InterfaceError, InternalError
from django.core.cache import cache as django_cache
# AWX
@ -39,6 +41,9 @@ class WorkerSignalHandler:
class CallbackBrokerWorker(ConsumerMixin):
MAX_RETRIES = 2
def __init__(self, connection, use_workers=True):
self.connection = connection
self.worker_queues = []
@ -133,13 +138,40 @@ class CallbackBrokerWorker(ConsumerMixin):
logger.info('Body: {}'.format(
highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly'))
))
try:
def _save_event_data():
if 'job_id' in body:
JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body)
except DatabaseError as e:
logger.error('Database Error Saving Job Event: {}'.format(e))
job_identifier = 'unknown job'
if 'job_id' in body:
job_identifier = body['job_id']
elif 'ad_hoc_command_id' in body:
job_identifier = body['ad_hoc_command_id']
retries = 0
while retries <= self.MAX_RETRIES:
try:
_save_event_data()
break
except (OperationalError, InterfaceError, InternalError) as e:
if retries >= self.MAX_RETRIES:
logger.exception('Worker could not re-establish database connectivity, shutting down gracefully: Job {}'.format(job_identifier))
os.kill(os.getppid(), signal.SIGINT)
return
delay = 60 * retries
logger.exception('Database Error Saving Job Event, retry #{i} in {delay} seconds:'.format(
i=retries + 1,
delay=delay
))
django_connection.close()
time.sleep(delay)
retries += 1
except DatabaseError as e:
logger.exception('Database Error Saving Job Event for Job {}'.format(job_identifier))
break
except Exception as exc:
import traceback
tb = traceback.format_exc()