Merge pull request #1124 from AlanCoding/hot_potato

Stream standard out in non-event models
This commit is contained in:
Alan Rominger
2018-03-28 11:38:49 -04:00
committed by GitHub
3 changed files with 97 additions and 12 deletions

View File

@@ -55,7 +55,7 @@ from awx.main.queue import CallbackQueueDispatcher
from awx.main.expect import run, isolated_manager
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
check_proot_installed, build_proot_temp_dir, get_licenser,
wrap_args_with_proot, OutputEventFilter, ignore_inventory_computed_fields,
wrap_args_with_proot, OutputEventFilter, OutputVerboseFilter, ignore_inventory_computed_fields,
ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars)
from awx.main.utils.reload import restart_local_services, stop_local_services
from awx.main.utils.pglock import advisory_lock
@@ -821,19 +821,26 @@ class BaseTask(LogErrorsTask):
def get_stdout_handle(self, instance):
'''
Return an virtual file object for capturing stdout and events.
Return an virtual file object for capturing stdout and/or events.
'''
dispatcher = CallbackQueueDispatcher()
def event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
if isinstance(instance, (Job, AdHocCommand, ProjectUpdate)):
def event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
return OutputEventFilter(event_callback)
return OutputEventFilter(event_callback)
else:
def event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
dispatcher.dispatch(event_data)
return OutputVerboseFilter(event_callback)
def pre_run_hook(self, instance, **kwargs):
'''

View File

@@ -5,7 +5,7 @@ from StringIO import StringIO
from six.moves import xrange
from awx.main.utils import OutputEventFilter
from awx.main.utils import OutputEventFilter, OutputVerboseFilter
MAX_WIDTH = 78
EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd'
@@ -145,3 +145,55 @@ def test_large_stdout_blob():
f = OutputEventFilter(_callback)
for x in range(1024 * 10):
f.write('x' * 1024)
def test_verbose_line_buffering():
events = []
def _callback(event_data):
events.append(event_data)
f = OutputVerboseFilter(_callback)
f.write('one two\r\n\r\n')
assert len(events) == 2
assert events[0]['start_line'] == 0
assert events[0]['end_line'] == 1
assert events[0]['stdout'] == 'one two'
assert events[1]['start_line'] == 1
assert events[1]['end_line'] == 2
assert events[1]['stdout'] == ''
f.write('three')
assert len(events) == 2
f.write('\r\nfou')
# three is not pushed to buffer until its line completes
assert len(events) == 3
assert events[2]['start_line'] == 2
assert events[2]['end_line'] == 3
assert events[2]['stdout'] == 'three'
f.write('r\r')
f.write('\nfi')
assert events[3]['start_line'] == 3
assert events[3]['end_line'] == 4
assert events[3]['stdout'] == 'four'
f.write('ve')
f.write('\r\n')
assert len(events) == 5
assert events[4]['start_line'] == 4
assert events[4]['end_line'] == 5
assert events[4]['stdout'] == 'five'
f.close()
from pprint import pprint
pprint(events)
assert len(events) == 6
assert events[5]['event'] == 'EOF'

View File

@@ -48,7 +48,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
'get_current_apps', 'set_current_apps', 'OutputEventFilter',
'get_current_apps', 'set_current_apps', 'OutputEventFilter', 'OutputVerboseFilter',
'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity',
'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict',
'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest',
@@ -1009,6 +1009,32 @@ class OutputEventFilter(object):
self._current_event_data = None
class OutputVerboseFilter(OutputEventFilter):
'''
File-like object that dispatches stdout data.
Does not search for encoded job event data.
Use for unified job types that do not encode job event data.
'''
def write(self, data):
self._buffer.write(data)
# if the current chunk contains a line break
if data and '\n' in data:
# emit events for all complete lines we know about
lines = self._buffer.getvalue().splitlines(True) # keep ends
remainder = None
# if last line is not a complete line, then exclude it
if '\n' not in lines[-1]:
remainder = lines.pop()
# emit all complete lines
for line in lines:
self._emit_event(line)
self._buffer = StringIO()
# put final partial line back on buffer
if remainder:
self._buffer.write(remainder)
def is_ansible_variable(key):
return key.startswith('ansible_')