From f02a93c3336b1fa90e71950298817dc842317ec4 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Fri, 7 Feb 2014 17:12:46 -0500 Subject: [PATCH] AC-1012 Open a new RabbitMQ connection for each forked process to prevent intermittent hang, update debug logging for job event callback. --- awx/main/tasks.py | 4 +- awx/plugins/callback/job_event_callback.py | 43 +++++++++++++++++----- awx/settings/local_settings.py.example | 3 ++ 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e80e1525c6..1981c79722 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -379,7 +379,9 @@ class RunJob(BaseTask): env['REST_API_TOKEN'] = job.task_auth_token or '' if settings.BROKER_URL.startswith('amqp://'): env['BROKER_URL'] = settings.BROKER_URL - if settings.DEBUG: + if getattr(settings, 'JOB_CALLBACK_DEBUG', False): + env['JOB_CALLBACK_DEBUG'] = '2' + elif settings.DEBUG: env['JOB_CALLBACK_DEBUG'] = '1' # When using Ansible >= 1.3, allow the inventory script to include host diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 18a52e3fb8..5701126cfa 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -32,14 +32,16 @@ # Python import datetime import json +import logging import os import sys import urllib import urlparse -# Requests +# Requests / Kombu try: import requests + from kombu import Connection, Exchange, Queue except ImportError: # If running from an AWX installation, use the local version of requests if # if cannot be found globally. @@ -47,9 +49,7 @@ except ImportError: 'lib', 'site-packages') sys.path.insert(0, local_site_packages) import requests - -# Kombu -from kombu import Connection, Exchange, Queue + from kombu import Connection, Exchange, Queue class TokenAuth(requests.auth.AuthBase): @@ -87,24 +87,43 @@ class CallbackModule(object): self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') self.broker_url = os.getenv('BROKER_URL', '') - self.job_callback_debug = os.getenv('JOB_CALLBACK_DEBUG', '') + self._init_logging() + + def _init_logging(self): + try: + self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0')) + except ValueError: + self.job_callback_debug = 0 + self.logger = logging.getLogger('awx.plugins.callback.job_event_callback') + if self.job_callback_debug >= 2: + self.logger.setLevel(logging.DEBUG) + elif self.job_callback_debug >= 1: + self.logger.setLevel(logging.INFO) + else: + self.logger.setLevel(logging.WARNING) + handler = logging.StreamHandler() + formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.propagate = False def __del__(self): self._cleanup_connection() def _publish_errback(self, exc, interval): - if self.job_callback_debug: - print 'Publish Error: %r, retry in %s seconds, pid=%s' % (exc, interval, os.getpid()) + self.logger.info('Publish Error: %r', exc) def _cleanup_connection(self): if hasattr(self, 'producer'): try: + #self.logger.debug('Cleanup Producer: %r', self.producer) self.producer.cancel() except: pass del self.producer if hasattr(self, 'connection'): try: + #self.logger.debug('Cleanup Connection: %r', self.connection) self.connection.release() except: pass @@ -131,14 +150,21 @@ class CallbackModule(object): retry_count = 0 while True: try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = os.getpid() + if self.connection_pid != os.getpid(): + self._cleanup_connection() if not hasattr(self, 'connection'): self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True}) + self.logger.debug('New Connection: %r, retry=%d', self.connection, retry_count) if not hasattr(self, 'producer'): channel = self.connection.channel() self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json') self.publish = self.connection.ensure(self.producer, self.producer.publish, errback=self._publish_errback, max_retries=3, interval_start=1, interval_step=1, interval_max=10) + self.logger.debug('New Producer: %r, retry=%d', self.producer, retry_count) + self.logger.debug('Publish: %r, retry=%d', msg, retry_count) self.publish(msg, exchange=self.job_events_exchange, routing_key=('job_events[%d]' % self.job_id), declare=[self.job_events_queue]) @@ -146,8 +172,7 @@ class CallbackModule(object): self._cleanup_connection() return except Exception, e: - if self.job_callback_debug: - print 'Publish Exception: %r, pid=%s, retry=%d' % (e, os.getpid(), retry_count) + self.logger.info('Publish Exception: %r, retry=%d', e, retry_count, exc_info=True) if retry_count < 3: self._cleanup_connection() else: diff --git a/awx/settings/local_settings.py.example b/awx/settings/local_settings.py.example index ecc703e3c5..5200ee29f4 100644 --- a/awx/settings/local_settings.py.example +++ b/awx/settings/local_settings.py.example @@ -46,6 +46,9 @@ if len(sys.argv) >= 2 and sys.argv[1] == 'test': # Celery AMQP configuration. BROKER_URL = 'amqp://awx-dev:AWXsome1@localhost:5672/awx-dev' +# Set True to enable additional logging from the job_event_callback plugin +JOB_CALLBACK_DEBUG = False + # Absolute filesystem path to the directory to host projects (with playbooks). # This directory should NOT be web-accessible. PROJECTS_ROOT = os.path.join(BASE_DIR, 'projects')