Split job event data between callback queue and stdout. Send most of event data directly over queue and capture only stdout/counter/start_line/end_line in celery task; recombine into single event in callback receiver.

This commit is contained in:
Chris Church
2016-12-12 16:34:36 -05:00
parent b7f469baab
commit b85c98afd2
4 changed files with 85 additions and 10 deletions

View File

@@ -26,7 +26,7 @@ import uuid
from ansible.utils.display import Display from ansible.utils.display import Display
# Tower Display Callback # Tower Display Callback
from tower_display_callback.events import event_context from .events import event_context
__all__ = [] __all__ = []

View File

@@ -22,14 +22,75 @@ import base64
import contextlib import contextlib
import datetime import datetime
import json import json
import logging
import multiprocessing import multiprocessing
import os import os
import threading import threading
import uuid import uuid
# Kombu
from kombu import Connection, Exchange, Producer
__all__ = ['event_context'] __all__ = ['event_context']
class CallbackQueueEventDispatcher(object):
    """
    Publish job event data to the callback AMQP queue.

    Connection URL and queue name are taken from the CALLBACK_CONNECTION and
    CALLBACK_QUEUE environment variables (set by the task runner); when either
    is missing, dispatch() is a no-op.
    """

    def __init__(self):
        self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
        self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
        # Lazily created on first dispatch; reset if the process forks.
        self.connection = None
        self.exchange = None
        self._init_logging()

    def _init_logging(self):
        # JOB_CALLBACK_DEBUG controls verbosity: 0/unset = warnings only,
        # 1 = info, 2+ = debug. Non-integer values fall back to 0.
        try:
            self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0'))
        except ValueError:
            self.job_callback_debug = 0
        self.logger = logging.getLogger('awx.plugins.callback.job_event_callback')
        if self.job_callback_debug >= 2:
            self.logger.setLevel(logging.DEBUG)
        elif self.job_callback_debug >= 1:
            self.logger.setLevel(logging.INFO)
        else:
            self.logger.setLevel(logging.WARNING)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        # Don't duplicate records through ancestor loggers' handlers.
        self.logger.propagate = False

    def dispatch(self, obj):
        """
        Publish ``obj`` (a JSON-serializable dict of event data) to the
        callback queue. Retries up to three times on any publish error,
        logging each failure; silently drops the event after the last retry.
        No-op when the callback connection/queue are not configured.
        """
        if not self.callback_connection or not self.connection_queue:
            return
        active_pid = os.getpid()
        # Three attempts total, matching the original retry budget
        # (the original loop's manual retry_count bookkeeping capped it at 3).
        for retry_count in range(3):
            try:
                # AMQP connections are not fork-safe: if this object was
                # inherited by a forked child, discard the parent's connection
                # and reconnect in this process.
                if not hasattr(self, 'connection_pid'):
                    self.connection_pid = active_pid
                if self.connection_pid != active_pid:
                    self.connection = None
                if self.connection is None:
                    self.connection = Connection(self.callback_connection)
                    self.exchange = Exchange(self.connection_queue, type='direct')

                producer = Producer(self.connection)
                producer.publish(obj,
                                 serializer='json',
                                 compression='bzip2',
                                 exchange=self.exchange,
                                 declare=[self.exchange],
                                 routing_key=self.connection_queue)
                return
            except Exception as e:
                # "as e" form works on both Python 2.6+ and 3
                # (original used the py2-only "except Exception, e").
                self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
                                 retry_count, exc_info=True)
class EventContext(object): class EventContext(object):
''' '''
Store global and local (per thread/process) data associated with callback Store global and local (per thread/process) data associated with callback
@@ -38,6 +99,7 @@ class EventContext(object):
def __init__(self): def __init__(self):
self.display_lock = multiprocessing.RLock() self.display_lock = multiprocessing.RLock()
self.dispatcher = CallbackQueueEventDispatcher()
def add_local(self, **kwargs): def add_local(self, **kwargs):
if not hasattr(self, '_local'): if not hasattr(self, '_local'):
@@ -136,7 +198,9 @@ class EventContext(object):
fileobj.flush() fileobj.flush()
def dump_begin(self, fileobj): def dump_begin(self, fileobj):
self.dump(fileobj, self.get_begin_dict()) begin_dict = self.get_begin_dict()
self.dispatcher.dispatch(begin_dict)
self.dump(fileobj, {'uuid': begin_dict['uuid']})
def dump_end(self, fileobj): def dump_end(self, fileobj):
self.dump(fileobj, self.get_end_dict(), flush=True) self.dump(fileobj, self.get_end_dict(), flush=True)

View File

@@ -29,8 +29,8 @@ from ansible.plugins.callback import CallbackBase
from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule
# Tower Display Callback # Tower Display Callback
from tower_display_callback.events import event_context from .events import event_context
from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule from .minimal import CallbackModule as MinimalCallbackModule
class BaseCallbackModule(CallbackBase): class BaseCallbackModule(CallbackBase):

View File

@@ -21,6 +21,7 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver')
class CallbackBrokerWorker(ConsumerMixin): class CallbackBrokerWorker(ConsumerMixin):
def __init__(self, connection): def __init__(self, connection):
self.connection = connection self.connection = connection
self.partial_events = {}
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE,
@@ -31,18 +32,28 @@ class CallbackBrokerWorker(ConsumerMixin):
def process_task(self, body, message): def process_task(self, body, message):
try: try:
if 'event' not in body:
raise Exception('Payload does not have an event')
if 'job_id' not in body and 'ad_hoc_command_id' not in body: if 'job_id' not in body and 'ad_hoc_command_id' not in body:
raise Exception('Payload does not have a job_id or ad_hoc_command_id') raise Exception('Payload does not have a job_id or ad_hoc_command_id')
if settings.DEBUG: if settings.DEBUG:
logger.info('Body: {}'.format(body)) logger.info('Body: {}'.format(body))
logger.info('Message: {}'.format(message)) logger.info('Message: {}'.format(message))
try: try:
if 'job_id' in body: # If event came directly from callback without counter/stdout,
JobEvent.create_from_data(**body) # save it until the rest of the event arrives.
elif 'ad_hoc_command_id' in body: if 'counter' not in body:
AdHocCommandEvent.create_from_data(**body) if 'uuid' in body:
self.partial_events[body['uuid']] = body
# If event has counter, try to combine it with any event data
# already received for the same uuid, then create the actual
# job event record.
else:
if 'uuid' in body:
partial_event = self.partial_events.pop(body['uuid'], {})
body.update(partial_event)
if 'job_id' in body:
JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body)
except DatabaseError as e: except DatabaseError as e:
logger.error('Database Error Saving Job Event: {}'.format(e)) logger.error('Database Error Saving Job Event: {}'.format(e))
except Exception as exc: except Exception as exc: