Merge pull request #3661 from AlanCoding/stdout_cleanup

Remove stdout wrapper classes that moved to runner

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot] 2019-04-10 15:05:32 +00:00 committed by GitHub
commit ae7b173e17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 1 additions and 312 deletions

View File

@@ -1,198 +0,0 @@
import pytest
import base64
import json
from io import StringIO
from django.utils.encoding import smart_bytes, smart_str
from awx.main.utils import OutputEventFilter, OutputVerboseFilter
MAX_WIDTH = 78
EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd'
def write_encoded_event_data(fileobj, data):
    """Write *data* to *fileobj* using the ANSI-escaped base64 framing
    that OutputEventFilter is built to recognize and decode."""
    payload = smart_str(base64.b64encode(smart_bytes(json.dumps(data))))
    # start token expected by OutputEventFilter
    fileobj.write('\x1b[K')
    # emit the payload in fixed-width chunks, each chunk followed by a
    # cursor-left escape that encodes the chunk's length
    for start in range(0, len(payload), MAX_WIDTH):
        piece = payload[start:start + MAX_WIDTH]
        fileobj.write('{}\x1b[{}D'.format(piece, len(piece)))
    # end token
    fileobj.write('\x1b[K')
@pytest.fixture
def fake_callback():
    """Plain list that collects every event dict emitted by the filter."""
    return list()
@pytest.fixture
def fake_cache():
    """Dict standing in for the event-data cache keyed by ':1:ev-<uuid>'."""
    return dict()
@pytest.fixture
def wrapped_handle(job_event_callback):
    """OutputEventFilter wired to the fake job-event callback.

    Mirrors the preliminary resource creation usually done in tasks.py.
    """
    handle = OutputEventFilter(job_event_callback)
    return handle
@pytest.fixture
def job_event_callback(fake_callback, fake_cache):
    """Callback that merges cached data (looked up by uuid) into each
    event and records the result on *fake_callback*."""
    def _record(event_data):
        if 'uuid' in event_data:
            key = ':1:ev-{}'.format(event_data['uuid'])
            cached = fake_cache.get(key)
            if cached is not None:
                event_data.update(cached)
        fake_callback.append(event_data)
    return _record
def test_event_recomb(fake_callback, fake_cache, wrapped_handle):
    """An encoded uuid marker is recombined with its cached event data."""
    # Pretend that this is done by the Ansible callback module
    fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'}
    write_encoded_event_data(wrapped_handle, {'uuid': EXAMPLE_UUID})
    wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n')
    wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n')
    write_encoded_event_data(wrapped_handle, {})
    # stop pretending
    assert len(fake_callback) == 1
    recombined = fake_callback[0]
    assert 'event' in recombined
    assert recombined['event'] == 'foo'
def test_separate_verbose_events(fake_callback, wrapped_handle):
    """Stdout written before any uuid marker becomes 'verbose' events."""
    # Pretend that this is done by the Ansible callback module
    wrapped_handle.write('Using /etc/ansible/ansible.cfg as config file\n')
    wrapped_handle.write('SSH password: \n')
    # associated with _next_ event
    write_encoded_event_data(wrapped_handle, {'uuid': EXAMPLE_UUID})
    # stop pretending
    assert len(fake_callback) == 2
    for emitted in fake_callback:
        assert 'event' in emitted
        assert emitted['event'] == 'verbose'
def test_large_data_payload(fake_callback, fake_cache, wrapped_handle):
    """Payloads wider than MAX_WIDTH are chunked on write and recombined."""
    # Pretend that this is done by the Ansible callback module
    fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'}
    payload = {
        'uuid': EXAMPLE_UUID,
        'host': 'localhost',
        'role': 'some_path_to_role'
    }
    # sanity check: this payload must need more than one encoded chunk
    assert len(json.dumps(payload)) > MAX_WIDTH
    write_encoded_event_data(wrapped_handle, payload)
    wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n')
    wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n')
    write_encoded_event_data(wrapped_handle, {})
    # stop pretending
    assert len(fake_callback) == 1
    recombined = fake_callback[0]
    assert 'role' in recombined
    assert recombined['role'] == 'some_path_to_role'
    assert 'event' in recombined
    assert recombined['event'] == 'foo'
def test_event_lazy_parsing(fake_callback, fake_cache, wrapped_handle):
    """Tokens split across many write() calls are still matched lazily."""
    # Pretend that this is done by the Ansible callback module
    fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'}
    staging = StringIO()
    write_encoded_event_data(staging, {
        'uuid': EXAMPLE_UUID,
        'host': 'localhost',
        'role': 'some_path_to_role'
    })
    # replay the encoded stream in fragments to exercise lazy matching
    staging.seek(0)
    fragments = [
        staging.read(1),   # \x1b
        staging.read(2),   # [K
        staging.read(15),  # next 15 bytes of base64 data
        staging.read(),    # the remainder
    ]
    for fragment in fragments:
        wrapped_handle.write(fragment)
    wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n')
    wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n')
    write_encoded_event_data(wrapped_handle, {})
    # stop pretending
    assert len(fake_callback) == 1
    recombined = fake_callback[0]
    assert 'role' in recombined
    assert recombined['role'] == 'some_path_to_role'
    assert 'event' in recombined
    assert recombined['event'] == 'foo'
@pytest.mark.timeout(1)
def test_large_stdout_blob():
    """Writing ~10MB of plain output must not hang the filter (timeout guards)."""
    def _discard(*args, **kw):
        pass

    handle = OutputEventFilter(_discard)
    blob = 'x' * 1024
    for _ in range(1024 * 10):
        handle.write(blob)
def test_verbose_line_buffering():
    """OutputVerboseFilter emits one event per completed line and buffers
    any trailing partial line until its line break arrives."""
    events = []

    def _callback(event_data):
        events.append(event_data)

    f = OutputVerboseFilter(_callback)
    f.write('one two\r\n\r\n')

    # two complete lines were written: 'one two' and an empty line
    assert len(events) == 2
    assert events[0]['start_line'] == 0
    assert events[0]['end_line'] == 1
    assert events[0]['stdout'] == 'one two'
    assert events[1]['start_line'] == 1
    assert events[1]['end_line'] == 2
    assert events[1]['stdout'] == ''

    f.write('three')
    # 'three' is not pushed to the callback until its line completes
    assert len(events) == 2
    f.write('\r\nfou')
    assert len(events) == 3
    assert events[2]['start_line'] == 2
    assert events[2]['end_line'] == 3
    assert events[2]['stdout'] == 'three'

    # line breaks split across separate writes must still complete lines
    f.write('r\r')
    f.write('\nfi')
    assert events[3]['start_line'] == 3
    assert events[3]['end_line'] == 4
    assert events[3]['stdout'] == 'four'
    f.write('ve')
    f.write('\r\n')
    assert len(events) == 5
    assert events[4]['start_line'] == 4
    assert events[4]['end_line'] == 5
    assert events[4]['stdout'] == 'five'

    # close() flushes the buffer and appends the EOF sentinel event
    f.close()
    assert len(events) == 6
    assert events[5]['event'] == 'EOF'

View File

@@ -2,7 +2,6 @@
# All Rights Reserved.
# Python
import base64
import json
import yaml
import logging
@@ -16,7 +15,6 @@ import contextlib
import tempfile
import psutil
from functools import reduce, wraps
from io import StringIO
from decimal import Decimal
@@ -42,7 +40,7 @@ __all__ = ['get_object_or_400', 'camelcase_to_underscore', 'memoize', 'memoize_d
'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'getattr_dne', 'NoDefaultProvided',
'get_current_apps', 'set_current_apps', 'OutputEventFilter', 'OutputVerboseFilter',
'get_current_apps', 'set_current_apps',
'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity',
'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict',
'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest',
@@ -961,117 +959,6 @@ def get_custom_venv_choices(custom_paths=None):
return custom_venv_choices
class OutputEventFilter(object):
    '''
    File-like object that looks for encoded job events in stdout data.
    '''

    # Matches one encoded payload: a start token (ESC[K), one or more
    # base64 chunks each followed by a cursor-left escape (ESC[<n>D),
    # then a closing ESC[K end token.
    EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')

    def __init__(self, event_callback):
        # callable invoked once per decoded or 'verbose' event dict
        self._event_callback = event_callback
        # monotonically increasing counter stamped onto each event
        self._counter = 0
        # absolute stdout line number where the next chunk starts
        self._start_line = 0
        # accumulates raw stdout until a complete event token is found
        self._buffer = StringIO()
        # most recent chunk written; used to cheaply detect token starts
        self._last_chunk = ''
        # event data decoded from the latest token; the stdout that
        # follows is attributed to this event
        self._current_event_data = None

    def flush(self):
        # pexpect wants to flush the file it writes to, but we're not
        # actually capturing stdout to a raw file; we're just
        # implementing a custom `write` method to discover and emit events from
        # the stdout stream
        pass

    def write(self, data):
        data = smart_str(data)
        self._buffer.write(data)

        # keep a sliding window of the last chunk written so we can detect
        # event tokens and determine if we need to perform a search of the full
        # buffer
        should_search = '\x1b[K' in (self._last_chunk + data)
        self._last_chunk = data

        # Only bother searching the buffer if we recently saw a start/end
        # token (\x1b[K)
        while should_search:
            value = self._buffer.getvalue()
            match = self.EVENT_DATA_RE.search(value)
            if not match:
                break
            try:
                # strip the cursor-left escapes between chunks, then decode
                base64_data = re.sub(r'\x1b\[\d+D', '', match.group(1))
                event_data = json.loads(base64.b64decode(base64_data))
            except ValueError:
                # undecodable payload: still emit the preceding stdout,
                # just with empty event data
                event_data = {}
            # stdout preceding the token belongs to the previous event
            self._emit_event(value[:match.start()], event_data)
            # keep everything after the token for the next iteration
            remainder = value[match.end():]
            self._buffer = StringIO()
            self._buffer.write(remainder)
            self._last_chunk = remainder

    def close(self):
        # flush any stdout still buffered, then signal end-of-stream
        value = self._buffer.getvalue()
        if value:
            self._emit_event(value)
            self._buffer = StringIO()
        self._event_callback(dict(event='EOF', final_counter=self._counter))

    def _emit_event(self, buffered_stdout, next_event_data=None):
        next_event_data = next_event_data or {}
        if self._current_event_data:
            # stdout belongs to the event decoded from the previous token;
            # emit it as a single chunk
            event_data = self._current_event_data
            stdout_chunks = [buffered_stdout]
        elif buffered_stdout:
            # no pending event: each line becomes its own 'verbose' event
            event_data = dict(event='verbose')
            stdout_chunks = buffered_stdout.splitlines(True)
        else:
            stdout_chunks = []

        for stdout_chunk in stdout_chunks:
            self._counter += 1
            event_data['counter'] = self._counter
            # NOTE(review): assumes chunks end with '\r\n'; a chunk of two
            # or fewer characters is recorded as empty stdout — confirm
            # this trimming is intended for all inputs
            event_data['stdout'] = stdout_chunk[:-2] if len(stdout_chunk) > 2 else ""
            n_lines = stdout_chunk.count('\n')
            event_data['start_line'] = self._start_line
            event_data['end_line'] = self._start_line + n_lines
            self._start_line += n_lines
            if self._event_callback:
                self._event_callback(event_data)

        # a token carrying a uuid opens a new pending event; otherwise the
        # pending event (if any) is finished
        if next_event_data.get('uuid', None):
            self._current_event_data = next_event_data
        else:
            self._current_event_data = None
class OutputVerboseFilter(OutputEventFilter):
    '''
    File-like object that dispatches stdout data.
    Does not search for encoded job event data.
    Use for unified job types that do not encode job event data.
    '''

    def write(self, data):
        """Buffer *data*; emit one event per line that has completed."""
        self._buffer.write(data)
        if not data or '\n' not in data:
            # no new line break, so nothing can have completed yet
            return
        chunks = self._buffer.getvalue().splitlines(True)  # keep line ends
        # a trailing chunk without '\n' is a partial line; hold it back
        partial = chunks.pop() if '\n' not in chunks[-1] else None
        for completed_line in chunks:
            self._emit_event(completed_line)
        # reset the buffer, restoring any held-back partial line
        self._buffer = StringIO()
        if partial:
            self._buffer.write(partial)
def is_ansible_variable(key):
    """Return True when *key* uses the reserved ``ansible_`` name prefix."""
    reserved_prefix = 'ansible_'
    return key.startswith(reserved_prefix)