From 51f7907a01a573647a00a7d6cb2493fe7f5daa5c Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Tue, 16 Jan 2018 10:52:12 -0500 Subject: [PATCH] optimize OutputEventFilter for large stdout streams update our event data search algorithm to be a bit lazier in event data discovery; this drastically improves processing speeds for stdout >5MB see: https://github.com/ansible/awx/issues/417 --- .../commands/run_callback_receiver.py | 2 +- .../tests/unit/utils/test_event_filter.py | 44 +++++++++++++++++++ awx/main/utils/common.py | 34 ++++++++++---- requirements/requirements_dev.txt | 1 + 4 files changed, 71 insertions(+), 10 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 0c6c4f90dd..adf558631e 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -147,7 +147,7 @@ class CallbackBrokerWorker(ConsumerMixin): from pprint import pformat logger.info('Body: {}'.format( highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly')) - )) + )[:1024 * 4]) def _save_event_data(): for key, cls in event_map.items(): diff --git a/awx/main/tests/unit/utils/test_event_filter.py b/awx/main/tests/unit/utils/test_event_filter.py index aa5a210fcb..4db92c91ef 100644 --- a/awx/main/tests/unit/utils/test_event_filter.py +++ b/awx/main/tests/unit/utils/test_event_filter.py @@ -1,6 +1,7 @@ import pytest import base64 import json +from StringIO import StringIO from awx.main.utils import OutputEventFilter @@ -99,3 +100,46 @@ def test_large_data_payload(fake_callback, fake_cache, wrapped_handle): assert recomb_data['role'] == 'some_path_to_role' assert 'event' in recomb_data assert recomb_data['event'] == 'foo' + + +def test_event_lazy_parsing(fake_callback, fake_cache, wrapped_handle): + # Pretend that this is done by the Ansible callback module + fake_cache[':1:ev-{}'.format(EXAMPLE_UUID)] = {'event': 'foo'} + buff = StringIO() + event_data_to_encode = { + 'uuid': EXAMPLE_UUID, + 'host': 'localhost', + 'role': 'some_path_to_role' + } + write_encoded_event_data(buff, event_data_to_encode) + + # write the data to the event filter in chunks to test lazy event matching + buff.seek(0) + start_token_chunk = buff.read(1) # \x1b + start_token_remainder = buff.read(2) # [K + body = buff.read(15) # next 15 bytes of base64 data + remainder = buff.read() # the remainder + for chunk in (start_token_chunk, start_token_remainder, body, remainder): + wrapped_handle.write(chunk) + + wrapped_handle.write('\r\nTASK [Gathering Facts] *********************************************************\n') + wrapped_handle.write('\u001b[0;33mchanged: [localhost]\u001b[0m\n') + write_encoded_event_data(wrapped_handle, {}) + # stop pretending + + assert len(fake_callback) == 1 + recomb_data = fake_callback[0] + assert 'role' in recomb_data + assert recomb_data['role'] == 'some_path_to_role' + assert 'event' in recomb_data + assert recomb_data['event'] == 'foo' + + +@pytest.mark.timeout(1) +def test_large_stdout_blob(): + def _callback(*args, **kw): + pass + + f = OutputEventFilter(_callback) + for x in range(1024 * 10): + f.write('x' * 1024) diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 97b5ae1253..443da1f1f3 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -18,6 +18,7 @@ import contextlib import tempfile import six import psutil +from StringIO import StringIO # Decorator from decorator import decorator @@ -867,7 +868,8 @@ class OutputEventFilter(object): self._event_ct = 0 self._counter = 1 self._start_line = 0 - self._buffer = '' + self._buffer = StringIO() + self._last_chunk = '' self._current_event_data = None def flush(self): @@ -878,9 +880,19 @@ class OutputEventFilter(object): pass def write(self, data): - self._buffer += data - while True: - match = self.EVENT_DATA_RE.search(self._buffer) + self._buffer.write(data) + + # keep a sliding window of the last chunk written so we can detect + # event tokens and determine if we need to perform a search of the full + # buffer + should_search = '\x1b[K' in (self._last_chunk + data) + self._last_chunk = data + + # Only bother searching the buffer if we recently saw a start/end + # token (\x1b[K) + while should_search: + value = self._buffer.getvalue() + match = self.EVENT_DATA_RE.search(value) if not match: break try: @@ -888,13 +900,17 @@ class OutputEventFilter(object): event_data = json.loads(base64.b64decode(base64_data)) except ValueError: event_data = {} - self._emit_event(self._buffer[:match.start()], event_data) - self._buffer = self._buffer[match.end():] + self._emit_event(value[:match.start()], event_data) + remainder = value[match.end():] + self._buffer = StringIO() + self._buffer.write(remainder) + self._last_chunk = remainder def close(self): - if self._buffer: - self._emit_event(self._buffer) - self._buffer = '' + value = self._buffer.getvalue() + if value: + self._emit_event(value) + self._buffer = StringIO() self._event_callback(dict(event='EOF')) def _emit_event(self, buffered_stdout, next_event_data=None): diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index aa3edffe6b..131b8ebef5 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -10,6 +10,7 @@ pytest-cov pytest-django pytest-pythonpath pytest-mock +pytest-timeout logutils flower uwsgitop