Merge pull request #4051 from cchurch/no-more-gibberish

Add multiprocessing RLock around job event data output
This commit is contained in:
Chris Church 2016-11-17 15:37:58 -05:00 committed by GitHub
commit dcbf91fc4a
4 changed files with 33 additions and 24 deletions

View File

@ -77,17 +77,21 @@ def display_with_context(f):
def wrapper(*args, **kwargs):
log_only = args[5] if len(args) >= 6 else kwargs.get('log_only', False)
stderr = args[3] if len(args) >= 4 else kwargs.get('stderr', False)
fileobj = sys.stderr if stderr else sys.stdout
event_uuid = event_context.get().get('uuid', None)
try:
if not log_only and not event_uuid:
with event_context.display_lock:
# If writing only to a log file or there is already an event UUID
# set (from a callback module method), skip dumping the event data.
if log_only or event_uuid:
return f(*args, **kwargs)
try:
fileobj = sys.stderr if stderr else sys.stdout
event_context.add_local(uuid=str(uuid.uuid4()))
event_context.dump_begin(fileobj)
return f(*args, **kwargs)
finally:
if not log_only and not event_uuid:
return f(*args, **kwargs)
finally:
event_context.dump_end(fileobj)
event_context.remove_local(uuid=None)
return wrapper

View File

@ -22,6 +22,7 @@ import base64
import contextlib
import datetime
import json
import multiprocessing
import os
import threading
import uuid
@ -35,6 +36,9 @@ class EventContext(object):
events and other display output methods.
'''
def __init__(self):
self.display_lock = multiprocessing.RLock()
def add_local(self, **kwargs):
if not hasattr(self, '_local'):
self._local = threading.local()
@ -121,12 +125,13 @@ class EventContext(object):
def dump(self, fileobj, data, max_width=78):
b64data = base64.b64encode(json.dumps(data))
fileobj.write(u'\x1b[K')
for offset in xrange(0, len(b64data), max_width):
chunk = b64data[offset:offset + max_width]
escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk))
fileobj.write(escaped_chunk)
fileobj.write(u'\x1b[K')
with self.display_lock:
fileobj.write(u'\x1b[K')
for offset in xrange(0, len(b64data), max_width):
chunk = b64data[offset:offset + max_width]
escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk))
fileobj.write(escaped_chunk)
fileobj.write(u'\x1b[K')
def dump_begin(self, fileobj):
self.dump(fileobj, self.get_begin_dict())

View File

@ -121,17 +121,18 @@ class BaseCallbackModule(CallbackBase):
else:
task = None
try:
event_context.add_local(event=event, **event_data)
if task:
self.set_task(task, local=True)
event_context.dump_begin(sys.stdout)
yield
finally:
event_context.dump_end(sys.stdout)
if task:
self.clear_task(local=True)
event_context.remove_local(event=None, **event_data)
with event_context.display_lock:
try:
event_context.add_local(event=event, **event_data)
if task:
self.set_task(task, local=True)
event_context.dump_begin(sys.stdout)
yield
finally:
event_context.dump_end(sys.stdout)
if task:
self.clear_task(local=True)
event_context.remove_local(event=None, **event_data)
def set_playbook(self, playbook):
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so do it for them.

View File

@ -36,7 +36,6 @@ logger = logging.getLogger('awx.main.signals')
def emit_job_event_detail(sender, **kwargs):
instance = kwargs['instance']
created = kwargs['created']
print("before created job_event_detail")
if created:
event_serialized = JobEventSerializer(instance).data
event_serialized['id'] = instance.id