From b3733d233e28caf807e83602d0e350ffe4361975 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 15 Mar 2017 10:46:08 -0400 Subject: [PATCH] always use async http logging - even in celery workers additionally, add a timeout to requests to avoid thread starvation see: #5718 see: #5729 --- awx/main/tasks.py | 2 +- awx/main/tests/unit/utils/test_handlers.py | 38 ++++++++-------------- awx/main/utils/handlers.py | 32 +++++------------- awx/settings/defaults.py | 1 + 4 files changed, 24 insertions(+), 49 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1ad050a993..d60d936531 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -90,7 +90,7 @@ def celery_startup(conf=None, **kwargs): @worker_process_init.connect def task_set_logger_pre_run(*args, **kwargs): cache.close() - configure_external_logger(settings, async_flag=False, is_startup=False) + configure_external_logger(settings, is_startup=False) def _clear_cache_keys(set_of_keys): diff --git a/awx/main/tests/unit/utils/test_handlers.py b/awx/main/tests/unit/utils/test_handlers.py index 3de3b2e7b7..ba9ffc3e33 100644 --- a/awx/main/tests/unit/utils/test_handlers.py +++ b/awx/main/tests/unit/utils/test_handlers.py @@ -2,6 +2,7 @@ import base64 import json import logging +from django.conf import settings from django.conf import LazySettings import pytest import requests @@ -40,17 +41,16 @@ def ok200_adapter(): return OK200Adapter() -def test_https_logging_handler_requests_sync_implementation(): - handler = HTTPSHandler(async=False) - assert not isinstance(handler.session, FuturesSession) - assert isinstance(handler.session, requests.Session) - - def test_https_logging_handler_requests_async_implementation(): - handler = HTTPSHandler(async=True) + handler = HTTPSHandler() assert isinstance(handler.session, FuturesSession) +def test_https_logging_handler_has_default_http_timeout(): + handler = HTTPSHandler.from_django_settings(settings) + assert handler.http_timeout == 5 + + @pytest.mark.parametrize('param', PARAM_NAMES.keys()) def test_https_logging_handler_defaults(param): handler = HTTPSHandler() @@ -114,18 +114,12 @@ def test_https_logging_handler_skip_log(params, logger_name, expected): assert handler.skip_log(logger_name) is expected -@pytest.mark.parametrize('message_type, async', [ - ('logstash', False), - ('logstash', True), - ('splunk', False), - ('splunk', True), -]) +@pytest.mark.parametrize('message_type', ['logstash', 'splunk']) def test_https_logging_handler_emit(ok200_adapter, dummy_log_record, - message_type, async): + message_type): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, message_type=message_type, - enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], - async=async) + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) handler.setFormatter(LogstashFormatter()) handler.session.mount('http://', ok200_adapter) async_futures = handler.emit(dummy_log_record) @@ -151,14 +145,12 @@ def test_https_logging_handler_emit(ok200_adapter, dummy_log_record, assert body['message'] == 'User joe logged in' -@pytest.mark.parametrize('async', (True, False)) def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter, - dummy_log_record, async): + dummy_log_record): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, username='user', password='pass', message_type='logstash', - enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], - async=async) + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) handler.setFormatter(LogstashFormatter()) handler.session.mount('http://', ok200_adapter) async_futures = handler.emit(dummy_log_record) @@ -169,13 +161,11 @@ def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter, assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass") -@pytest.mark.parametrize('async', (True, False)) def test_https_logging_handler_emit_splunk_with_creds(ok200_adapter, - dummy_log_record, async): + dummy_log_record): handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, password='pass', message_type='splunk', - enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], - async=async) + enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) handler.setFormatter(LogstashFormatter()) handler.session.mount('http://', ok200_adapter) async_futures = handler.emit(dummy_log_record) diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index fe2fb87228..c65cfb7cdd 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -33,6 +33,7 @@ PARAM_NAMES = { 'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS', 'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS', 'enabled_flag': 'LOG_AGGREGATOR_ENABLED', + 'http_timeout': 'LOG_AGGREGATOR_HTTP_TIMEOUT', } @@ -51,13 +52,9 @@ class BaseHTTPSHandler(logging.Handler): def __init__(self, fqdn=False, **kwargs): super(BaseHTTPSHandler, self).__init__() self.fqdn = fqdn - self.async = kwargs.get('async', True) for fd in PARAM_NAMES: setattr(self, fd, kwargs.get(fd, None)) - if self.async: - self.session = FuturesSession() - else: - self.session = requests.Session() + self.session = FuturesSession() self.add_auth_information() @classmethod @@ -105,10 +102,8 @@ class BaseHTTPSHandler(logging.Handler): payload_str = json.dumps(payload_input) else: payload_str = payload_input - if self.async: - return dict(data=payload_str, background_callback=unused_callback) - else: - return dict(data=payload_str) + return dict(data=payload_str, background_callback=unused_callback, + timeout=self.http_timeout) def skip_log(self, logger_name): if self.host == '' or (not self.enabled_flag): @@ -123,10 +118,6 @@ class BaseHTTPSHandler(logging.Handler): Emit a log record. Returns a list of zero or more ``concurrent.futures.Future`` objects. - When ``self.async`` is True, the list will contain one - Future object for each HTTP request made. When ``self.async`` is - False, the list will be empty. - See: https://docs.python.org/3/library/concurrent.futures.html#future-objects http://pythonhosted.org/futures/ @@ -147,17 +138,10 @@ class BaseHTTPSHandler(logging.Handler): for key in facts_dict: fact_payload = copy(payload_data) fact_payload.update(facts_dict[key]) - if self.async: - async_futures.append(self._send(fact_payload)) - else: - self._send(fact_payload) + async_futures.append(self._send(fact_payload)) return async_futures - if self.async: - return [self._send(payload)] - - self._send(payload) - return [] + return [self._send(payload)] except (KeyboardInterrupt, SystemExit): raise except: @@ -179,7 +163,7 @@ def add_or_remove_logger(address, instance): specific_logger.handlers.append(instance) -def configure_external_logger(settings_module, async_flag=True, is_startup=True): +def configure_external_logger(settings_module, is_startup=True): is_enabled = settings_module.LOG_AGGREGATOR_ENABLED if is_startup and (not is_enabled): @@ -188,7 +172,7 @@ def configure_external_logger(settings_module, async_flag=True, is_startup=True) instance = None if is_enabled: - instance = BaseHTTPSHandler.from_django_settings(settings_module, async=async_flag) + instance = BaseHTTPSHandler.from_django_settings(settings_module) instance.setFormatter(LogstashFormatter(settings_module=settings_module)) awx_logger_instance = instance if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS: diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index e673e347a7..e3af1781db 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -867,6 +867,7 @@ INSIGHTS_URL_BASE = "https://access.redhat.com" TOWER_SETTINGS_MANIFEST = {} LOG_AGGREGATOR_ENABLED = False +LOG_AGGREGATOR_HTTP_TIMEOUT = 5 # The number of retry attempts for websocket session establishment # If you're encountering issues establishing websockets in clustered Tower,