From 1db697e4ebefb27a86575c2f06de422a97a63865 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Thu, 17 Nov 2016 09:09:19 -0500 Subject: [PATCH] Add multiprocessing RLock around event data output to ensure only one process is writing to stdout at a time. --- awx/lib/tower_display_callback/display.py | 16 ++++++++++------ awx/lib/tower_display_callback/events.py | 17 +++++++++++------ awx/lib/tower_display_callback/module.py | 23 ++++++++++++----------- awx/main/signals.py | 1 - 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/awx/lib/tower_display_callback/display.py b/awx/lib/tower_display_callback/display.py index 7f32ca9379..128c9349c7 100644 --- a/awx/lib/tower_display_callback/display.py +++ b/awx/lib/tower_display_callback/display.py @@ -77,17 +77,21 @@ def display_with_context(f): def wrapper(*args, **kwargs): log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False) stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False) - fileobj = sys.stderr if stderr else sys.stdout event_uuid = event_context.get().get('uuid', None) - try: - if not log_only and not event_uuid: + with event_context.display_lock: + # If writing only to a log file or there is already an event UUID + # set (from a callback module method), skip dumping the event data. + if log_only or event_uuid: + return f(*args, **kwargs) + try: + fileobj = sys.stderr if stderr else sys.stdout event_context.add_local(uuid=str(uuid.uuid4())) event_context.dump_begin(fileobj) - return f(*args, **kwargs) - finally: - if not log_only and not event_uuid: + return f(*args, **kwargs) + finally: event_context.dump_end(fileobj) event_context.remove_local(uuid=None) + return wrapper diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index 009c0c8efc..f85da9c233 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -22,6 +22,7 @@ import base64 import contextlib import datetime import json +import multiprocessing import os import threading import uuid @@ -35,6 +36,9 @@ class EventContext(object): events and other display output methods. ''' + def __init__(self): + self.display_lock = multiprocessing.RLock() + def add_local(self, **kwargs): if not hasattr(self, '_local'): self._local = threading.local() @@ -121,12 +125,13 @@ class EventContext(object): def dump(self, fileobj, data, max_width=78): b64data = base64.b64encode(json.dumps(data)) - fileobj.write(u'\x1b[K') - for offset in xrange(0, len(b64data), max_width): - chunk = b64data[offset:offset + max_width] - escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk)) - fileobj.write(escaped_chunk) - fileobj.write(u'\x1b[K') + with self.display_lock: + fileobj.write(u'\x1b[K') + for offset in xrange(0, len(b64data), max_width): + chunk = b64data[offset:offset + max_width] + escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk)) + fileobj.write(escaped_chunk) + fileobj.write(u'\x1b[K') def dump_begin(self, fileobj): self.dump(fileobj, self.get_begin_dict()) diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py index daa2bb8d4a..457455e513 100644 --- a/awx/lib/tower_display_callback/module.py +++ b/awx/lib/tower_display_callback/module.py @@ -121,17 +121,18 @@ class BaseCallbackModule(CallbackBase): else: task = None - try: - event_context.add_local(event=event, **event_data) - if task: - self.set_task(task, local=True) - event_context.dump_begin(sys.stdout) - yield - finally: - event_context.dump_end(sys.stdout) - if task: - self.clear_task(local=True) - event_context.remove_local(event=None, **event_data) + with event_context.display_lock: + try: + event_context.add_local(event=event, **event_data) + if task: + self.set_task(task, local=True) + event_context.dump_begin(sys.stdout) + yield + finally: + event_context.dump_end(sys.stdout) + if task: + self.clear_task(local=True) + event_context.remove_local(event=None, **event_data) def set_playbook(self, playbook): # NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them. diff --git a/awx/main/signals.py b/awx/main/signals.py index 7f3f8aee66..ceda8899b1 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -36,7 +36,6 @@ logger = logging.getLogger('awx.main.signals') def emit_job_event_detail(sender, **kwargs): instance = kwargs['instance'] created = kwargs['created'] - print("before created job_event_detail") if created: event_serialized = JobEventSerializer(instance).data event_serialized['id'] = instance.id