mirror of
https://github.com/ansible/awx.git
synced 2026-03-10 14:09:28 -02:30
when async log shipping fails, log an explanation
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import base64
|
import base64
|
||||||
|
import cStringIO
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@@ -41,6 +42,17 @@ def ok200_adapter():
|
|||||||
return OK200Adapter()
|
return OK200Adapter()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def connection_error_adapter():
|
||||||
|
class ConnectionErrorAdapter(requests.adapters.HTTPAdapter):
|
||||||
|
|
||||||
|
def send(self, request, **kwargs):
|
||||||
|
err = requests.packages.urllib3.exceptions.SSLError()
|
||||||
|
raise requests.exceptions.ConnectionError(err, request=request)
|
||||||
|
|
||||||
|
return ConnectionErrorAdapter()
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_requests_async_implementation():
|
def test_https_logging_handler_requests_async_implementation():
|
||||||
handler = HTTPSHandler()
|
handler = HTTPSHandler()
|
||||||
assert isinstance(handler.session, FuturesSession)
|
assert isinstance(handler.session, FuturesSession)
|
||||||
@@ -114,6 +126,33 @@ def test_https_logging_handler_skip_log(params, logger_name, expected):
|
|||||||
assert handler.skip_log(logger_name) is expected
|
assert handler.skip_log(logger_name) is expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_https_logging_handler_connection_error(connection_error_adapter,
|
||||||
|
dummy_log_record):
|
||||||
|
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||||
|
message_type='logstash',
|
||||||
|
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||||
|
handler.setFormatter(LogstashFormatter())
|
||||||
|
handler.session.mount('http://', connection_error_adapter)
|
||||||
|
|
||||||
|
buff = cStringIO.StringIO()
|
||||||
|
logging.getLogger('awx.main.utils.handlers').addHandler(
|
||||||
|
logging.StreamHandler(buff)
|
||||||
|
)
|
||||||
|
|
||||||
|
async_futures = handler.emit(dummy_log_record)
|
||||||
|
with pytest.raises(requests.exceptions.ConnectionError):
|
||||||
|
[future.result() for future in async_futures]
|
||||||
|
assert 'failed to emit log to external aggregator\nTraceback' in buff.getvalue()
|
||||||
|
|
||||||
|
# we should only log failures *periodically*, so causing *another*
|
||||||
|
# immediate failure shouldn't report a second ConnectionError
|
||||||
|
buff.truncate(0)
|
||||||
|
async_futures = handler.emit(dummy_log_record)
|
||||||
|
with pytest.raises(requests.exceptions.ConnectionError):
|
||||||
|
[future.result() for future in async_futures]
|
||||||
|
assert buff.getvalue() == ''
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
|
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
|
||||||
def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
|
def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
|
||||||
message_type):
|
message_type):
|
||||||
|
|||||||
@@ -5,6 +5,8 @@
|
|||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
import requests
|
import requests
|
||||||
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from copy import copy
|
from copy import copy
|
||||||
|
|
||||||
# loggly
|
# loggly
|
||||||
@@ -18,6 +20,8 @@ from awx.main.utils.formatters import LogstashFormatter
|
|||||||
|
|
||||||
__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger']
|
__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'configure_external_logger']
|
||||||
|
|
||||||
|
logger = logging.getLogger('awx.main.utils.handlers')
|
||||||
|
|
||||||
# AWX external logging handler, generally designed to be used
|
# AWX external logging handler, generally designed to be used
|
||||||
# with the accompanying LogstashHandler, derives from python-logstash library
|
# with the accompanying LogstashHandler, derives from python-logstash library
|
||||||
# Non-blocking request accomplished by FuturesSession, similar
|
# Non-blocking request accomplished by FuturesSession, similar
|
||||||
@@ -48,13 +52,41 @@ class HTTPSNullHandler(logging.NullHandler):
|
|||||||
return super(HTTPSNullHandler, self).__init__()
|
return super(HTTPSNullHandler, self).__init__()
|
||||||
|
|
||||||
|
|
||||||
|
class VerboseThreadPoolExecutor(ThreadPoolExecutor):
|
||||||
|
|
||||||
|
last_log_emit = 0
|
||||||
|
|
||||||
|
def submit(self, func, *args, **kwargs):
|
||||||
|
def _wrapped(*args, **kwargs):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except Exception:
|
||||||
|
# If an exception occurs in a concurrent thread worker (like
|
||||||
|
# a ConnectionError or a read timeout), periodically log
|
||||||
|
# that failure.
|
||||||
|
#
|
||||||
|
# This approach isn't really thread-safe, so we could
|
||||||
|
# potentially log once per thread every 10 seconds, but it
|
||||||
|
# beats logging *every* failed HTTP request in a scenario where
|
||||||
|
# you've typo'd your log aggregator hostname.
|
||||||
|
now = time.time()
|
||||||
|
if now - self.last_log_emit > 10:
|
||||||
|
logger.exception('failed to emit log to external aggregator')
|
||||||
|
self.last_log_emit = now
|
||||||
|
raise
|
||||||
|
return super(VerboseThreadPoolExecutor, self).submit(_wrapped, *args,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class BaseHTTPSHandler(logging.Handler):
|
class BaseHTTPSHandler(logging.Handler):
|
||||||
def __init__(self, fqdn=False, **kwargs):
|
def __init__(self, fqdn=False, **kwargs):
|
||||||
super(BaseHTTPSHandler, self).__init__()
|
super(BaseHTTPSHandler, self).__init__()
|
||||||
self.fqdn = fqdn
|
self.fqdn = fqdn
|
||||||
for fd in PARAM_NAMES:
|
for fd in PARAM_NAMES:
|
||||||
setattr(self, fd, kwargs.get(fd, None))
|
setattr(self, fd, kwargs.get(fd, None))
|
||||||
self.session = FuturesSession()
|
self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
|
||||||
|
max_workers=2 # this is the default used by requests_futures
|
||||||
|
))
|
||||||
self.add_auth_information()
|
self.add_auth_information()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
Reference in New Issue
Block a user