From b85c98afd2927c7d67f1d105ebd86fb27c977ed1 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Mon, 12 Dec 2016 16:34:36 -0500 Subject: [PATCH 1/3] Split job event data between callback queue and stdout. Send most of event data directly over queue and capture only stdout/counter/start_line/end_line in celery task; recombine into single event in callback receiver. --- awx/lib/tower_display_callback/display.py | 2 +- awx/lib/tower_display_callback/events.py | 66 ++++++++++++++++++- awx/lib/tower_display_callback/module.py | 4 +- .../commands/run_callback_receiver.py | 23 +++++-- 4 files changed, 85 insertions(+), 10 deletions(-) diff --git a/awx/lib/tower_display_callback/display.py b/awx/lib/tower_display_callback/display.py index 128c9349c7..ad5e8ba37a 100644 --- a/awx/lib/tower_display_callback/display.py +++ b/awx/lib/tower_display_callback/display.py @@ -26,7 +26,7 @@ import uuid from ansible.utils.display import Display # Tower Display Callback -from tower_display_callback.events import event_context +from .events import event_context __all__ = [] diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index 86fab2895b..0909ed460d 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -22,14 +22,75 @@ import base64 import contextlib import datetime import json +import logging import multiprocessing import os import threading import uuid +# Kombu +from kombu import Connection, Exchange, Producer + __all__ = ['event_context'] +class CallbackQueueEventDispatcher(object): + + def __init__(self): + self.callback_connection = os.getenv('CALLBACK_CONNECTION', None) + self.connection_queue = os.getenv('CALLBACK_QUEUE', '') + self.connection = None + self.exchange = None + self._init_logging() + + def _init_logging(self): + try: + self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0')) + except ValueError: + self.job_callback_debug = 0 + self.logger = logging.getLogger('awx.plugins.callback.job_event_callback') + if self.job_callback_debug >= 2: + self.logger.setLevel(logging.DEBUG) + elif self.job_callback_debug >= 1: + self.logger.setLevel(logging.INFO) + else: + self.logger.setLevel(logging.WARNING) + handler = logging.StreamHandler() + formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.propagate = False + + def dispatch(self, obj): + if not self.callback_connection or not self.connection_queue: + return + active_pid = os.getpid() + for retry_count in xrange(4): + try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = active_pid + if self.connection_pid != active_pid: + self.connection = None + if self.connection is None: + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.connection_queue, type='direct') + + producer = Producer(self.connection) + producer.publish(obj, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.connection_queue) + return + except Exception, e: + self.logger.info('Publish Job Event Exception: %r, retry=%d', e, + retry_count, exc_info=True) + retry_count += 1 + if retry_count >= 3: + break + + class EventContext(object): ''' Store global and local (per thread/process) data associated with callback @@ -38,6 +99,7 @@ class EventContext(object): def __init__(self): self.display_lock = multiprocessing.RLock() + self.dispatcher = CallbackQueueEventDispatcher() def add_local(self, **kwargs): if not hasattr(self, '_local'): @@ -136,7 +198,9 @@ class EventContext(object): fileobj.flush() def dump_begin(self, fileobj): - self.dump(fileobj, self.get_begin_dict()) + begin_dict = self.get_begin_dict() + self.dispatcher.dispatch(begin_dict) + self.dump(fileobj, {'uuid': begin_dict['uuid']}) def dump_end(self, fileobj): self.dump(fileobj, self.get_end_dict(), flush=True) diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py index e61ef17624..59faa7ac79 100644 --- a/awx/lib/tower_display_callback/module.py +++ b/awx/lib/tower_display_callback/module.py @@ -29,8 +29,8 @@ from ansible.plugins.callback import CallbackBase from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule # Tower Display Callback -from tower_display_callback.events import event_context -from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule +from .events import event_context +from .minimal import CallbackModule as MinimalCallbackModule class BaseCallbackModule(CallbackBase): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index c0105b2587..7b959f6781 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -21,6 +21,7 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver') class CallbackBrokerWorker(ConsumerMixin): def __init__(self, connection): self.connection = connection + self.partial_events = {} def get_consumers(self, Consumer, channel): return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, @@ -31,18 +32,28 @@ class CallbackBrokerWorker(ConsumerMixin): def process_task(self, body, message): try: - if 'event' not in body: - raise Exception('Payload does not have an event') if 'job_id' not in body and 'ad_hoc_command_id' not in body: raise Exception('Payload does not have a job_id or ad_hoc_command_id') if settings.DEBUG: logger.info('Body: {}'.format(body)) logger.info('Message: {}'.format(message)) try: - if 'job_id' in body: - JobEvent.create_from_data(**body) - elif 'ad_hoc_command_id' in body: - AdHocCommandEvent.create_from_data(**body) + # If event came directly from callback without counter/stdout, + # save it until the rest of the event arrives. + if 'counter' not in body: + if 'uuid' in body: + self.partial_events[body['uuid']] = body + # If event has counter, try to combine it with any event data + # already received for the same uuid, then create the actual + # job event record. + else: + if 'uuid' in body: + partial_event = self.partial_events.pop(body['uuid'], {}) + body.update(partial_event) + if 'job_id' in body: + JobEvent.create_from_data(**body) + elif 'ad_hoc_command_id' in body: + AdHocCommandEvent.create_from_data(**body) except DatabaseError as e: logger.error('Database Error Saving Job Event: {}'.format(e)) except Exception as exc: From a61e729ebbc27a85e492010f3571cb659c8f8762 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 14 Dec 2016 15:05:28 -0500 Subject: [PATCH 2/3] Purge event res dict if it is over a certain size Also purge/update some old settings values --- awx/lib/tower_display_callback/events.py | 4 +++- awx/main/tasks.py | 1 + awx/settings/defaults.py | 17 +++++------------ 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index 0909ed460d..c17cf2c7f1 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -173,7 +173,9 @@ class EventContext(object): if event_data.get(key, False): event = key break - + max_res = int(os.getenv("MAX_EVENT_RES", 700000)) + if event not in ('playbook_on_stats',) and "res" in event_data and len(str(event_data['res'])) > max_res: + event_data['res'] = {} event_dict = dict(event=event, event_data=event_data) for key in event_data.keys(): if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e0dcae0fca..addbe4c8f2 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -809,6 +809,7 @@ class RunJob(BaseTask): env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' env['TOWER_HOST'] = settings.TOWER_URL_BASE + env['MAX_EVENT_RES'] = settings.MAX_EVENT_RES_DATA env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_CONNECTION'] = settings.BROKER_URL if getattr(settings, 'JOB_CALLBACK_DEBUG', False): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index fbb9c8fb73..9c758dfb13 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -150,7 +150,11 @@ ALLOWED_HOSTS = [] REMOTE_HOST_HEADERS = ['REMOTE_ADDR', 'REMOTE_HOST'] # Note: This setting may be overridden by database settings. -STDOUT_MAX_BYTES_DISPLAY = 1048576 +STDOUT_MAX_BYTES_DISPLAY = 10485760 + +# The maximum size of the ansible callback event's res data structure +# beyond this limit and the value will be removed +MAX_EVENT_RES_DATA = 700000 # Note: This setting may be overridden by database settings. EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024 @@ -522,17 +526,6 @@ ANSIBLE_FORCE_COLOR = True # the celery task. AWX_TASK_ENV = {} -# Maximum number of job events processed by the callback receiver worker process -# before it recycles -JOB_EVENT_RECYCLE_THRESHOLD = 3000 - -# Number of workers used to proecess job events in parallel -JOB_EVENT_WORKERS = 4 - -# Maximum number of job events that can be waiting on a single worker queue before -# it can be skipped as too busy -JOB_EVENT_MAX_QUEUE_SIZE = 100 - # Flag to enable/disable updating hosts M2M when saving job events. CAPTURE_JOB_EVENT_HOSTS = False From 1cd2a762bec82a1da79def356684506dbc43d3f3 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 14 Dec 2016 15:20:04 -0500 Subject: [PATCH 3/3] Reset max bytes display --- awx/settings/defaults.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 9c758dfb13..876ba56c85 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -150,7 +150,7 @@ ALLOWED_HOSTS = [] REMOTE_HOST_HEADERS = ['REMOTE_ADDR', 'REMOTE_HOST'] # Note: This setting may be overridden by database settings. -STDOUT_MAX_BYTES_DISPLAY = 10485760 +STDOUT_MAX_BYTES_DISPLAY = 1048576 # The maximum size of the ansible callback event's res data structure # beyond this limit and the value will be removed