From 8c167e50c95173f1717c51705d9170cd3c537d38 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Tue, 27 Mar 2018 13:40:51 -0400 Subject: [PATCH] Continuously stream data from verbose jobs In verbose unified job models (inventory updates, system jobs, etc.), do not delay dispatch just because the encoded event data is not part of the data written to the buffer. This allows output from these commands to be submitted to the callback queue as it is produced, instead of waiting until the buffer is closed. --- awx/main/tasks.py | 27 ++++++---- .../tests/unit/utils/test_event_filter.py | 54 ++++++++++++++++++- awx/main/utils/common.py | 28 +++++++++- 3 files changed, 97 insertions(+), 12 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c47101b988..ca6f7dbede 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -55,7 +55,7 @@ from awx.main.queue import CallbackQueueDispatcher from awx.main.expect import run, isolated_manager from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, get_licenser, - wrap_args_with_proot, OutputEventFilter, ignore_inventory_computed_fields, + wrap_args_with_proot, OutputEventFilter, OutputVerboseFilter, ignore_inventory_computed_fields, ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars) from awx.main.utils.reload import restart_local_services, stop_local_services from awx.main.utils.pglock import advisory_lock @@ -811,19 +811,26 @@ class BaseTask(LogErrorsTask): def get_stdout_handle(self, instance): ''' - Return an virtual file object for capturing stdout and events. + Return a virtual file object for capturing stdout and/or events. 
''' dispatcher = CallbackQueueDispatcher() - def event_callback(event_data): - event_data.setdefault(self.event_data_key, instance.id) - if 'uuid' in event_data: - cache_event = cache.get('ev-{}'.format(event_data['uuid']), None) - if cache_event is not None: - event_data.update(cache_event) - dispatcher.dispatch(event_data) + if isinstance(instance, (Job, AdHocCommand, ProjectUpdate)): + def event_callback(event_data): + event_data.setdefault(self.event_data_key, instance.id) + if 'uuid' in event_data: + cache_event = cache.get('ev-{}'.format(event_data['uuid']), None) + if cache_event is not None: + event_data.update(cache_event) + dispatcher.dispatch(event_data) - return OutputEventFilter(event_callback) + return OutputEventFilter(event_callback) + else: + def event_callback(event_data): + event_data.setdefault(self.event_data_key, instance.id) + dispatcher.dispatch(event_data) + + return OutputVerboseFilter(event_callback) def pre_run_hook(self, instance, **kwargs): ''' diff --git a/awx/main/tests/unit/utils/test_event_filter.py b/awx/main/tests/unit/utils/test_event_filter.py index 85ecc609d0..fb8f4fa144 100644 --- a/awx/main/tests/unit/utils/test_event_filter.py +++ b/awx/main/tests/unit/utils/test_event_filter.py @@ -5,7 +5,7 @@ from StringIO import StringIO from six.moves import xrange -from awx.main.utils import OutputEventFilter +from awx.main.utils import OutputEventFilter, OutputVerboseFilter MAX_WIDTH = 78 EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd' @@ -145,3 +145,55 @@ def test_large_stdout_blob(): f = OutputEventFilter(_callback) for x in range(1024 * 10): f.write('x' * 1024) + + +def test_verbose_line_buffering(): + events = [] + + def _callback(event_data): + events.append(event_data) + + f = OutputVerboseFilter(_callback) + f.write('one two\r\n\r\n') + + assert len(events) == 2 + assert events[0]['start_line'] == 0 + assert events[0]['end_line'] == 1 + assert events[0]['stdout'] == 'one two' + + assert events[1]['start_line'] == 1 + 
assert events[1]['end_line'] == 2 + assert events[1]['stdout'] == '' + + f.write('three') + assert len(events) == 2 + f.write('\r\nfou') + + # three is not pushed to buffer until its line completes + assert len(events) == 3 + assert events[2]['start_line'] == 2 + assert events[2]['end_line'] == 3 + assert events[2]['stdout'] == 'three' + + f.write('r\r') + f.write('\nfi') + + assert events[3]['start_line'] == 3 + assert events[3]['end_line'] == 4 + assert events[3]['stdout'] == 'four' + + f.write('ve') + f.write('\r\n') + + assert len(events) == 5 + assert events[4]['start_line'] == 4 + assert events[4]['end_line'] == 5 + assert events[4]['stdout'] == 'five' + + f.close() + + from pprint import pprint + pprint(events) + assert len(events) == 6 + + assert events[5]['event'] == 'EOF' diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index ba3413a133..be531d7e17 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -48,7 +48,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', '_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided', - 'get_current_apps', 'set_current_apps', 'OutputEventFilter', + 'get_current_apps', 'set_current_apps', 'OutputEventFilter', 'OutputVerboseFilter', 'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity', 'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict', 'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest', @@ -1009,6 +1009,32 @@ class OutputEventFilter(object): self._current_event_data = None +class OutputVerboseFilter(OutputEventFilter): + ''' + File-like object that dispatches stdout data. + Does not search for encoded job event data. 
+ Use for unified job types that do not encode job event data. + ''' + def write(self, data): + self._buffer.write(data) + + # if the current chunk contains a line break + if data and '\n' in data: + # emit events for all complete lines we know about + lines = self._buffer.getvalue().splitlines(True) # keep ends + remainder = None + # if last line is not a complete line, then exclude it + if '\n' not in lines[-1]: + remainder = lines.pop() + # emit all complete lines + for line in lines: + self._emit_event(line) + self._buffer = StringIO() + # put final partial line back on buffer + if remainder: + self._buffer.write(remainder) + + def is_ansible_variable(key): return key.startswith('ansible_')