Merge pull request #5737 from ryanpetrello/fix-5718

always use async http logging - even in celery workers
This commit is contained in:
Ryan Petrello
2017-03-17 13:28:46 -04:00
committed by GitHub
4 changed files with 93 additions and 47 deletions

View File

@@ -90,7 +90,7 @@ def celery_startup(conf=None, **kwargs):
@worker_process_init.connect @worker_process_init.connect
def task_set_logger_pre_run(*args, **kwargs): def task_set_logger_pre_run(*args, **kwargs):
cache.close() cache.close()
configure_external_logger(settings, async_flag=False, is_startup=False) configure_external_logger(settings, is_startup=False)
def _clear_cache_keys(set_of_keys): def _clear_cache_keys(set_of_keys):

View File

@@ -1,7 +1,9 @@
import base64 import base64
import cStringIO
import json import json
import logging import logging
from django.conf import settings
from django.conf import LazySettings from django.conf import LazySettings
import pytest import pytest
import requests import requests
@@ -40,17 +42,27 @@ def ok200_adapter():
return OK200Adapter() return OK200Adapter()
def test_https_logging_handler_requests_sync_implementation(): @pytest.fixture()
handler = HTTPSHandler(async=False) def connection_error_adapter():
assert not isinstance(handler.session, FuturesSession) class ConnectionErrorAdapter(requests.adapters.HTTPAdapter):
assert isinstance(handler.session, requests.Session)
def send(self, request, **kwargs):
err = requests.packages.urllib3.exceptions.SSLError()
raise requests.exceptions.ConnectionError(err, request=request)
return ConnectionErrorAdapter()
def test_https_logging_handler_requests_async_implementation(): def test_https_logging_handler_requests_async_implementation():
handler = HTTPSHandler(async=True) handler = HTTPSHandler()
assert isinstance(handler.session, FuturesSession) assert isinstance(handler.session, FuturesSession)
def test_https_logging_handler_has_default_http_timeout():
    """A handler built from the Django settings reports http_timeout == 5."""
    configured_timeout = HTTPSHandler.from_django_settings(settings).http_timeout
    assert configured_timeout == 5
@pytest.mark.parametrize('param', PARAM_NAMES.keys()) @pytest.mark.parametrize('param', PARAM_NAMES.keys())
def test_https_logging_handler_defaults(param): def test_https_logging_handler_defaults(param):
handler = HTTPSHandler() handler = HTTPSHandler()
@@ -114,18 +126,39 @@ def test_https_logging_handler_skip_log(params, logger_name, expected):
assert handler.skip_log(logger_name) is expected assert handler.skip_log(logger_name) is expected
@pytest.mark.parametrize('message_type, async', [ def test_https_logging_handler_connection_error(connection_error_adapter,
('logstash', False), dummy_log_record):
('logstash', True), handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
('splunk', False), message_type='logstash',
('splunk', True), enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
]) handler.setFormatter(LogstashFormatter())
handler.session.mount('http://', connection_error_adapter)
buff = cStringIO.StringIO()
logging.getLogger('awx.main.utils.handlers').addHandler(
logging.StreamHandler(buff)
)
async_futures = handler.emit(dummy_log_record)
with pytest.raises(requests.exceptions.ConnectionError):
[future.result() for future in async_futures]
assert 'failed to emit log to external aggregator\nTraceback' in buff.getvalue()
# we should only log failures *periodically*, so causing *another*
# immediate failure shouldn't report a second ConnectionError
buff.truncate(0)
async_futures = handler.emit(dummy_log_record)
with pytest.raises(requests.exceptions.ConnectionError):
[future.result() for future in async_futures]
assert buff.getvalue() == ''
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
def test_https_logging_handler_emit(ok200_adapter, dummy_log_record, def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
message_type, async): message_type):
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
message_type=message_type, message_type=message_type,
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
async=async)
handler.setFormatter(LogstashFormatter()) handler.setFormatter(LogstashFormatter())
handler.session.mount('http://', ok200_adapter) handler.session.mount('http://', ok200_adapter)
async_futures = handler.emit(dummy_log_record) async_futures = handler.emit(dummy_log_record)
@@ -151,14 +184,12 @@ def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
assert body['message'] == 'User joe logged in' assert body['message'] == 'User joe logged in'
@pytest.mark.parametrize('async', (True, False))
def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter, def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter,
dummy_log_record, async): dummy_log_record):
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
username='user', password='pass', username='user', password='pass',
message_type='logstash', message_type='logstash',
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
async=async)
handler.setFormatter(LogstashFormatter()) handler.setFormatter(LogstashFormatter())
handler.session.mount('http://', ok200_adapter) handler.session.mount('http://', ok200_adapter)
async_futures = handler.emit(dummy_log_record) async_futures = handler.emit(dummy_log_record)
@@ -169,13 +200,11 @@ def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter,
assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass") assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass")
@pytest.mark.parametrize('async', (True, False))
def test_https_logging_handler_emit_splunk_with_creds(ok200_adapter, def test_https_logging_handler_emit_splunk_with_creds(ok200_adapter,
dummy_log_record, async): dummy_log_record):
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True, handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
password='pass', message_type='splunk', password='pass', message_type='splunk',
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'], enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
async=async)
handler.setFormatter(LogstashFormatter()) handler.setFormatter(LogstashFormatter())
handler.session.mount('http://', ok200_adapter) handler.session.mount('http://', ok200_adapter)
async_futures = handler.emit(dummy_log_record) async_futures = handler.emit(dummy_log_record)

View File

@@ -5,6 +5,8 @@
import logging import logging
import json import json
import requests import requests
import time
from concurrent.futures import ThreadPoolExecutor
from copy import copy from copy import copy
# loggly # loggly
@@ -18,6 +20,8 @@ from awx.main.utils.formatters import LogstashFormatter
__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger'] __all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger']
logger = logging.getLogger('awx.main.utils.handlers')
# AWX external logging handler, generally designed to be used # AWX external logging handler, generally designed to be used
# with the accompanying LogstashHandler, derives from python-logstash library # with the accompanying LogstashHandler, derives from python-logstash library
# Non-blocking request accomplished by FuturesSession, similar # Non-blocking request accomplished by FuturesSession, similar
@@ -33,6 +37,7 @@ PARAM_NAMES = {
'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS', 'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS', 'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
'enabled_flag': 'LOG_AGGREGATOR_ENABLED', 'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
'http_timeout': 'LOG_AGGREGATOR_HTTP_TIMEOUT',
} }
@@ -47,17 +52,41 @@ class HTTPSNullHandler(logging.NullHandler):
return super(HTTPSNullHandler, self).__init__() return super(HTTPSNullHandler, self).__init__()
class VerboseThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that periodically logs failures of submitted work.

    Every callable passed to ``submit`` is wrapped so that an exception raised
    inside the worker thread is reported through the module-level ``logger``
    (at most once per ``log_interval`` seconds) and then re-raised, so the
    returned Future still carries the original exception.
    """

    # Minimum number of seconds between two logged failures.
    log_interval = 10

    # time.time() of the most recently logged failure; 0 means "never".
    last_log_emit = 0

    def submit(self, func, *args, **kwargs):
        """Schedule ``func(*args, **kwargs)`` and return its Future.

        The call is executed through a wrapper that logs (rate-limited) and
        re-raises any ``Exception`` raised by ``func``.
        """
        def _wrapped(*call_args, **call_kwargs):
            try:
                return func(*call_args, **call_kwargs)
            except Exception:
                # If an exception occurs in a concurrent thread worker (like
                # a ConnectionError or a read timeout), periodically log
                # that failure.
                #
                # This approach isn't really thread-safe, so we could
                # potentially log once per thread every `log_interval`
                # seconds, but it beats logging *every* failed HTTP request
                # in a scenario where you've typo'd your log aggregator
                # hostname.
                now = time.time()
                elapsed = now - self.last_log_emit
                # A negative elapsed time means the wall clock stepped
                # backwards; treat that as an expired window rather than
                # staying silent until the clock catches up again.
                if elapsed > self.log_interval or elapsed < 0:
                    logger.exception('failed to emit log to external aggregator')
                    self.last_log_emit = now
                raise
        return super(VerboseThreadPoolExecutor, self).submit(_wrapped, *args,
                                                             **kwargs)
class BaseHTTPSHandler(logging.Handler): class BaseHTTPSHandler(logging.Handler):
def __init__(self, fqdn=False, **kwargs): def __init__(self, fqdn=False, **kwargs):
super(BaseHTTPSHandler, self).__init__() super(BaseHTTPSHandler, self).__init__()
self.fqdn = fqdn self.fqdn = fqdn
self.async = kwargs.get('async', True)
for fd in PARAM_NAMES: for fd in PARAM_NAMES:
setattr(self, fd, kwargs.get(fd, None)) setattr(self, fd, kwargs.get(fd, None))
if self.async: self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
self.session = FuturesSession() max_workers=2 # this is the default used by requests_futures
else: ))
self.session = requests.Session()
self.add_auth_information() self.add_auth_information()
@classmethod @classmethod
@@ -105,10 +134,8 @@ class BaseHTTPSHandler(logging.Handler):
payload_str = json.dumps(payload_input) payload_str = json.dumps(payload_input)
else: else:
payload_str = payload_input payload_str = payload_input
if self.async: return dict(data=payload_str, background_callback=unused_callback,
return dict(data=payload_str, background_callback=unused_callback) timeout=self.http_timeout)
else:
return dict(data=payload_str)
def skip_log(self, logger_name): def skip_log(self, logger_name):
if self.host == '' or (not self.enabled_flag): if self.host == '' or (not self.enabled_flag):
@@ -123,10 +150,6 @@ class BaseHTTPSHandler(logging.Handler):
Emit a log record. Returns a list of zero or more Emit a log record. Returns a list of zero or more
``concurrent.futures.Future`` objects. ``concurrent.futures.Future`` objects.
When ``self.async`` is True, the list will contain one
Future object for each HTTP request made. When ``self.async`` is
False, the list will be empty.
See: See:
https://docs.python.org/3/library/concurrent.futures.html#future-objects https://docs.python.org/3/library/concurrent.futures.html#future-objects
http://pythonhosted.org/futures/ http://pythonhosted.org/futures/
@@ -147,17 +170,10 @@ class BaseHTTPSHandler(logging.Handler):
for key in facts_dict: for key in facts_dict:
fact_payload = copy(payload_data) fact_payload = copy(payload_data)
fact_payload.update(facts_dict[key]) fact_payload.update(facts_dict[key])
if self.async: async_futures.append(self._send(fact_payload))
async_futures.append(self._send(fact_payload))
else:
self._send(fact_payload)
return async_futures return async_futures
if self.async: return [self._send(payload)]
return [self._send(payload)]
self._send(payload)
return []
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
raise raise
except: except:
@@ -179,7 +195,7 @@ def add_or_remove_logger(address, instance):
specific_logger.handlers.append(instance) specific_logger.handlers.append(instance)
def configure_external_logger(settings_module, async_flag=True, is_startup=True): def configure_external_logger(settings_module, is_startup=True):
is_enabled = settings_module.LOG_AGGREGATOR_ENABLED is_enabled = settings_module.LOG_AGGREGATOR_ENABLED
if is_startup and (not is_enabled): if is_startup and (not is_enabled):
@@ -188,7 +204,7 @@ def configure_external_logger(settings_module, async_flag=True, is_startup=True)
instance = None instance = None
if is_enabled: if is_enabled:
instance = BaseHTTPSHandler.from_django_settings(settings_module, async=async_flag) instance = BaseHTTPSHandler.from_django_settings(settings_module)
instance.setFormatter(LogstashFormatter(settings_module=settings_module)) instance.setFormatter(LogstashFormatter(settings_module=settings_module))
awx_logger_instance = instance awx_logger_instance = instance
if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS: if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS:

View File

@@ -867,6 +867,7 @@ INSIGHTS_URL_BASE = "https://access.redhat.com"
TOWER_SETTINGS_MANIFEST = {} TOWER_SETTINGS_MANIFEST = {}
LOG_AGGREGATOR_ENABLED = False LOG_AGGREGATOR_ENABLED = False
LOG_AGGREGATOR_HTTP_TIMEOUT = 5
# The number of retry attempts for websocket session establishment # The number of retry attempts for websocket session establishment
# If you're encountering issues establishing websockets in clustered Tower, # If you're encountering issues establishing websockets in clustered Tower,