From b3733d233e28caf807e83602d0e350ffe4361975 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 15 Mar 2017 10:46:08 -0400 Subject: [PATCH 1/2] always use async http logging - even in celery workers additionally, add a timeout to requests to avoid thread starvation see: #5718 see: #5729 --- awx/main/tasks.py | 2 +- awx/main/tests/unit/utils/test_handlers.py | 38 ++++++++-------------- awx/main/utils/handlers.py | 32 +++++------------- awx/settings/defaults.py | 1 + 4 files changed, 24 insertions(+), 49 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1ad050a993..d60d936531 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -90,7 +90,7 @@ def celery_startup(conf=None, **kwargs): @worker_process_init.connect def task_set_logger_pre_run(*args, **kwargs): cache.close() - configure_external_logger(settings, async_flag=False, is_startup=False) + configure_external_logger(settings, is_startup=False) def _clear_cache_keys(set_of_keys): diff --git a/awx/main/tests/unit/utils/test_handlers.py b/awx/main/tests/unit/utils/test_handlers.py index 3de3b2e7b7..ba9ffc3e33 100644 --- a/awx/main/tests/unit/utils/test_handlers.py +++ b/awx/main/tests/unit/utils/test_handlers.py @@ -2,6 +2,7 @@ import base64 import json import logging +from django.conf import settings from django.conf import LazySettings import pytest import requests @@ -40,17 +41,16 @@ def ok200_adapter(): return OK200Adapter() -def test_https_logging_handler_requests_sync_implementation(): - handler = HTTPSHandler(async=False) - assert not isinstance(handler.session, FuturesSession) - assert isinstance(handler.session, requests.Session) - - def test_https_logging_handler_requests_async_implementation(): - handler = HTTPSHandler(async=True) + handler = HTTPSHandler() assert isinstance(handler.session, FuturesSession) +def test_https_logging_handler_has_default_http_timeout(): + handler = HTTPSHandler.from_django_settings(settings) + assert handler.http_timeout == 5 + + @pytest.mark.parametrize('param', PARAM_NAMES.keys()) def test_https_logging_handler_defaults(param): handler = HTTPSHandler() @@ -114,18 +114,12 @@ def test_https_logging_handler_skip_log(params, logger_name, expected): assert handler.skip_log(logger_name) is expected -@pytest.mark.parametrize('message_type, async', [ - ('logstash', False), - ('logstash', True), - ('splunk', False), - ('splunk', True), -]) +@pytest.mark.parametrize('message_type', ['logstash', 'splunk']) def test_https_logging_handler_emit(ok200_adapter, dummy_log_record, - message_type, async): + message_type): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, message_type=message_type, - enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], - async=async) + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) handler.setFormatter(LogstashFormatter()) handler.session.mount('http://', ok200_adapter) async_futures = handler.emit(dummy_log_record) @@ -151,14 +145,12 @@ def test_https_logging_handler_emit(ok200_adapter, dummy_log_record, assert body['message'] == 'User joe logged in' -@pytest.mark.parametrize('async', (True, False)) def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter, - dummy_log_record, async): + dummy_log_record): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, username='user', password='pass', message_type='logstash', - enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], - async=async) + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) handler.setFormatter(LogstashFormatter()) handler.session.mount('http://', ok200_adapter) async_futures = handler.emit(dummy_log_record) @@ -169,13 +161,11 @@ def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter, assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass") -@pytest.mark.parametrize('async', (True, False)) def test_https_logging_handler_emit_splunk_with_creds(ok200_adapter, - dummy_log_record, async): + dummy_log_record): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, password='pass', message_type='splunk', - enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], - async=async) + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) handler.setFormatter(LogstashFormatter()) handler.session.mount('http://', ok200_adapter) async_futures = handler.emit(dummy_log_record) diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index fe2fb87228..c65cfb7cdd 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -33,6 +33,7 @@ PARAM_NAMES = { 'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS', 'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS', 'enabled_flag': 'LOG_AGGREGATOR_ENABLED', + 'http_timeout': 'LOG_AGGREGATOR_HTTP_TIMEOUT', } @@ -51,13 +52,9 @@ class BaseHTTPSHandler(logging.Handler): def __init__(self, fqdn=False, **kwargs): super(BaseHTTPSHandler, self).__init__() self.fqdn = fqdn - self.async = kwargs.get('async', True) for fd in PARAM_NAMES: setattr(self, fd, kwargs.get(fd, None)) - if self.async: - self.session = FuturesSession() - else: - self.session = requests.Session() + self.session = FuturesSession() self.add_auth_information() @classmethod @@ -105,10 +102,8 @@ class BaseHTTPSHandler(logging.Handler): payload_str = json.dumps(payload_input) else: payload_str = payload_input - if self.async: - return dict(data=payload_str, background_callback=unused_callback) - else: - return dict(data=payload_str) + return dict(data=payload_str, background_callback=unused_callback, + timeout=self.http_timeout) def skip_log(self, logger_name): if self.host == '' or (not self.enabled_flag): @@ -123,10 +118,6 @@ class BaseHTTPSHandler(logging.Handler): Emit a log record. Returns a list of zero or more ``concurrent.futures.Future`` objects. - When ``self.async`` is True, the list will contain one - Future object for each HTTP request made. When ``self.async`` is - False, the list will be empty. - See: https://docs.python.org/3/library/concurrent.futures.html#future-objects http://pythonhosted.org/futures/ @@ -147,17 +138,10 @@ class BaseHTTPSHandler(logging.Handler): for key in facts_dict: fact_payload = copy(payload_data) fact_payload.update(facts_dict[key]) - if self.async: - async_futures.append(self._send(fact_payload)) - else: - self._send(fact_payload) + async_futures.append(self._send(fact_payload)) return async_futures - if self.async: - return [self._send(payload)] - - self._send(payload) - return [] + return [self._send(payload)] except (KeyboardInterrupt, SystemExit): raise except: @@ -179,7 +163,7 @@ def add_or_remove_logger(address, instance): specific_logger.handlers.append(instance) -def configure_external_logger(settings_module, async_flag=True, is_startup=True): +def configure_external_logger(settings_module, is_startup=True): is_enabled = settings_module.LOG_AGGREGATOR_ENABLED if is_startup and (not is_enabled): @@ -188,7 +172,7 @@ def configure_external_logger(settings_module, async_flag=True, is_startup=True) instance = None if is_enabled: - instance = BaseHTTPSHandler.from_django_settings(settings_module, async=async_flag) + instance = BaseHTTPSHandler.from_django_settings(settings_module) instance.setFormatter(LogstashFormatter(settings_module=settings_module)) awx_logger_instance = instance if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS: diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index e673e347a7..e3af1781db 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -867,6 +867,7 @@ INSIGHTS_URL_BASE = "https://access.redhat.com" TOWER_SETTINGS_MANIFEST = {} LOG_AGGREGATOR_ENABLED = False +LOG_AGGREGATOR_HTTP_TIMEOUT = 5 # The number of retry attempts for websocket session establishment # If you're encountering issues establishing websockets in clustered Tower, From 98ad2684d36fc08ccb98db99866f015fc38bb3b6 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 15 Mar 2017 17:23:05 -0400 Subject: [PATCH 2/2] when async log shipping fails, log an explanation --- awx/main/tests/unit/utils/test_handlers.py | 39 ++++++++++++++++++++++ awx/main/utils/handlers.py | 34 ++++++++++++++++++- 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/awx/main/tests/unit/utils/test_handlers.py b/awx/main/tests/unit/utils/test_handlers.py index ba9ffc3e33..d7b7f5aed9 100644 --- a/awx/main/tests/unit/utils/test_handlers.py +++ b/awx/main/tests/unit/utils/test_handlers.py @@ -1,4 +1,5 @@ import base64 +import cStringIO import json import logging @@ -41,6 +42,17 @@ def ok200_adapter(): return OK200Adapter() +@pytest.fixture() +def connection_error_adapter(): + class ConnectionErrorAdapter(requests.adapters.HTTPAdapter): + + def send(self, request, **kwargs): + err = requests.packages.urllib3.exceptions.SSLError() + raise requests.exceptions.ConnectionError(err, request=request) + + return ConnectionErrorAdapter() + + def test_https_logging_handler_requests_async_implementation(): handler = HTTPSHandler() assert isinstance(handler.session, FuturesSession) @@ -114,6 +126,33 @@ def test_https_logging_handler_skip_log(params, logger_name, expected): assert handler.skip_log(logger_name) is expected +def test_https_logging_handler_connection_error(connection_error_adapter, + dummy_log_record): + handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, + message_type='logstash', + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) + handler.setFormatter(LogstashFormatter()) + handler.session.mount('http://', connection_error_adapter) + + buff = cStringIO.StringIO() + logging.getLogger('awx.main.utils.handlers').addHandler( + logging.StreamHandler(buff) + ) + + async_futures = handler.emit(dummy_log_record) + with pytest.raises(requests.exceptions.ConnectionError): + [future.result() for future in async_futures] + assert 'failed to emit log to external aggregator\nTraceback' in buff.getvalue() + + # we should only log failures *periodically*, so causing *another* + # immediate failure shouldn't report a second ConnectionError + buff.truncate(0) + async_futures = handler.emit(dummy_log_record) + with pytest.raises(requests.exceptions.ConnectionError): + [future.result() for future in async_futures] + assert buff.getvalue() == '' + + @pytest.mark.parametrize('message_type', ['logstash', 'splunk']) def test_https_logging_handler_emit(ok200_adapter, dummy_log_record, message_type): diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index c65cfb7cdd..b1b96b492f 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -5,6 +5,8 @@ import logging import json import requests +import time +from concurrent.futures import ThreadPoolExecutor from copy import copy # loggly @@ -18,6 +20,8 @@ from awx.main.utils.formatters import LogstashFormatter __all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger'] +logger = logging.getLogger('awx.main.utils.handlers') + # AWX external logging handler, generally designed to be used # with the accompanying LogstashHandler, derives from python-logstash library # Non-blocking request accomplished by FuturesSession, similar @@ -48,13 +52,41 @@ class HTTPSNullHandler(logging.NullHandler): return super(HTTPSNullHandler, self).__init__() +class VerboseThreadPoolExecutor(ThreadPoolExecutor): + + last_log_emit = 0 + + def submit(self, func, *args, **kwargs): + def _wrapped(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception: + # If an exception occurs in a concurrent thread worker (like + # a ConnectionError or a read timeout), periodically log + # that failure. + # + # This approach isn't really thread-safe, so we could + # potentially log once per thread every 10 seconds, but it + # beats logging *every* failed HTTP request in a scenario where + # you've typo'd your log aggregator hostname. + now = time.time() + if now - self.last_log_emit > 10: + logger.exception('failed to emit log to external aggregator') + self.last_log_emit = now + raise + return super(VerboseThreadPoolExecutor, self).submit(_wrapped, *args, + **kwargs) + + class BaseHTTPSHandler(logging.Handler): def __init__(self, fqdn=False, **kwargs): super(BaseHTTPSHandler, self).__init__() self.fqdn = fqdn for fd in PARAM_NAMES: setattr(self, fd, kwargs.get(fd, None)) - self.session = FuturesSession() + self.session = FuturesSession(executor=VerboseThreadPoolExecutor( + max_workers=2 # this is the default used by requests_futures + )) self.add_auth_information() @classmethod