mirror of
https://github.com/ansible/awx.git
synced 2026-05-09 02:17:37 -02:30
always use async http logging - even in celery workers
additionally, add a timeout to requests to avoid thread starvation see: #5718 see: #5729
This commit is contained in:
@@ -90,7 +90,7 @@ def celery_startup(conf=None, **kwargs):
|
|||||||
@worker_process_init.connect
|
@worker_process_init.connect
|
||||||
def task_set_logger_pre_run(*args, **kwargs):
|
def task_set_logger_pre_run(*args, **kwargs):
|
||||||
cache.close()
|
cache.close()
|
||||||
configure_external_logger(settings, async_flag=False, is_startup=False)
|
configure_external_logger(settings, is_startup=False)
|
||||||
|
|
||||||
|
|
||||||
def _clear_cache_keys(set_of_keys):
|
def _clear_cache_keys(set_of_keys):
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import base64
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
from django.conf import LazySettings
|
from django.conf import LazySettings
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
@@ -40,17 +41,16 @@ def ok200_adapter():
|
|||||||
return OK200Adapter()
|
return OK200Adapter()
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_requests_sync_implementation():
|
|
||||||
handler = HTTPSHandler(async=False)
|
|
||||||
assert not isinstance(handler.session, FuturesSession)
|
|
||||||
assert isinstance(handler.session, requests.Session)
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_requests_async_implementation():
|
def test_https_logging_handler_requests_async_implementation():
|
||||||
handler = HTTPSHandler(async=True)
|
handler = HTTPSHandler()
|
||||||
assert isinstance(handler.session, FuturesSession)
|
assert isinstance(handler.session, FuturesSession)
|
||||||
|
|
||||||
|
|
||||||
|
def test_https_logging_handler_has_default_http_timeout():
|
||||||
|
handler = HTTPSHandler.from_django_settings(settings)
|
||||||
|
assert handler.http_timeout == 5
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('param', PARAM_NAMES.keys())
|
@pytest.mark.parametrize('param', PARAM_NAMES.keys())
|
||||||
def test_https_logging_handler_defaults(param):
|
def test_https_logging_handler_defaults(param):
|
||||||
handler = HTTPSHandler()
|
handler = HTTPSHandler()
|
||||||
@@ -114,18 +114,12 @@ def test_https_logging_handler_skip_log(params, logger_name, expected):
|
|||||||
assert handler.skip_log(logger_name) is expected
|
assert handler.skip_log(logger_name) is expected
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('message_type, async', [
|
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
|
||||||
('logstash', False),
|
|
||||||
('logstash', True),
|
|
||||||
('splunk', False),
|
|
||||||
('splunk', True),
|
|
||||||
])
|
|
||||||
def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
|
def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
|
||||||
message_type, async):
|
message_type):
|
||||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||||
message_type=message_type,
|
message_type=message_type,
|
||||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'],
|
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||||
async=async)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
handler.setFormatter(LogstashFormatter())
|
||||||
handler.session.mount('http://', ok200_adapter)
|
handler.session.mount('http://', ok200_adapter)
|
||||||
async_futures = handler.emit(dummy_log_record)
|
async_futures = handler.emit(dummy_log_record)
|
||||||
@@ -151,14 +145,12 @@ def test_https_logging_handler_emit(ok200_adapter, dummy_log_record,
|
|||||||
assert body['message'] == 'User joe logged in'
|
assert body['message'] == 'User joe logged in'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('async', (True, False))
|
|
||||||
def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter,
|
def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter,
|
||||||
dummy_log_record, async):
|
dummy_log_record):
|
||||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||||
username='user', password='pass',
|
username='user', password='pass',
|
||||||
message_type='logstash',
|
message_type='logstash',
|
||||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'],
|
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||||
async=async)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
handler.setFormatter(LogstashFormatter())
|
||||||
handler.session.mount('http://', ok200_adapter)
|
handler.session.mount('http://', ok200_adapter)
|
||||||
async_futures = handler.emit(dummy_log_record)
|
async_futures = handler.emit(dummy_log_record)
|
||||||
@@ -169,13 +161,11 @@ def test_https_logging_handler_emit_logstash_with_creds(ok200_adapter,
|
|||||||
assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass")
|
assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('async', (True, False))
|
|
||||||
def test_https_logging_handler_emit_splunk_with_creds(ok200_adapter,
|
def test_https_logging_handler_emit_splunk_with_creds(ok200_adapter,
|
||||||
dummy_log_record, async):
|
dummy_log_record):
|
||||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||||
password='pass', message_type='splunk',
|
password='pass', message_type='splunk',
|
||||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'],
|
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||||
async=async)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
handler.setFormatter(LogstashFormatter())
|
||||||
handler.session.mount('http://', ok200_adapter)
|
handler.session.mount('http://', ok200_adapter)
|
||||||
async_futures = handler.emit(dummy_log_record)
|
async_futures = handler.emit(dummy_log_record)
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ PARAM_NAMES = {
|
|||||||
'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
|
'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
|
||||||
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
|
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
|
||||||
'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
|
'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
|
||||||
|
'http_timeout': 'LOG_AGGREGATOR_HTTP_TIMEOUT',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -51,13 +52,9 @@ class BaseHTTPSHandler(logging.Handler):
|
|||||||
def __init__(self, fqdn=False, **kwargs):
|
def __init__(self, fqdn=False, **kwargs):
|
||||||
super(BaseHTTPSHandler, self).__init__()
|
super(BaseHTTPSHandler, self).__init__()
|
||||||
self.fqdn = fqdn
|
self.fqdn = fqdn
|
||||||
self.async = kwargs.get('async', True)
|
|
||||||
for fd in PARAM_NAMES:
|
for fd in PARAM_NAMES:
|
||||||
setattr(self, fd, kwargs.get(fd, None))
|
setattr(self, fd, kwargs.get(fd, None))
|
||||||
if self.async:
|
self.session = FuturesSession()
|
||||||
self.session = FuturesSession()
|
|
||||||
else:
|
|
||||||
self.session = requests.Session()
|
|
||||||
self.add_auth_information()
|
self.add_auth_information()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -105,10 +102,8 @@ class BaseHTTPSHandler(logging.Handler):
|
|||||||
payload_str = json.dumps(payload_input)
|
payload_str = json.dumps(payload_input)
|
||||||
else:
|
else:
|
||||||
payload_str = payload_input
|
payload_str = payload_input
|
||||||
if self.async:
|
return dict(data=payload_str, background_callback=unused_callback,
|
||||||
return dict(data=payload_str, background_callback=unused_callback)
|
timeout=self.http_timeout)
|
||||||
else:
|
|
||||||
return dict(data=payload_str)
|
|
||||||
|
|
||||||
def skip_log(self, logger_name):
|
def skip_log(self, logger_name):
|
||||||
if self.host == '' or (not self.enabled_flag):
|
if self.host == '' or (not self.enabled_flag):
|
||||||
@@ -123,10 +118,6 @@ class BaseHTTPSHandler(logging.Handler):
|
|||||||
Emit a log record. Returns a list of zero or more
|
Emit a log record. Returns a list of zero or more
|
||||||
``concurrent.futures.Future`` objects.
|
``concurrent.futures.Future`` objects.
|
||||||
|
|
||||||
When ``self.async`` is True, the list will contain one
|
|
||||||
Future object for each HTTP request made. When ``self.async`` is
|
|
||||||
False, the list will be empty.
|
|
||||||
|
|
||||||
See:
|
See:
|
||||||
https://docs.python.org/3/library/concurrent.futures.html#future-objects
|
https://docs.python.org/3/library/concurrent.futures.html#future-objects
|
||||||
http://pythonhosted.org/futures/
|
http://pythonhosted.org/futures/
|
||||||
@@ -147,17 +138,10 @@ class BaseHTTPSHandler(logging.Handler):
|
|||||||
for key in facts_dict:
|
for key in facts_dict:
|
||||||
fact_payload = copy(payload_data)
|
fact_payload = copy(payload_data)
|
||||||
fact_payload.update(facts_dict[key])
|
fact_payload.update(facts_dict[key])
|
||||||
if self.async:
|
async_futures.append(self._send(fact_payload))
|
||||||
async_futures.append(self._send(fact_payload))
|
|
||||||
else:
|
|
||||||
self._send(fact_payload)
|
|
||||||
return async_futures
|
return async_futures
|
||||||
|
|
||||||
if self.async:
|
return [self._send(payload)]
|
||||||
return [self._send(payload)]
|
|
||||||
|
|
||||||
self._send(payload)
|
|
||||||
return []
|
|
||||||
except (KeyboardInterrupt, SystemExit):
|
except (KeyboardInterrupt, SystemExit):
|
||||||
raise
|
raise
|
||||||
except:
|
except:
|
||||||
@@ -179,7 +163,7 @@ def add_or_remove_logger(address, instance):
|
|||||||
specific_logger.handlers.append(instance)
|
specific_logger.handlers.append(instance)
|
||||||
|
|
||||||
|
|
||||||
def configure_external_logger(settings_module, async_flag=True, is_startup=True):
|
def configure_external_logger(settings_module, is_startup=True):
|
||||||
|
|
||||||
is_enabled = settings_module.LOG_AGGREGATOR_ENABLED
|
is_enabled = settings_module.LOG_AGGREGATOR_ENABLED
|
||||||
if is_startup and (not is_enabled):
|
if is_startup and (not is_enabled):
|
||||||
@@ -188,7 +172,7 @@ def configure_external_logger(settings_module, async_flag=True, is_startup=True)
|
|||||||
|
|
||||||
instance = None
|
instance = None
|
||||||
if is_enabled:
|
if is_enabled:
|
||||||
instance = BaseHTTPSHandler.from_django_settings(settings_module, async=async_flag)
|
instance = BaseHTTPSHandler.from_django_settings(settings_module)
|
||||||
instance.setFormatter(LogstashFormatter(settings_module=settings_module))
|
instance.setFormatter(LogstashFormatter(settings_module=settings_module))
|
||||||
awx_logger_instance = instance
|
awx_logger_instance = instance
|
||||||
if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS:
|
if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS:
|
||||||
|
|||||||
@@ -867,6 +867,7 @@ INSIGHTS_URL_BASE = "https://access.redhat.com"
|
|||||||
TOWER_SETTINGS_MANIFEST = {}
|
TOWER_SETTINGS_MANIFEST = {}
|
||||||
|
|
||||||
LOG_AGGREGATOR_ENABLED = False
|
LOG_AGGREGATOR_ENABLED = False
|
||||||
|
LOG_AGGREGATOR_HTTP_TIMEOUT = 5
|
||||||
|
|
||||||
# The number of retry attempts for websocket session establishment
|
# The number of retry attempts for websocket session establishment
|
||||||
# If you're encountering issues establishing websockets in clustered Tower,
|
# If you're encountering issues establishing websockets in clustered Tower,
|
||||||
|
|||||||
Reference in New Issue
Block a user