Merge pull request #4404 from cchurch/split-job-event-data

Split job event data between callback queue and stdout.
This commit is contained in:
Chris Church
2016-12-14 15:23:13 -05:00
committed by GitHub
6 changed files with 93 additions and 22 deletions

View File

@@ -26,7 +26,7 @@ import uuid
from ansible.utils.display import Display from ansible.utils.display import Display
# Tower Display Callback # Tower Display Callback
from tower_display_callback.events import event_context from .events import event_context
__all__ = [] __all__ = []

View File

@@ -22,14 +22,75 @@ import base64
import contextlib import contextlib
import datetime import datetime
import json import json
import logging
import multiprocessing import multiprocessing
import os import os
import threading import threading
import uuid import uuid
# Kombu
from kombu import Connection, Exchange, Producer
__all__ = ['event_context'] __all__ = ['event_context']
class CallbackQueueEventDispatcher(object):
def __init__(self):
self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
self.connection = None
self.exchange = None
self._init_logging()
def _init_logging(self):
try:
self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0'))
except ValueError:
self.job_callback_debug = 0
self.logger = logging.getLogger('awx.plugins.callback.job_event_callback')
if self.job_callback_debug >= 2:
self.logger.setLevel(logging.DEBUG)
elif self.job_callback_debug >= 1:
self.logger.setLevel(logging.INFO)
else:
self.logger.setLevel(logging.WARNING)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.propagate = False
def dispatch(self, obj):
if not self.callback_connection or not self.connection_queue:
return
active_pid = os.getpid()
for retry_count in xrange(4):
try:
if not hasattr(self, 'connection_pid'):
self.connection_pid = active_pid
if self.connection_pid != active_pid:
self.connection = None
if self.connection is None:
self.connection = Connection(self.callback_connection)
self.exchange = Exchange(self.connection_queue, type='direct')
producer = Producer(self.connection)
producer.publish(obj,
serializer='json',
compression='bzip2',
exchange=self.exchange,
declare=[self.exchange],
routing_key=self.connection_queue)
return
except Exception, e:
self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
retry_count, exc_info=True)
retry_count += 1
if retry_count >= 3:
break
class EventContext(object): class EventContext(object):
''' '''
Store global and local (per thread/process) data associated with callback Store global and local (per thread/process) data associated with callback
@@ -38,6 +99,7 @@ class EventContext(object):
def __init__(self): def __init__(self):
self.display_lock = multiprocessing.RLock() self.display_lock = multiprocessing.RLock()
self.dispatcher = CallbackQueueEventDispatcher()
def add_local(self, **kwargs): def add_local(self, **kwargs):
if not hasattr(self, '_local'): if not hasattr(self, '_local'):
@@ -111,7 +173,9 @@ class EventContext(object):
if event_data.get(key, False): if event_data.get(key, False):
event = key event = key
break break
max_res = int(os.getenv("MAX_EVENT_RES", 700000))
if event not in ('playbook_on_stats',) and "res" in event_data and len(str(event_data['res'])) > max_res:
event_data['res'] = {}
event_dict = dict(event=event, event_data=event_data) event_dict = dict(event=event, event_data=event_data)
for key in event_data.keys(): for key in event_data.keys():
if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'): if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'):
@@ -136,7 +200,9 @@ class EventContext(object):
fileobj.flush() fileobj.flush()
def dump_begin(self, fileobj): def dump_begin(self, fileobj):
self.dump(fileobj, self.get_begin_dict()) begin_dict = self.get_begin_dict()
self.dispatcher.dispatch(begin_dict)
self.dump(fileobj, {'uuid': begin_dict['uuid']})
def dump_end(self, fileobj): def dump_end(self, fileobj):
self.dump(fileobj, self.get_end_dict(), flush=True) self.dump(fileobj, self.get_end_dict(), flush=True)

View File

@@ -29,8 +29,8 @@ from ansible.plugins.callback import CallbackBase
from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule
# Tower Display Callback # Tower Display Callback
from tower_display_callback.events import event_context from .events import event_context
from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule from .minimal import CallbackModule as MinimalCallbackModule
class BaseCallbackModule(CallbackBase): class BaseCallbackModule(CallbackBase):

View File

@@ -21,6 +21,7 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver')
class CallbackBrokerWorker(ConsumerMixin): class CallbackBrokerWorker(ConsumerMixin):
def __init__(self, connection): def __init__(self, connection):
self.connection = connection self.connection = connection
self.partial_events = {}
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE,
@@ -31,18 +32,28 @@ class CallbackBrokerWorker(ConsumerMixin):
def process_task(self, body, message): def process_task(self, body, message):
try: try:
if 'event' not in body:
raise Exception('Payload does not have an event')
if 'job_id' not in body and 'ad_hoc_command_id' not in body: if 'job_id' not in body and 'ad_hoc_command_id' not in body:
raise Exception('Payload does not have a job_id or ad_hoc_command_id') raise Exception('Payload does not have a job_id or ad_hoc_command_id')
if settings.DEBUG: if settings.DEBUG:
logger.info('Body: {}'.format(body)) logger.info('Body: {}'.format(body))
logger.info('Message: {}'.format(message)) logger.info('Message: {}'.format(message))
try: try:
if 'job_id' in body: # If event came directly from callback without counter/stdout,
JobEvent.create_from_data(**body) # save it until the rest of the event arrives.
elif 'ad_hoc_command_id' in body: if 'counter' not in body:
AdHocCommandEvent.create_from_data(**body) if 'uuid' in body:
self.partial_events[body['uuid']] = body
# If event has counter, try to combine it with any event data
# already received for the same uuid, then create the actual
# job event record.
else:
if 'uuid' in body:
partial_event = self.partial_events.pop(body['uuid'], {})
body.update(partial_event)
if 'job_id' in body:
JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body)
except DatabaseError as e: except DatabaseError as e:
logger.error('Database Error Saving Job Event: {}'.format(e)) logger.error('Database Error Saving Job Event: {}'.format(e))
except Exception as exc: except Exception as exc:

View File

@@ -809,6 +809,7 @@ class RunJob(BaseTask):
env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or '' env['REST_API_TOKEN'] = job.task_auth_token or ''
env['TOWER_HOST'] = settings.TOWER_URL_BASE env['TOWER_HOST'] = settings.TOWER_URL_BASE
env['MAX_EVENT_RES'] = settings.MAX_EVENT_RES_DATA
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
env['CALLBACK_CONNECTION'] = settings.BROKER_URL env['CALLBACK_CONNECTION'] = settings.BROKER_URL
if getattr(settings, 'JOB_CALLBACK_DEBUG', False): if getattr(settings, 'JOB_CALLBACK_DEBUG', False):

View File

@@ -152,6 +152,10 @@ REMOTE_HOST_HEADERS = ['REMOTE_ADDR', 'REMOTE_HOST']
# Note: This setting may be overridden by database settings. # Note: This setting may be overridden by database settings.
STDOUT_MAX_BYTES_DISPLAY = 1048576 STDOUT_MAX_BYTES_DISPLAY = 1048576
# The maximum size of the ansible callback event's res data structure
# beyond this limit and the value will be removed
MAX_EVENT_RES_DATA = 700000
# Note: This setting may be overridden by database settings. # Note: This setting may be overridden by database settings.
EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024 EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024
@@ -522,17 +526,6 @@ ANSIBLE_FORCE_COLOR = True
# the celery task. # the celery task.
AWX_TASK_ENV = {} AWX_TASK_ENV = {}
# Maximum number of job events processed by the callback receiver worker process
# before it recycles
JOB_EVENT_RECYCLE_THRESHOLD = 3000
# Number of workers used to proecess job events in parallel
JOB_EVENT_WORKERS = 4
# Maximum number of job events that can be waiting on a single worker queue before
# it can be skipped as too busy
JOB_EVENT_MAX_QUEUE_SIZE = 100
# Flag to enable/disable updating hosts M2M when saving job events. # Flag to enable/disable updating hosts M2M when saving job events.
CAPTURE_JOB_EVENT_HOSTS = False CAPTURE_JOB_EVENT_HOSTS = False