From 98ad2684d36fc08ccb98db99866f015fc38bb3b6 Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Wed, 15 Mar 2017 17:23:05 -0400
Subject: [PATCH] when async log shipping fails, log an explanation

---
 awx/main/tests/unit/utils/test_handlers.py | 39 ++++++++++++++++++++++
 awx/main/utils/handlers.py                 | 34 ++++++++++++++++++-
 2 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/awx/main/tests/unit/utils/test_handlers.py b/awx/main/tests/unit/utils/test_handlers.py
index ba9ffc3e33..d7b7f5aed9 100644
--- a/awx/main/tests/unit/utils/test_handlers.py
+++ b/awx/main/tests/unit/utils/test_handlers.py
@@ -1,4 +1,5 @@
 import base64
+import cStringIO
 import json
 import logging
 
@@ -41,6 +42,17 @@ def ok200_adapter():
     return OK200Adapter()
 
 
+@pytest.fixture()
+def connection_error_adapter():
+    class ConnectionErrorAdapter(requests.adapters.HTTPAdapter):
+
+        def send(self, request, **kwargs):
+            err = requests.packages.urllib3.exceptions.SSLError()
+            raise requests.exceptions.ConnectionError(err, request=request)
+
+    return ConnectionErrorAdapter()
+
+
 def test_https_logging_handler_requests_async_implementation():
     handler = HTTPSHandler()
     assert isinstance(handler.session, FuturesSession)
@@ -114,6 +126,33 @@ def test_https_logging_handler_skip_log(params, logger_name, expected):
     assert handler.skip_log(logger_name) is expected
 
 
+def test_https_logging_handler_connection_error(connection_error_adapter,
+                                                dummy_log_record):
+    handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
+                           message_type='logstash',
+                           enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler.setFormatter(LogstashFormatter())
+    handler.session.mount('http://', connection_error_adapter)
+
+    buff = cStringIO.StringIO()
+    logging.getLogger('awx.main.utils.handlers').addHandler(
+        logging.StreamHandler(buff)
+    )
+
+    async_futures = handler.emit(dummy_log_record)
+    with pytest.raises(requests.exceptions.ConnectionError):
+        [future.result() for future in async_futures]
+    assert 'failed to emit log to external aggregator\nTraceback' in buff.getvalue()
+
+    # we should only log failures *periodically*, so causing *another*
+    # immediate failure shouldn't report a second ConnectionError
+    buff.truncate(0)
+    async_futures = handler.emit(dummy_log_record)
+    with pytest.raises(requests.exceptions.ConnectionError):
+        [future.result() for future in async_futures]
+    assert buff.getvalue() == ''
+
+
 @pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
 def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
                                     message_type):
diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py
index c65cfb7cdd..b1b96b492f 100644
--- a/awx/main/utils/handlers.py
+++ b/awx/main/utils/handlers.py
@@ -5,6 +5,8 @@
 import logging
 import json
 import requests
+import time
+from concurrent.futures import ThreadPoolExecutor
 from copy import copy
 
 # loggly
@@ -18,6 +20,8 @@ from awx.main.utils.formatters import LogstashFormatter
 
 __all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger']
 
+logger = logging.getLogger('awx.main.utils.handlers')
+
 # AWX external logging handler, generally designed to be used
 # with the accompanying LogstashHandler, derives from python-logstash library
 # Non-blocking request accomplished by FuturesSession, similar
@@ -48,13 +52,41 @@ class HTTPSNullHandler(logging.NullHandler):
         return super(HTTPSNullHandler, self).__init__()
 
 
+class VerboseThreadPoolExecutor(ThreadPoolExecutor):
+
+    last_log_emit = 0
+
+    def submit(self, func, *args, **kwargs):
+        def _wrapped(*args, **kwargs):
+            try:
+                return func(*args, **kwargs)
+            except Exception:
+                # If an exception occurs in a concurrent thread worker (like
+                # a ConnectionError or a read timeout), periodically log
+                # that failure.
+                #
+                # This approach isn't really thread-safe, so we could
+                # potentially log once per thread every 10 seconds, but it
+                # beats logging *every* failed HTTP request in a scenario where
+                # you've typo'd your log aggregator hostname.
+                now = time.time()
+                if now - self.last_log_emit > 10:
+                    logger.exception('failed to emit log to external aggregator')
+                    self.last_log_emit = now
+                raise
+        return super(VerboseThreadPoolExecutor, self).submit(_wrapped, *args,
+                                                             **kwargs)
+
+
 class BaseHTTPSHandler(logging.Handler):
     def __init__(self, fqdn=False, **kwargs):
         super(BaseHTTPSHandler, self).__init__()
         self.fqdn = fqdn
         for fd in PARAM_NAMES:
             setattr(self, fd, kwargs.get(fd, None))
-        self.session = FuturesSession()
+        self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
+            max_workers=2  # this is the default used by requests_futures
+        ))
         self.add_auth_information()
 
     @classmethod
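
A minimal usage sketch, separate from the patch itself, showing how the throttled
failure logging could be exercised outside the test suite. It assumes an environment
where awx.main.utils.handlers is importable; the aggregator hostname below is a
made-up placeholder, not anything configured by the patch.

    # Ship requests through a FuturesSession backed by the
    # VerboseThreadPoolExecutor added above. When the host is unreachable,
    # the worker thread logs 'failed to emit log to external aggregator'
    # (with a traceback) at most once every ~10 seconds, while the underlying
    # exception is still raised from each future.result().
    import logging

    from requests_futures.sessions import FuturesSession

    from awx.main.utils.handlers import VerboseThreadPoolExecutor

    logging.basicConfig(level=logging.INFO)
    session = FuturesSession(executor=VerboseThreadPoolExecutor(max_workers=2))

    # 'no-such-aggregator.invalid' is a hypothetical, unreachable hostname.
    futures = [session.post('http://no-such-aggregator.invalid', json={'msg': 'test'})
               for _ in range(5)]
    for future in futures:
        try:
            future.result()
        except Exception:
            pass  # every future still raises; only the first failure is logged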