diff --git a/awx/main/tests/unit/utils/test_event_filter.py b/awx/main/tests/unit/utils/test_event_filter.py deleted file mode 100644 index e69dd3c507..0000000000 --- a/awx/main/tests/unit/utils/test_event_filter.py +++ /dev/null @@ -1,198 +0,0 @@ -import pytest -import base64 -import json -from io import StringIO - -from django.utils.encoding import smart_bytes, smart_str -from awx.main.utils import OutputEventFilter, OutputVerboseFilter - -MAX_WIDTH = 78 -EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd' - - -def write_encoded_event_data(fileobj, data): - b64data = smart_str(base64.b64encode(smart_bytes(json.dumps(data)))) - # pattern corresponding to OutputEventFilter expectation - fileobj.write('\x1b[K') - for offset in range(0, len(b64data), MAX_WIDTH): - chunk = b64data[offset:offset + MAX_WIDTH] - escaped_chunk = '{}\x1b[{}D'.format(chunk, len(chunk)) - fileobj.write(escaped_chunk) - fileobj.write('\x1b[K') - - -@pytest.fixture -def fake_callback(): - return [] - - -@pytest.fixture -def fake_cache(): - return {} - - -@pytest.fixture -def wrapped_handle(job_event_callback): - # Preliminary creation of resources usually done in tasks.py - return OutputEventFilter(job_event_callback) - - -@pytest.fixture -def job_event_callback(fake_callback, fake_cache): - def method(event_data): - if 'uuid' in event_data: - cache_event = fake_cache.get(':1:ev-{}'.format(event_data['uuid']), None) - if cache_event is not None: - event_data.update(cache_event) - fake_callback.append(event_data) - return method - - -def test_event_recomb(fake_callback, fake_cache, wrapped_handle): - # Pretend that this is done by the Ansible callback module - fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'} - write_encoded_event_data(wrapped_handle, { - 'uuid': EXAMPLE_UUID - }) - wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n') - wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n') - write_encoded_event_data(wrapped_handle, {}) - # stop pretending - - assert len(fake_callback) == 1 - recomb_data = fake_callback[0] - assert 'event' in recomb_data - assert recomb_data['event'] == 'foo' - - -def test_separate_verbose_events(fake_callback, wrapped_handle): - # Pretend that this is done by the Ansible callback module - wrapped_handle.write('Using /etc/ansible/ansible.cfg as config file\n') - wrapped_handle.write('SSH password: \n') - write_encoded_event_data(wrapped_handle, { # associated with _next_ event - 'uuid': EXAMPLE_UUID - }) - # stop pretending - - assert len(fake_callback) == 2 - for event_data in fake_callback: - assert 'event' in event_data - assert event_data['event'] == 'verbose' - - -def test_large_data_payload(fake_callback, fake_cache, wrapped_handle): - # Pretend that this is done by the Ansible callback module - fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'} - event_data_to_encode = { - 'uuid': EXAMPLE_UUID, - 'host': 'localhost', - 'role': 'some_path_to_role' - } - assert len(json.dumps(event_data_to_encode)) > MAX_WIDTH - write_encoded_event_data(wrapped_handle, event_data_to_encode) - wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n') - wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n') - write_encoded_event_data(wrapped_handle, {}) - # stop pretending - - assert len(fake_callback) == 1 - recomb_data = fake_callback[0] - assert 'role' in recomb_data - assert recomb_data['role'] == 'some_path_to_role' - assert 'event' in recomb_data - assert recomb_data['event'] == 'foo' - - -def test_event_lazy_parsing(fake_callback, fake_cache, wrapped_handle): - # Pretend that this is done by the Ansible callback module - fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'} - buff = StringIO() - event_data_to_encode = { - 'uuid': EXAMPLE_UUID, - 'host': 'localhost', - 'role': 'some_path_to_role' - } - write_encoded_event_data(buff, event_data_to_encode) - - # write the data to the event filter in chunks to test lazy event matching - buff.seek(0) - start_token_chunk = buff.read(1) # \x1b - start_token_remainder = buff.read(2) # [K - body = buff.read(15) # next 15 bytes of base64 data - remainder = buff.read() # the remainder - for chunk in (start_token_chunk, start_token_remainder, body, remainder): - wrapped_handle.write(chunk) - - wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n') - wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n') - write_encoded_event_data(wrapped_handle, {}) - # stop pretending - - assert len(fake_callback) == 1 - recomb_data = fake_callback[0] - assert 'role' in recomb_data - assert recomb_data['role'] == 'some_path_to_role' - assert 'event' in recomb_data - assert recomb_data['event'] == 'foo' - - -@pytest.mark.timeout(1) -def test_large_stdout_blob(): - def _callback(*args, **kw): - pass - - f = OutputEventFilter(_callback) - for x in range(1024 * 10): - f.write('x' * 1024) - - -def test_verbose_line_buffering(): - events = [] - - def _callback(event_data): - events.append(event_data) - - f = OutputVerboseFilter(_callback) - f.write('one two\r\n\r\n') - - assert len(events) == 2 - assert events[0]['start_line'] == 0 - assert events[0]['end_line'] == 1 - assert events[0]['stdout'] == 'one two' - - assert events[1]['start_line'] == 1 - assert events[1]['end_line'] == 2 - assert events[1]['stdout'] == '' - - f.write('three') - assert len(events) == 2 - f.write('\r\nfou') - - # three is not pushed to buffer until its line completes - assert len(events) == 3 - assert events[2]['start_line'] == 2 - assert events[2]['end_line'] == 3 - assert events[2]['stdout'] == 'three' - - f.write('r\r') - f.write('\nfi') - - assert events[3]['start_line'] == 3 - assert events[3]['end_line'] == 4 - assert events[3]['stdout'] == 'four' - - f.write('ve') - f.write('\r\n') - - assert len(events) == 5 - assert events[4]['start_line'] == 4 - assert events[4]['end_line'] == 5 - assert events[4]['stdout'] == 'five' - - f.close() - - from pprint import pprint - pprint(events) - assert len(events) == 6 - - assert events[5]['event'] == 'EOF' diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 3539f11536..54d5abe266 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -2,7 +2,6 @@ # All Rights Reserved. # Python -import base64 import json import yaml import logging @@ -16,7 +15,6 @@ import contextlib import tempfile import psutil from functools import reduce, wraps -from io import StringIO from decimal import Decimal @@ -42,7 +40,7 @@ __all__ = ['get_object_or_400', 'camelcase_to_underscore', 'memoize', 'memoize_d 'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', '_inventory_updates', 'get_pk_from_dict', 'getattrd', 'getattr_dne', 'NoDefaultProvided', - 'get_current_apps', 'set_current_apps', 'OutputEventFilter', 'OutputVerboseFilter', + 'get_current_apps', 'set_current_apps', 'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity', 'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict', 'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest', @@ -961,117 +959,6 @@ def get_custom_venv_choices(custom_paths=None): return custom_venv_choices -class OutputEventFilter(object): - ''' - File-like object that looks for encoded job events in stdout data. - ''' - - EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K') - - def __init__(self, event_callback): - self._event_callback = event_callback - self._counter = 0 - self._start_line = 0 - self._buffer = StringIO() - self._last_chunk = '' - self._current_event_data = None - - def flush(self): - # pexpect wants to flush the file it writes to, but we're not - # actually capturing stdout to a raw file; we're just - # implementing a custom `write` method to discover and emit events from - # the stdout stream - pass - - def write(self, data): - data = smart_str(data) - self._buffer.write(data) - - # keep a sliding window of the last chunk written so we can detect - # event tokens and determine if we need to perform a search of the full - # buffer - should_search = '\x1b[K' in (self._last_chunk + data) - self._last_chunk = data - - # Only bother searching the buffer if we recently saw a start/end - # token (\x1b[K) - while should_search: - value = self._buffer.getvalue() - match = self.EVENT_DATA_RE.search(value) - if not match: - break - try: - base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1)) - event_data = json.loads(base64.b64decode(base64_data)) - except ValueError: - event_data = {} - self._emit_event(value[:match.start()], event_data) - remainder = value[match.end():] - self._buffer = StringIO() - self._buffer.write(remainder) - self._last_chunk = remainder - - def close(self): - value = self._buffer.getvalue() - if value: - self._emit_event(value) - self._buffer = StringIO() - self._event_callback(dict(event='EOF', final_counter=self._counter)) - - def _emit_event(self, buffered_stdout, next_event_data=None): - next_event_data = next_event_data or {} - if self._current_event_data: - event_data = self._current_event_data - stdout_chunks = [buffered_stdout] - elif buffered_stdout: - event_data = dict(event='verbose') - stdout_chunks = buffered_stdout.splitlines(True) - else: - stdout_chunks = [] - - for stdout_chunk in stdout_chunks: - self._counter += 1 - event_data['counter'] = self._counter - event_data['stdout'] = stdout_chunk[:-2] if len(stdout_chunk) > 2 else "" - n_lines = stdout_chunk.count('\n') - event_data['start_line'] = self._start_line - event_data['end_line'] = self._start_line + n_lines - self._start_line += n_lines - if self._event_callback: - self._event_callback(event_data) - - if next_event_data.get('uuid', None): - self._current_event_data = next_event_data - else: - self._current_event_data = None - - -class OutputVerboseFilter(OutputEventFilter): - ''' - File-like object that dispatches stdout data. - Does not search for encoded job event data. - Use for unified job types that do not encode job event data. - ''' - def write(self, data): - self._buffer.write(data) - - # if the current chunk contains a line break - if data and '\n' in data: - # emit events for all complete lines we know about - lines = self._buffer.getvalue().splitlines(True) # keep ends - remainder = None - # if last line is not a complete line, then exclude it - if '\n' not in lines[-1]: - remainder = lines.pop() - # emit all complete lines - for line in lines: - self._emit_event(line) - self._buffer = StringIO() - # put final partial line back on buffer - if remainder: - self._buffer.write(remainder) - - def is_ansible_variable(key): return key.startswith('ansible_')