diff --git a/awx/lib/tower_display_callback/display.py b/awx/lib/tower_display_callback/display.py index 128c9349c7..ad5e8ba37a 100644 --- a/awx/lib/tower_display_callback/display.py +++ b/awx/lib/tower_display_callback/display.py @@ -26,7 +26,7 @@ import uuid from ansible.utils.display import Display # Tower Display Callback -from tower_display_callback.events import event_context +from .events import event_context __all__ = [] diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index 86fab2895b..0909ed460d 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -22,14 +22,75 @@ import base64 import contextlib import datetime import json +import logging import multiprocessing import os import threading import uuid +# Kombu +from kombu import Connection, Exchange, Producer + __all__ = ['event_context'] +class CallbackQueueEventDispatcher(object): + + def __init__(self): + self.callback_connection = os.getenv('CALLBACK_CONNECTION', None) + self.connection_queue = os.getenv('CALLBACK_QUEUE', '') + self.connection = None + self.exchange = None + self._init_logging() + + def _init_logging(self): + try: + self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0')) + except ValueError: + self.job_callback_debug = 0 + self.logger = logging.getLogger('awx.plugins.callback.job_event_callback') + if self.job_callback_debug >= 2: + self.logger.setLevel(logging.DEBUG) + elif self.job_callback_debug >= 1: + self.logger.setLevel(logging.INFO) + else: + self.logger.setLevel(logging.WARNING) + handler = logging.StreamHandler() + formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.propagate = False + + def dispatch(self, obj): + if not self.callback_connection or not self.connection_queue: + return + active_pid = os.getpid() + for retry_count in xrange(4): + try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = active_pid + if self.connection_pid != active_pid: + self.connection = None + if self.connection is None: + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.connection_queue, type='direct') + + producer = Producer(self.connection) + producer.publish(obj, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.connection_queue) + return + except Exception, e: + self.logger.info('Publish Job Event Exception: %r, retry=%d', e, + retry_count, exc_info=True) + retry_count += 1 + if retry_count >= 3: + break + + class EventContext(object): ''' Store global and local (per thread/process) data associated with callback @@ -38,6 +99,7 @@ class EventContext(object): def __init__(self): self.display_lock = multiprocessing.RLock() + self.dispatcher = CallbackQueueEventDispatcher() def add_local(self, **kwargs): if not hasattr(self, '_local'): @@ -136,7 +198,9 @@ class EventContext(object): fileobj.flush() def dump_begin(self, fileobj): - self.dump(fileobj, self.get_begin_dict()) + begin_dict = self.get_begin_dict() + self.dispatcher.dispatch(begin_dict) + self.dump(fileobj, {'uuid': begin_dict['uuid']}) def dump_end(self, fileobj): self.dump(fileobj, self.get_end_dict(), flush=True) diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py index e61ef17624..59faa7ac79 100644 --- a/awx/lib/tower_display_callback/module.py +++ b/awx/lib/tower_display_callback/module.py @@ -29,8 +29,8 @@ from ansible.plugins.callback import CallbackBase from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule # Tower Display Callback -from tower_display_callback.events import event_context -from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule +from .events import event_context +from .minimal import CallbackModule as MinimalCallbackModule class BaseCallbackModule(CallbackBase): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index c0105b2587..7b959f6781 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -21,6 +21,7 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver') class CallbackBrokerWorker(ConsumerMixin): def __init__(self, connection): self.connection = connection + self.partial_events = {} def get_consumers(self, Consumer, channel): return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, @@ -31,18 +32,28 @@ class CallbackBrokerWorker(ConsumerMixin): def process_task(self, body, message): try: - if 'event' not in body: - raise Exception('Payload does not have an event') if 'job_id' not in body and 'ad_hoc_command_id' not in body: raise Exception('Payload does not have a job_id or ad_hoc_command_id') if settings.DEBUG: logger.info('Body: {}'.format(body)) logger.info('Message: {}'.format(message)) try: - if 'job_id' in body: - JobEvent.create_from_data(**body) - elif 'ad_hoc_command_id' in body: - AdHocCommandEvent.create_from_data(**body) + # If event came directly from callback without counter/stdout, + # save it until the rest of the event arrives. + if 'counter' not in body: + if 'uuid' in body: + self.partial_events[body['uuid']] = body + # If event has counter, try to combine it with any event data + # already received for the same uuid, then create the actual + # job event record. + else: + if 'uuid' in body: + partial_event = self.partial_events.pop(body['uuid'], {}) + body.update(partial_event) + if 'job_id' in body: + JobEvent.create_from_data(**body) + elif 'ad_hoc_command_id' in body: + AdHocCommandEvent.create_from_data(**body) except DatabaseError as e: logger.error('Database Error Saving Job Event: {}'.format(e)) except Exception as exc: