AC-1012 Open a new RabbitMQ connection for each forked process to prevent intermittent hang, update debug logging for job event callback.

This commit is contained in:
Chris Church
2014-02-07 17:12:46 -05:00
parent 8638f1e3d1
commit f02a93c333
3 changed files with 40 additions and 10 deletions

View File

@@ -32,14 +32,16 @@
# Python
import datetime
import json
import logging
import os
import sys
import urllib
import urlparse
# Requests
# Requests / Kombu
try:
import requests
from kombu import Connection, Exchange, Queue
except ImportError:
# If running from an AWX installation, use the local version of requests if
# it cannot be found globally.
@@ -47,9 +49,7 @@ except ImportError:
'lib', 'site-packages')
sys.path.insert(0, local_site_packages)
import requests
# Kombu
from kombu import Connection, Exchange, Queue
from kombu import Connection, Exchange, Queue
class TokenAuth(requests.auth.AuthBase):
@@ -87,24 +87,43 @@ class CallbackModule(object):
self.base_url = os.getenv('REST_API_URL', '')
self.auth_token = os.getenv('REST_API_TOKEN', '')
self.broker_url = os.getenv('BROKER_URL', '')
self.job_callback_debug = os.getenv('JOB_CALLBACK_DEBUG', '')
self._init_logging()
def _init_logging(self):
# Configure a dedicated logger for this callback plugin.
# Verbosity is driven by the JOB_CALLBACK_DEBUG environment variable:
#   >= 2 -> DEBUG, >= 1 -> INFO, otherwise WARNING.
# A missing or non-numeric value falls back to 0 (WARNING).
try:
self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0'))
except ValueError:
self.job_callback_debug = 0
self.logger = logging.getLogger('awx.plugins.callback.job_event_callback')
if self.job_callback_debug >= 2:
self.logger.setLevel(logging.DEBUG)
elif self.job_callback_debug >= 1:
self.logger.setLevel(logging.INFO)
else:
self.logger.setLevel(logging.WARNING)
# %(process)d is included so log lines can be told apart per forked
# process (this plugin runs in multiple forks; see commit message).
handler = logging.StreamHandler()
formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# Don't bubble records up to the root logger (avoids duplicate output).
self.logger.propagate = False
def __del__(self):
# Best-effort release of the AMQP producer/connection when this
# callback object is garbage-collected; _cleanup_connection swallows
# all errors, so finalization never raises.
self._cleanup_connection()
def _publish_errback(self, exc, interval):
# Errback handed to kombu's Connection.ensure(): called with the
# exception and the retry interval before each publish retry.
# NOTE(review): this span is diff residue — the `if`/`print` pair is
# the removed pre-change (Python 2) debug output, and the logger.info
# call is its replacement; only one of the two exists in the real file.
if self.job_callback_debug:
print 'Publish Error: %r, retry in %s seconds, pid=%s' % (exc, interval, os.getpid())
self.logger.info('Publish Error: %r', exc)
def _cleanup_connection(self):
# Tear down the kombu producer and connection, deliberately ignoring
# any error so cleanup is always safe to call (used from __del__, from
# the per-fork PID check, and from the publish retry path).
if hasattr(self, 'producer'):
try:
#self.logger.debug('Cleanup Producer: %r', self.producer)
self.producer.cancel()
except:
pass
del self.producer
if hasattr(self, 'connection'):
try:
#self.logger.debug('Cleanup Connection: %r', self.connection)
self.connection.release()
# NOTE(review): a matching `del self.connection` is presumably just
# past this diff hunk's boundary — confirm against the full file.
except:
pass
@@ -131,14 +150,21 @@ class CallbackModule(object):
retry_count = 0
while True:
try:
if not hasattr(self, 'connection_pid'):
self.connection_pid = os.getpid()
if self.connection_pid != os.getpid():
self._cleanup_connection()
if not hasattr(self, 'connection'):
self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True})
self.logger.debug('New Connection: %r, retry=%d', self.connection, retry_count)
if not hasattr(self, 'producer'):
channel = self.connection.channel()
self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json')
self.publish = self.connection.ensure(self.producer, self.producer.publish,
errback=self._publish_errback,
max_retries=3, interval_start=1, interval_step=1, interval_max=10)
self.logger.debug('New Producer: %r, retry=%d', self.producer, retry_count)
self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
self.publish(msg, exchange=self.job_events_exchange,
routing_key=('job_events[%d]' % self.job_id),
declare=[self.job_events_queue])
@@ -146,8 +172,7 @@ class CallbackModule(object):
self._cleanup_connection()
return
except Exception, e:
if self.job_callback_debug:
print 'Publish Exception: %r, pid=%s, retry=%d' % (e, os.getpid(), retry_count)
self.logger.info('Publish Exception: %r, retry=%d', e, retry_count, exc_info=True)
if retry_count < 3:
self._cleanup_connection()
else: