mirror of
https://github.com/ansible/awx.git
synced 2026-05-24 09:07:45 -02:30
POC: replace our external log aggregation feature with rsyslog
- this change adds rsyslog (https://github.com/rsyslog/rsyslog) as a new service that runs on every AWX node (managed by supervisord) in particular, this feature requires a recent version (v8.38+) of rsyslog that supports the omhttp module (https://github.com/rsyslog/rsyslog-doc/pull/750) - the "external_logger" handler in AWX is now a SysLogHandler that ships logs to the local UDP port where rsyslog is configured to listen (by default, 51414) - every time a LOG_AGGREGATOR_* setting is changed, every AWX node reconfigures and restarts its local instance of rsyslog so that its fowarding settings match what has been configured in AWX - unlike the prior implementation, if the external logging aggregator (splunk/logstash) goes temporarily offline, rsyslog will retain the messages and ship them when the log aggregator is back online - 4xx or 5xx level errors are recorded at /var/log/tower/external.err
This commit is contained in:
committed by
Christian Adams
parent
eafb751ecc
commit
589d27c88c
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
# Python
|
# Python
|
||||||
import collections
|
import collections
|
||||||
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
@@ -26,7 +27,6 @@ from awx.api.generics import (
|
|||||||
from awx.api.permissions import IsSuperUser
|
from awx.api.permissions import IsSuperUser
|
||||||
from awx.api.versioning import reverse
|
from awx.api.versioning import reverse
|
||||||
from awx.main.utils import camelcase_to_underscore
|
from awx.main.utils import camelcase_to_underscore
|
||||||
from awx.main.utils.handlers import AWXProxyHandler, LoggingConnectivityException
|
|
||||||
from awx.main.tasks import handle_setting_changes
|
from awx.main.tasks import handle_setting_changes
|
||||||
from awx.conf.models import Setting
|
from awx.conf.models import Setting
|
||||||
from awx.conf.serializers import SettingCategorySerializer, SettingSingletonSerializer
|
from awx.conf.serializers import SettingCategorySerializer, SettingSingletonSerializer
|
||||||
@@ -161,40 +161,8 @@ class SettingLoggingTest(GenericAPIView):
|
|||||||
filter_backends = []
|
filter_backends = []
|
||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
defaults = dict()
|
logging.getLogger('awx').info('AWX Connection Test')
|
||||||
for key in settings_registry.get_registered_settings(category_slug='logging'):
|
return Response(status=status.HTTP_202_ACCEPTED)
|
||||||
try:
|
|
||||||
defaults[key] = settings_registry.get_setting_field(key).get_default()
|
|
||||||
except serializers.SkipField:
|
|
||||||
defaults[key] = None
|
|
||||||
obj = type('Settings', (object,), defaults)()
|
|
||||||
serializer = self.get_serializer(obj, data=request.data)
|
|
||||||
serializer.is_valid(raise_exception=True)
|
|
||||||
# Special validation specific to logging test.
|
|
||||||
errors = {}
|
|
||||||
for key in ['LOG_AGGREGATOR_TYPE', 'LOG_AGGREGATOR_HOST']:
|
|
||||||
if not request.data.get(key, ''):
|
|
||||||
errors[key] = 'This field is required.'
|
|
||||||
if errors:
|
|
||||||
raise ValidationError(errors)
|
|
||||||
|
|
||||||
if request.data.get('LOG_AGGREGATOR_PASSWORD', '').startswith('$encrypted$'):
|
|
||||||
serializer.validated_data['LOG_AGGREGATOR_PASSWORD'] = getattr(
|
|
||||||
settings, 'LOG_AGGREGATOR_PASSWORD', ''
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
class MockSettings:
|
|
||||||
pass
|
|
||||||
mock_settings = MockSettings()
|
|
||||||
for k, v in serializer.validated_data.items():
|
|
||||||
setattr(mock_settings, k, v)
|
|
||||||
AWXProxyHandler().perform_test(custom_settings=mock_settings)
|
|
||||||
if mock_settings.LOG_AGGREGATOR_PROTOCOL.upper() == 'UDP':
|
|
||||||
return Response(status=status.HTTP_201_CREATED)
|
|
||||||
except LoggingConnectivityException as e:
|
|
||||||
return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
|
||||||
return Response(status=status.HTTP_200_OK)
|
|
||||||
|
|
||||||
|
|
||||||
# Create view functions for all of the class-based views to simplify inclusion
|
# Create view functions for all of the class-based views to simplify inclusion
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ from django.core.cache import cache as django_cache
|
|||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
from django.db import connection as django_connection
|
from django.db import connection as django_connection
|
||||||
|
|
||||||
from awx.main.utils.handlers import AWXProxyHandler
|
|
||||||
from awx.main.dispatch import get_local_queuename, reaper
|
from awx.main.dispatch import get_local_queuename, reaper
|
||||||
from awx.main.dispatch.control import Control
|
from awx.main.dispatch.control import Control
|
||||||
from awx.main.dispatch.pool import AutoscalePool
|
from awx.main.dispatch.pool import AutoscalePool
|
||||||
@@ -56,11 +55,6 @@ class Command(BaseCommand):
|
|||||||
reaper.reap()
|
reaper.reap()
|
||||||
consumer = None
|
consumer = None
|
||||||
|
|
||||||
# don't ship external logs inside the dispatcher's parent process
|
|
||||||
# this exists to work around a race condition + deadlock bug on fork
|
|
||||||
# in cpython itself:
|
|
||||||
# https://bugs.python.org/issue37429
|
|
||||||
AWXProxyHandler.disable()
|
|
||||||
try:
|
try:
|
||||||
queues = ['tower_broadcast_all', get_local_queuename()]
|
queues = ['tower_broadcast_all', get_local_queuename()]
|
||||||
consumer = AWXConsumerPG(
|
consumer = AWXConsumerPG(
|
||||||
|
|||||||
@@ -72,7 +72,8 @@ from awx.main.utils import (get_ssh_version, update_scm_url,
|
|||||||
ignore_inventory_group_removal, extract_ansible_vars, schedule_task_manager,
|
ignore_inventory_group_removal, extract_ansible_vars, schedule_task_manager,
|
||||||
get_awx_version)
|
get_awx_version)
|
||||||
from awx.main.utils.ansible import read_ansible_config
|
from awx.main.utils.ansible import read_ansible_config
|
||||||
from awx.main.utils.common import _get_ansible_version, get_custom_venv_choices
|
from awx.main.utils.common import get_ansible_version, _get_ansible_version, get_custom_venv_choices
|
||||||
|
from awx.main.utils.external_logging import reconfigure_rsyslog
|
||||||
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
|
||||||
from awx.main.utils.reload import stop_local_services
|
from awx.main.utils.reload import stop_local_services
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
@@ -280,6 +281,12 @@ def handle_setting_changes(setting_keys):
|
|||||||
logger.debug('cache delete_many(%r)', cache_keys)
|
logger.debug('cache delete_many(%r)', cache_keys)
|
||||||
cache.delete_many(cache_keys)
|
cache.delete_many(cache_keys)
|
||||||
|
|
||||||
|
if any([
|
||||||
|
setting.startswith('LOG_AGGREGATOR')
|
||||||
|
for setting in setting_keys
|
||||||
|
]):
|
||||||
|
reconfigure_rsyslog()
|
||||||
|
|
||||||
|
|
||||||
@task(queue='tower_broadcast_all')
|
@task(queue='tower_broadcast_all')
|
||||||
def delete_project_files(project_path):
|
def delete_project_files(project_path):
|
||||||
|
|||||||
@@ -1,393 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
import base64
|
|
||||||
import logging
|
|
||||||
import socket
|
|
||||||
import datetime
|
|
||||||
from dateutil.tz import tzutc
|
|
||||||
from io import StringIO
|
|
||||||
from uuid import uuid4
|
|
||||||
|
|
||||||
from unittest import mock
|
|
||||||
|
|
||||||
from django.conf import LazySettings
|
|
||||||
from django.utils.encoding import smart_str
|
|
||||||
import pytest
|
|
||||||
import requests
|
|
||||||
from requests_futures.sessions import FuturesSession
|
|
||||||
|
|
||||||
from awx.main.utils.handlers import (BaseHandler, BaseHTTPSHandler as HTTPSHandler,
|
|
||||||
TCPHandler, UDPHandler, _encode_payload_for_socket,
|
|
||||||
PARAM_NAMES, LoggingConnectivityException,
|
|
||||||
AWXProxyHandler)
|
|
||||||
from awx.main.utils.formatters import LogstashFormatter
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def https_adapter():
|
|
||||||
class FakeHTTPSAdapter(requests.adapters.HTTPAdapter):
|
|
||||||
requests = []
|
|
||||||
status = 200
|
|
||||||
reason = None
|
|
||||||
|
|
||||||
def send(self, request, **kwargs):
|
|
||||||
self.requests.append(request)
|
|
||||||
resp = requests.models.Response()
|
|
||||||
resp.status_code = self.status
|
|
||||||
resp.reason = self.reason
|
|
||||||
resp.request = request
|
|
||||||
return resp
|
|
||||||
|
|
||||||
return FakeHTTPSAdapter()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def connection_error_adapter():
|
|
||||||
class ConnectionErrorAdapter(requests.adapters.HTTPAdapter):
|
|
||||||
|
|
||||||
def send(self, request, **kwargs):
|
|
||||||
err = requests.packages.urllib3.exceptions.SSLError()
|
|
||||||
raise requests.exceptions.ConnectionError(err, request=request)
|
|
||||||
|
|
||||||
return ConnectionErrorAdapter()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def fake_socket(tmpdir_factory, request):
|
|
||||||
sok = socket.socket
|
|
||||||
sok.send = mock.MagicMock()
|
|
||||||
sok.connect = mock.MagicMock()
|
|
||||||
sok.setblocking = mock.MagicMock()
|
|
||||||
sok.close = mock.MagicMock()
|
|
||||||
return sok
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_requests_async_implementation():
|
|
||||||
handler = HTTPSHandler()
|
|
||||||
assert isinstance(handler.session, FuturesSession)
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_has_default_http_timeout():
|
|
||||||
handler = TCPHandler()
|
|
||||||
assert handler.tcp_timeout == 5
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('param', ['host', 'port', 'indv_facts'])
|
|
||||||
def test_base_logging_handler_defaults(param):
|
|
||||||
handler = BaseHandler()
|
|
||||||
assert hasattr(handler, param) and getattr(handler, param) is None
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('param', ['host', 'port', 'indv_facts'])
|
|
||||||
def test_base_logging_handler_kwargs(param):
|
|
||||||
handler = BaseHandler(**{param: 'EXAMPLE'})
|
|
||||||
assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE'
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('params', [
|
|
||||||
{
|
|
||||||
'LOG_AGGREGATOR_HOST': 'https://server.invalid',
|
|
||||||
'LOG_AGGREGATOR_PORT': 22222,
|
|
||||||
'LOG_AGGREGATOR_TYPE': 'loggly',
|
|
||||||
'LOG_AGGREGATOR_USERNAME': 'foo',
|
|
||||||
'LOG_AGGREGATOR_PASSWORD': 'bar',
|
|
||||||
'LOG_AGGREGATOR_INDIVIDUAL_FACTS': True,
|
|
||||||
'LOG_AGGREGATOR_TCP_TIMEOUT': 96,
|
|
||||||
'LOG_AGGREGATOR_VERIFY_CERT': False,
|
|
||||||
'LOG_AGGREGATOR_PROTOCOL': 'https'
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'LOG_AGGREGATOR_HOST': 'https://server.invalid',
|
|
||||||
'LOG_AGGREGATOR_PORT': 22222,
|
|
||||||
'LOG_AGGREGATOR_PROTOCOL': 'udp'
|
|
||||||
}
|
|
||||||
])
|
|
||||||
def test_real_handler_from_django_settings(params):
|
|
||||||
settings = LazySettings()
|
|
||||||
settings.configure(**params)
|
|
||||||
handler = AWXProxyHandler().get_handler(custom_settings=settings)
|
|
||||||
# need the _reverse_ dictionary from PARAM_NAMES
|
|
||||||
attr_lookup = {}
|
|
||||||
for attr_name, setting_name in PARAM_NAMES.items():
|
|
||||||
attr_lookup[setting_name] = attr_name
|
|
||||||
for setting_name, val in params.items():
|
|
||||||
attr_name = attr_lookup[setting_name]
|
|
||||||
if attr_name == 'protocol':
|
|
||||||
continue
|
|
||||||
assert hasattr(handler, attr_name)
|
|
||||||
|
|
||||||
|
|
||||||
def test_invalid_kwarg_to_real_handler():
|
|
||||||
settings = LazySettings()
|
|
||||||
settings.configure(**{
|
|
||||||
'LOG_AGGREGATOR_HOST': 'https://server.invalid',
|
|
||||||
'LOG_AGGREGATOR_PORT': 22222,
|
|
||||||
'LOG_AGGREGATOR_PROTOCOL': 'udp',
|
|
||||||
'LOG_AGGREGATOR_VERIFY_CERT': False # setting not valid for UDP handler
|
|
||||||
})
|
|
||||||
handler = AWXProxyHandler().get_handler(custom_settings=settings)
|
|
||||||
assert not hasattr(handler, 'verify_cert')
|
|
||||||
|
|
||||||
|
|
||||||
def test_protocol_not_specified():
|
|
||||||
settings = LazySettings()
|
|
||||||
settings.configure(**{
|
|
||||||
'LOG_AGGREGATOR_HOST': 'https://server.invalid',
|
|
||||||
'LOG_AGGREGATOR_PORT': 22222,
|
|
||||||
'LOG_AGGREGATOR_PROTOCOL': None # awx/settings/defaults.py
|
|
||||||
})
|
|
||||||
handler = AWXProxyHandler().get_handler(custom_settings=settings)
|
|
||||||
assert isinstance(handler, logging.NullHandler)
|
|
||||||
|
|
||||||
|
|
||||||
def test_base_logging_handler_emit_system_tracking(dummy_log_record):
|
|
||||||
handler = BaseHandler(host='127.0.0.1', indv_facts=True)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
dummy_log_record.name = 'awx.analytics.system_tracking'
|
|
||||||
dummy_log_record.msg = None
|
|
||||||
dummy_log_record.inventory_id = 11
|
|
||||||
dummy_log_record.host_name = 'my_lucky_host'
|
|
||||||
dummy_log_record.job_id = 777
|
|
||||||
dummy_log_record.ansible_facts = {
|
|
||||||
"ansible_kernel": "4.4.66-boot2docker",
|
|
||||||
"ansible_machine": "x86_64",
|
|
||||||
"ansible_swapfree_mb": 4663,
|
|
||||||
}
|
|
||||||
dummy_log_record.ansible_facts_modified = datetime.datetime.now(tzutc()).isoformat()
|
|
||||||
sent_payloads = handler.emit(dummy_log_record)
|
|
||||||
|
|
||||||
assert len(sent_payloads) == 1
|
|
||||||
assert sent_payloads[0]['ansible_facts'] == dummy_log_record.ansible_facts
|
|
||||||
assert sent_payloads[0]['ansible_facts_modified'] == dummy_log_record.ansible_facts_modified
|
|
||||||
assert sent_payloads[0]['level'] == 'INFO'
|
|
||||||
assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking'
|
|
||||||
assert sent_payloads[0]['job_id'] == dummy_log_record.job_id
|
|
||||||
assert sent_payloads[0]['inventory_id'] == dummy_log_record.inventory_id
|
|
||||||
assert sent_payloads[0]['host_name'] == dummy_log_record.host_name
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('host, port, normalized, hostname_only', [
|
|
||||||
('http://localhost', None, 'http://localhost', False),
|
|
||||||
('http://localhost', 8080, 'http://localhost:8080', False),
|
|
||||||
('https://localhost', 443, 'https://localhost:443', False),
|
|
||||||
('ftp://localhost', 443, 'ftp://localhost:443', False),
|
|
||||||
('https://localhost:550', 443, 'https://localhost:550', False),
|
|
||||||
('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar', False),
|
|
||||||
('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar', False),
|
|
||||||
('http://splunk.server:8088/services/collector/event', 80,
|
|
||||||
'http://splunk.server:8088/services/collector/event', False),
|
|
||||||
('http://splunk.server/services/collector/event', 8088,
|
|
||||||
'http://splunk.server:8088/services/collector/event', False),
|
|
||||||
('splunk.server:8088/services/collector/event', 80,
|
|
||||||
'http://splunk.server:8088/services/collector/event', False),
|
|
||||||
('splunk.server/services/collector/event', 8088,
|
|
||||||
'http://splunk.server:8088/services/collector/event', False),
|
|
||||||
('localhost', None, 'http://localhost', False),
|
|
||||||
('localhost', 8080, 'http://localhost:8080', False),
|
|
||||||
('localhost', 4399, 'localhost', True),
|
|
||||||
('tcp://localhost:4399/foo/bar', 4399, 'localhost', True),
|
|
||||||
])
|
|
||||||
def test_base_logging_handler_host_format(host, port, normalized, hostname_only):
|
|
||||||
handler = BaseHandler(host=host, port=port)
|
|
||||||
assert handler._get_host(scheme='http', hostname_only=hostname_only) == normalized
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'status, reason, exc',
|
|
||||||
[(200, '200 OK', None), (404, 'Not Found', LoggingConnectivityException)]
|
|
||||||
)
|
|
||||||
@pytest.mark.parametrize('protocol', ['http', 'https', None])
|
|
||||||
def test_https_logging_handler_connectivity_test(https_adapter, status, reason, exc, protocol):
|
|
||||||
host = 'example.org'
|
|
||||||
if protocol:
|
|
||||||
host = '://'.join([protocol, host])
|
|
||||||
https_adapter.status = status
|
|
||||||
https_adapter.reason = reason
|
|
||||||
settings = LazySettings()
|
|
||||||
settings.configure(**{
|
|
||||||
'LOG_AGGREGATOR_HOST': host,
|
|
||||||
'LOG_AGGREGATOR_PORT': 8080,
|
|
||||||
'LOG_AGGREGATOR_TYPE': 'logstash',
|
|
||||||
'LOG_AGGREGATOR_USERNAME': 'user',
|
|
||||||
'LOG_AGGREGATOR_PASSWORD': 'password',
|
|
||||||
'LOG_AGGREGATOR_LOGGERS': ['awx', 'activity_stream', 'job_events', 'system_tracking'],
|
|
||||||
'LOG_AGGREGATOR_PROTOCOL': 'https',
|
|
||||||
'CLUSTER_HOST_ID': '',
|
|
||||||
'LOG_AGGREGATOR_TOWER_UUID': str(uuid4()),
|
|
||||||
'LOG_AGGREGATOR_LEVEL': 'DEBUG',
|
|
||||||
})
|
|
||||||
|
|
||||||
class FakeHTTPSHandler(HTTPSHandler):
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(FakeHTTPSHandler, self).__init__(*args, **kwargs)
|
|
||||||
self.session.mount('{}://'.format(protocol or 'https'), https_adapter)
|
|
||||||
|
|
||||||
def emit(self, record):
|
|
||||||
return super(FakeHTTPSHandler, self).emit(record)
|
|
||||||
|
|
||||||
with mock.patch.object(AWXProxyHandler, 'get_handler_class') as mock_get_class:
|
|
||||||
mock_get_class.return_value = FakeHTTPSHandler
|
|
||||||
if exc:
|
|
||||||
with pytest.raises(exc) as e:
|
|
||||||
AWXProxyHandler().perform_test(settings)
|
|
||||||
assert str(e).endswith('%s: %s' % (status, reason))
|
|
||||||
else:
|
|
||||||
assert AWXProxyHandler().perform_test(settings) is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_logstash_auth_info():
|
|
||||||
handler = HTTPSHandler(message_type='logstash', username='bob', password='ansible')
|
|
||||||
handler._add_auth_information()
|
|
||||||
assert isinstance(handler.session.auth, requests.auth.HTTPBasicAuth)
|
|
||||||
assert handler.session.auth.username == 'bob'
|
|
||||||
assert handler.session.auth.password == 'ansible'
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_splunk_auth_info():
|
|
||||||
handler = HTTPSHandler(message_type='splunk', password='ansible')
|
|
||||||
handler._add_auth_information()
|
|
||||||
assert handler.session.headers['Authorization'] == 'Splunk ansible'
|
|
||||||
assert handler.session.headers['Content-Type'] == 'application/json'
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_connection_error(connection_error_adapter,
|
|
||||||
dummy_log_record):
|
|
||||||
handler = HTTPSHandler(host='127.0.0.1', message_type='logstash')
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
handler.session.mount('http://', connection_error_adapter)
|
|
||||||
|
|
||||||
buff = StringIO()
|
|
||||||
logging.getLogger('awx.main.utils.handlers').addHandler(
|
|
||||||
logging.StreamHandler(buff)
|
|
||||||
)
|
|
||||||
|
|
||||||
async_futures = handler.emit(dummy_log_record)
|
|
||||||
with pytest.raises(requests.exceptions.ConnectionError):
|
|
||||||
[future.result() for future in async_futures]
|
|
||||||
assert 'failed to emit log to external aggregator\nTraceback' in buff.getvalue()
|
|
||||||
|
|
||||||
# we should only log failures *periodically*, so causing *another*
|
|
||||||
# immediate failure shouldn't report a second ConnectionError
|
|
||||||
buff.truncate(0)
|
|
||||||
async_futures = handler.emit(dummy_log_record)
|
|
||||||
with pytest.raises(requests.exceptions.ConnectionError):
|
|
||||||
[future.result() for future in async_futures]
|
|
||||||
assert buff.getvalue() == ''
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
|
|
||||||
def test_https_logging_handler_emit_without_cred(https_adapter, dummy_log_record,
|
|
||||||
message_type):
|
|
||||||
handler = HTTPSHandler(host='127.0.0.1', message_type=message_type)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
handler.session.mount('https://', https_adapter)
|
|
||||||
async_futures = handler.emit(dummy_log_record)
|
|
||||||
[future.result() for future in async_futures]
|
|
||||||
|
|
||||||
assert len(https_adapter.requests) == 1
|
|
||||||
request = https_adapter.requests[0]
|
|
||||||
assert request.url == 'https://127.0.0.1/'
|
|
||||||
assert request.method == 'POST'
|
|
||||||
|
|
||||||
if message_type == 'logstash':
|
|
||||||
# A username + password weren't used, so this header should be missing
|
|
||||||
assert 'Authorization' not in request.headers
|
|
||||||
|
|
||||||
if message_type == 'splunk':
|
|
||||||
assert request.headers['Authorization'] == 'Splunk None'
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_emit_logstash_with_creds(https_adapter,
|
|
||||||
dummy_log_record):
|
|
||||||
handler = HTTPSHandler(host='127.0.0.1',
|
|
||||||
username='user', password='pass',
|
|
||||||
message_type='logstash')
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
handler.session.mount('https://', https_adapter)
|
|
||||||
async_futures = handler.emit(dummy_log_record)
|
|
||||||
[future.result() for future in async_futures]
|
|
||||||
|
|
||||||
assert len(https_adapter.requests) == 1
|
|
||||||
request = https_adapter.requests[0]
|
|
||||||
assert request.headers['Authorization'] == 'Basic %s' % smart_str(base64.b64encode(b"user:pass"))
|
|
||||||
|
|
||||||
|
|
||||||
def test_https_logging_handler_emit_splunk_with_creds(https_adapter,
|
|
||||||
dummy_log_record):
|
|
||||||
handler = HTTPSHandler(host='127.0.0.1',
|
|
||||||
password='pass', message_type='splunk')
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
handler.session.mount('https://', https_adapter)
|
|
||||||
async_futures = handler.emit(dummy_log_record)
|
|
||||||
[future.result() for future in async_futures]
|
|
||||||
|
|
||||||
assert len(https_adapter.requests) == 1
|
|
||||||
request = https_adapter.requests[0]
|
|
||||||
assert request.headers['Authorization'] == 'Splunk pass'
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('payload, encoded_payload', [
|
|
||||||
('foobar', 'foobar'),
|
|
||||||
({'foo': 'bar'}, '{"foo": "bar"}'),
|
|
||||||
({u'测试键': u'测试值'}, '{"测试键": "测试值"}'),
|
|
||||||
])
|
|
||||||
def test_encode_payload_for_socket(payload, encoded_payload):
|
|
||||||
assert _encode_payload_for_socket(payload).decode('utf-8') == encoded_payload
|
|
||||||
|
|
||||||
|
|
||||||
def test_udp_handler_create_socket_at_init():
|
|
||||||
handler = UDPHandler(host='127.0.0.1', port=4399)
|
|
||||||
assert hasattr(handler, 'socket')
|
|
||||||
assert isinstance(handler.socket, socket.socket)
|
|
||||||
assert handler.socket.family == socket.AF_INET
|
|
||||||
assert handler.socket.type == socket.SOCK_DGRAM
|
|
||||||
|
|
||||||
|
|
||||||
def test_udp_handler_send(dummy_log_record):
|
|
||||||
handler = UDPHandler(host='127.0.0.1', port=4399)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
with mock.patch('awx.main.utils.handlers._encode_payload_for_socket', return_value="des") as encode_mock,\
|
|
||||||
mock.patch.object(handler, 'socket') as socket_mock:
|
|
||||||
handler.emit(dummy_log_record)
|
|
||||||
encode_mock.assert_called_once_with(handler.format(dummy_log_record))
|
|
||||||
socket_mock.sendto.assert_called_once_with("des", ('127.0.0.1', 4399))
|
|
||||||
|
|
||||||
|
|
||||||
def test_tcp_handler_send(fake_socket, dummy_log_record):
|
|
||||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
|
||||||
mock.patch('select.select', return_value=([], [fake_socket], [])):
|
|
||||||
handler.emit(dummy_log_record)
|
|
||||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399))
|
|
||||||
fake_socket.setblocking.assert_called_once_with(0)
|
|
||||||
fake_socket.send.assert_called_once_with(handler.format(dummy_log_record))
|
|
||||||
fake_socket.close.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_tcp_handler_return_if_socket_unavailable(fake_socket, dummy_log_record):
|
|
||||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
|
||||||
mock.patch('select.select', return_value=([], [], [])):
|
|
||||||
handler.emit(dummy_log_record)
|
|
||||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399))
|
|
||||||
fake_socket.setblocking.assert_called_once_with(0)
|
|
||||||
assert not fake_socket.send.called
|
|
||||||
fake_socket.close.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_tcp_handler_log_exception(fake_socket, dummy_log_record):
|
|
||||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
|
||||||
mock.patch('select.select', return_value=([], [], [])),\
|
|
||||||
mock.patch('awx.main.utils.handlers.logger') as logger_mock:
|
|
||||||
fake_socket.connect.side_effect = Exception("foo")
|
|
||||||
handler.emit(dummy_log_record)
|
|
||||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
logger_mock.exception.assert_called_once()
|
|
||||||
fake_socket.close.assert_called_once()
|
|
||||||
assert not fake_socket.send.called
|
|
||||||
65
awx/main/utils/external_logging.py
Normal file
65
awx/main/utils/external_logging.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
import urllib.parse as urlparse
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
|
||||||
|
from awx.main.utils.reload import supervisor_service_command
|
||||||
|
|
||||||
|
|
||||||
|
def reconfigure_rsyslog():
|
||||||
|
tmpl = ''
|
||||||
|
if settings.LOG_AGGREGATOR_ENABLED:
|
||||||
|
host = getattr(settings, 'LOG_AGGREGATOR_HOST', '')
|
||||||
|
port = getattr(settings, 'LOG_AGGREGATOR_PORT', '')
|
||||||
|
protocol = getattr(settings, 'LOG_AGGREGATOR_PROTOCOL', '')
|
||||||
|
|
||||||
|
if protocol.startswith('http'):
|
||||||
|
scheme = 'https'
|
||||||
|
# urlparse requires '//' to be provided if scheme is not specified
|
||||||
|
original_parsed = urlparse.urlsplit(host)
|
||||||
|
if (not original_parsed.scheme and not host.startswith('//')) or original_parsed.hostname is None:
|
||||||
|
host = '%s://%s' % (scheme, host) if scheme else '//%s' % host
|
||||||
|
parsed = urlparse.urlsplit(host)
|
||||||
|
|
||||||
|
host = parsed.hostname
|
||||||
|
try:
|
||||||
|
port = parsed.port or settings.LOG_AGGREGATOR_PORT
|
||||||
|
except ValueError:
|
||||||
|
port = settings.LOG_AGGREGATOR_PORT
|
||||||
|
|
||||||
|
parts = []
|
||||||
|
parts.extend([
|
||||||
|
'$ModLoad imudp',
|
||||||
|
'$UDPServerRun 51414',
|
||||||
|
'template(name="awx" type="string" string="%msg%")',
|
||||||
|
])
|
||||||
|
if protocol.startswith('http'):
|
||||||
|
# https://github.com/rsyslog/rsyslog-doc/blob/master/source/configuration/modules/omhttp.rst
|
||||||
|
ssl = "on" if parsed.scheme == 'https' else "off"
|
||||||
|
skip_verify = "off" if settings.LOG_AGGREGATOR_VERIFY_CERT else "on"
|
||||||
|
params = [
|
||||||
|
'type="omhttp"',
|
||||||
|
f'server="{host}"',
|
||||||
|
f'serverport="{port}"',
|
||||||
|
f'usehttps="{ssl}"',
|
||||||
|
f'skipverifyhost="{skip_verify}"',
|
||||||
|
'action.resumeRetryCount="-1"',
|
||||||
|
'template="awx"',
|
||||||
|
'errorfile="/var/log/tower/external.err"',
|
||||||
|
]
|
||||||
|
username = getattr(settings, 'LOG_AGGREGATOR_USERNAME', '')
|
||||||
|
password = getattr(settings, 'LOG_AGGREGATOR_PASSWORD', '')
|
||||||
|
if username:
|
||||||
|
params.append(f'uid="{username}"')
|
||||||
|
if password:
|
||||||
|
params.append(f'pwd="{password}"')
|
||||||
|
params = ' '.join(params)
|
||||||
|
parts.extend(['module(load="omhttp")', f'action({params})'])
|
||||||
|
else:
|
||||||
|
parts.append(
|
||||||
|
f'action(type="omfwd" target="{host}" port="{port}" protocol="{protocol}" action.resumeRetryCount="-1" template="awx")' # noqa
|
||||||
|
)
|
||||||
|
tmpl = '\n'.join(parts)
|
||||||
|
|
||||||
|
with open('/var/lib/awx/rsyslog.conf', 'w') as f:
|
||||||
|
f.write(tmpl + '\n')
|
||||||
|
supervisor_service_command(command='restart', service='awx-rsyslogd')
|
||||||
@@ -3,406 +3,10 @@
|
|||||||
|
|
||||||
# Python
|
# Python
|
||||||
import logging
|
import logging
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import requests
|
|
||||||
import time
|
|
||||||
import threading
|
|
||||||
import socket
|
|
||||||
import select
|
|
||||||
from urllib import parse as urlparse
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
from requests.exceptions import RequestException
|
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
# requests futures, a dependency used by these handlers
|
|
||||||
from requests_futures.sessions import FuturesSession
|
|
||||||
import cachetools
|
|
||||||
|
|
||||||
# AWX
|
|
||||||
from awx.main.utils.formatters import LogstashFormatter
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = ['BaseHTTPSHandler', 'TCPHandler', 'UDPHandler',
|
|
||||||
'AWXProxyHandler']
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.utils.handlers')
|
|
||||||
|
|
||||||
|
|
||||||
# Translation of parameter names to names in Django settings
|
|
||||||
# logging settings category, only those related to handler / log emission
|
|
||||||
PARAM_NAMES = {
|
|
||||||
'host': 'LOG_AGGREGATOR_HOST',
|
|
||||||
'port': 'LOG_AGGREGATOR_PORT',
|
|
||||||
'message_type': 'LOG_AGGREGATOR_TYPE',
|
|
||||||
'username': 'LOG_AGGREGATOR_USERNAME',
|
|
||||||
'password': 'LOG_AGGREGATOR_PASSWORD',
|
|
||||||
'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
|
|
||||||
'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT',
|
|
||||||
'verify_cert': 'LOG_AGGREGATOR_VERIFY_CERT',
|
|
||||||
'protocol': 'LOG_AGGREGATOR_PROTOCOL'
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def unused_callback(sess, resp):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class LoggingConnectivityException(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class VerboseThreadPoolExecutor(ThreadPoolExecutor):
|
|
||||||
|
|
||||||
last_log_emit = 0
|
|
||||||
|
|
||||||
def submit(self, func, *args, **kwargs):
|
|
||||||
def _wrapped(*args, **kwargs):
|
|
||||||
try:
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
except Exception:
|
|
||||||
# If an exception occurs in a concurrent thread worker (like
|
|
||||||
# a ConnectionError or a read timeout), periodically log
|
|
||||||
# that failure.
|
|
||||||
#
|
|
||||||
# This approach isn't really thread-safe, so we could
|
|
||||||
# potentially log once per thread every 10 seconds, but it
|
|
||||||
# beats logging *every* failed HTTP request in a scenario where
|
|
||||||
# you've typo'd your log aggregator hostname.
|
|
||||||
now = time.time()
|
|
||||||
if now - self.last_log_emit > 10:
|
|
||||||
logger.exception('failed to emit log to external aggregator')
|
|
||||||
self.last_log_emit = now
|
|
||||||
raise
|
|
||||||
return super(VerboseThreadPoolExecutor, self).submit(_wrapped, *args,
|
|
||||||
**kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class SocketResult:
|
|
||||||
'''
|
|
||||||
A class to be the return type of methods that send data over a socket
|
|
||||||
allows object to be used in the same way as a request futures object
|
|
||||||
'''
|
|
||||||
def __init__(self, ok, reason=None):
|
|
||||||
self.ok = ok
|
|
||||||
self.reason = reason
|
|
||||||
|
|
||||||
def result(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
|
|
||||||
class BaseHandler(logging.Handler):
|
|
||||||
def __init__(self, host=None, port=None, indv_facts=None, **kwargs):
|
|
||||||
super(BaseHandler, self).__init__()
|
|
||||||
self.host = host
|
|
||||||
self.port = port
|
|
||||||
self.indv_facts = indv_facts
|
|
||||||
|
|
||||||
def _send(self, payload):
|
|
||||||
"""Actually send message to log aggregator.
|
|
||||||
"""
|
|
||||||
return payload
|
|
||||||
|
|
||||||
def _format_and_send_record(self, record):
|
|
||||||
if self.indv_facts:
|
|
||||||
return [self._send(json.loads(self.format(record)))]
|
|
||||||
return [self._send(self.format(record))]
|
|
||||||
|
|
||||||
def emit(self, record):
|
|
||||||
"""
|
|
||||||
Emit a log record. Returns a list of zero or more
|
|
||||||
implementation-specific objects for tests.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return self._format_and_send_record(record)
|
|
||||||
except (KeyboardInterrupt, SystemExit):
|
|
||||||
raise
|
|
||||||
except Exception:
|
|
||||||
self.handleError(record)
|
|
||||||
|
|
||||||
def _get_host(self, scheme='', hostname_only=False):
|
|
||||||
"""Return the host name of log aggregator.
|
|
||||||
"""
|
|
||||||
host = self.host or ''
|
|
||||||
# urlparse requires '//' to be provided if scheme is not specified
|
|
||||||
original_parsed = urlparse.urlsplit(host)
|
|
||||||
if (not original_parsed.scheme and not host.startswith('//')) or original_parsed.hostname is None:
|
|
||||||
host = '%s://%s' % (scheme, host) if scheme else '//%s' % host
|
|
||||||
parsed = urlparse.urlsplit(host)
|
|
||||||
|
|
||||||
if hostname_only:
|
|
||||||
return parsed.hostname
|
|
||||||
|
|
||||||
try:
|
|
||||||
port = parsed.port or self.port
|
|
||||||
except ValueError:
|
|
||||||
port = self.port
|
|
||||||
netloc = parsed.netloc if port is None else '%s:%s' % (parsed.hostname, port)
|
|
||||||
|
|
||||||
url_components = list(parsed)
|
|
||||||
url_components[1] = netloc
|
|
||||||
ret = urlparse.urlunsplit(url_components)
|
|
||||||
return ret.lstrip('/')
|
|
||||||
|
|
||||||
|
|
||||||
class BaseHTTPSHandler(BaseHandler):
|
|
||||||
'''
|
|
||||||
Originally derived from python-logstash library
|
|
||||||
Non-blocking request accomplished by FuturesSession, similar
|
|
||||||
to the loggly-python-handler library
|
|
||||||
'''
|
|
||||||
def _add_auth_information(self):
|
|
||||||
if self.message_type == 'logstash':
|
|
||||||
if not self.username:
|
|
||||||
# Logstash authentication not enabled
|
|
||||||
return
|
|
||||||
logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password)
|
|
||||||
self.session.auth = logstash_auth
|
|
||||||
elif self.message_type == 'splunk':
|
|
||||||
auth_header = "Splunk %s" % self.password
|
|
||||||
headers = {
|
|
||||||
"Authorization": auth_header,
|
|
||||||
"Content-Type": "application/json"
|
|
||||||
}
|
|
||||||
self.session.headers.update(headers)
|
|
||||||
|
|
||||||
def __init__(self, fqdn=False, message_type=None, username=None, password=None,
|
|
||||||
tcp_timeout=5, verify_cert=True, **kwargs):
|
|
||||||
self.fqdn = fqdn
|
|
||||||
self.message_type = message_type
|
|
||||||
self.username = username
|
|
||||||
self.password = password
|
|
||||||
self.tcp_timeout = tcp_timeout
|
|
||||||
self.verify_cert = verify_cert
|
|
||||||
super(BaseHTTPSHandler, self).__init__(**kwargs)
|
|
||||||
self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
|
|
||||||
max_workers=2 # this is the default used by requests_futures
|
|
||||||
))
|
|
||||||
self._add_auth_information()
|
|
||||||
|
|
||||||
def _get_post_kwargs(self, payload_input):
|
|
||||||
if self.message_type == 'splunk':
|
|
||||||
# Splunk needs data nested under key "event"
|
|
||||||
if not isinstance(payload_input, dict):
|
|
||||||
payload_input = json.loads(payload_input)
|
|
||||||
payload_input = {'event': payload_input}
|
|
||||||
if isinstance(payload_input, dict):
|
|
||||||
payload_str = json.dumps(payload_input)
|
|
||||||
else:
|
|
||||||
payload_str = payload_input
|
|
||||||
kwargs = dict(data=payload_str, background_callback=unused_callback,
|
|
||||||
timeout=self.tcp_timeout)
|
|
||||||
if self.verify_cert is False:
|
|
||||||
kwargs['verify'] = False
|
|
||||||
return kwargs
|
|
||||||
|
|
||||||
|
|
||||||
def _send(self, payload):
|
|
||||||
"""See:
|
|
||||||
https://docs.python.org/3/library/concurrent.futures.html#future-objects
|
|
||||||
http://pythonhosted.org/futures/
|
|
||||||
"""
|
|
||||||
return self.session.post(self._get_host(scheme='https'),
|
|
||||||
**self._get_post_kwargs(payload))
|
|
||||||
|
|
||||||
|
|
||||||
def _encode_payload_for_socket(payload):
|
|
||||||
encoded_payload = payload
|
|
||||||
if isinstance(encoded_payload, dict):
|
|
||||||
encoded_payload = json.dumps(encoded_payload, ensure_ascii=False)
|
|
||||||
if isinstance(encoded_payload, str):
|
|
||||||
encoded_payload = encoded_payload.encode('utf-8')
|
|
||||||
return encoded_payload
|
|
||||||
|
|
||||||
|
|
||||||
class TCPHandler(BaseHandler):
|
|
||||||
def __init__(self, tcp_timeout=5, **kwargs):
|
|
||||||
self.tcp_timeout = tcp_timeout
|
|
||||||
super(TCPHandler, self).__init__(**kwargs)
|
|
||||||
|
|
||||||
def _send(self, payload):
|
|
||||||
payload = _encode_payload_for_socket(payload)
|
|
||||||
sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
try:
|
|
||||||
sok.connect((self._get_host(hostname_only=True), self.port or 0))
|
|
||||||
sok.setblocking(0)
|
|
||||||
_, ready_to_send, _ = select.select([], [sok], [], float(self.tcp_timeout))
|
|
||||||
if len(ready_to_send) == 0:
|
|
||||||
ret = SocketResult(False, "Socket currently busy, failed to send message")
|
|
||||||
logger.warning(ret.reason)
|
|
||||||
else:
|
|
||||||
sok.send(payload)
|
|
||||||
ret = SocketResult(True) # success!
|
|
||||||
except Exception as e:
|
|
||||||
ret = SocketResult(False, "Error sending message from %s: %s" %
|
|
||||||
(TCPHandler.__name__,
|
|
||||||
' '.join(str(arg) for arg in e.args)))
|
|
||||||
logger.exception(ret.reason)
|
|
||||||
finally:
|
|
||||||
sok.close()
|
|
||||||
return ret
|
|
||||||
|
|
||||||
|
|
||||||
class UDPHandler(BaseHandler):
|
|
||||||
message = "Cannot determine if UDP messages are received."
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
super(UDPHandler, self).__init__(**kwargs)
|
|
||||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
||||||
|
|
||||||
def _send(self, payload):
|
|
||||||
payload = _encode_payload_for_socket(payload)
|
|
||||||
self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0))
|
|
||||||
return SocketResult(True, reason=self.message)
|
|
||||||
|
|
||||||
|
|
||||||
class AWXNullHandler(logging.NullHandler):
|
|
||||||
'''
|
|
||||||
Only additional this does is accept arbitrary __init__ params because
|
|
||||||
the proxy handler does not (yet) work with arbitrary handler classes
|
|
||||||
'''
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(AWXNullHandler, self).__init__()
|
|
||||||
|
|
||||||
|
|
||||||
HANDLER_MAPPING = {
|
|
||||||
'https': BaseHTTPSHandler,
|
|
||||||
'tcp': TCPHandler,
|
|
||||||
'udp': UDPHandler,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
TTLCache = cachetools.TTLCache
|
|
||||||
|
|
||||||
if 'py.test' in os.environ.get('_', ''):
|
|
||||||
# don't cache settings in unit tests
|
|
||||||
class TTLCache(TTLCache):
|
|
||||||
|
|
||||||
def __getitem__(self, item):
|
|
||||||
raise KeyError()
|
|
||||||
|
|
||||||
|
|
||||||
class AWXProxyHandler(logging.Handler):
|
|
||||||
'''
|
|
||||||
Handler specific to the AWX external logging feature
|
|
||||||
|
|
||||||
Will dynamically create a handler specific to the configured
|
|
||||||
protocol, and will create a new one automatically on setting change
|
|
||||||
|
|
||||||
Managing parameters:
|
|
||||||
All parameters will get their value from settings as a default
|
|
||||||
if the parameter was either provided on init, or set manually,
|
|
||||||
this value will take precedence.
|
|
||||||
Parameters match same parameters in the actualized handler classes.
|
|
||||||
'''
|
|
||||||
|
|
||||||
thread_local = threading.local()
|
|
||||||
_auditor = None
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
|
||||||
# TODO: process 'level' kwarg
|
|
||||||
super(AWXProxyHandler, self).__init__(**kwargs)
|
|
||||||
self._handler = None
|
|
||||||
self._old_kwargs = {}
|
|
||||||
|
|
||||||
@property
|
|
||||||
def auditor(self):
|
|
||||||
if not self._auditor:
|
|
||||||
self._auditor = logging.handlers.RotatingFileHandler(
|
|
||||||
filename='/var/log/tower/external.log',
|
|
||||||
maxBytes=1024 * 1024 * 50, # 50 MB
|
|
||||||
backupCount=5,
|
|
||||||
)
|
|
||||||
|
|
||||||
class WritableLogstashFormatter(LogstashFormatter):
|
|
||||||
@classmethod
|
|
||||||
def serialize(cls, message):
|
|
||||||
return json.dumps(message)
|
|
||||||
|
|
||||||
self._auditor.setFormatter(WritableLogstashFormatter())
|
|
||||||
return self._auditor
|
|
||||||
|
|
||||||
def get_handler_class(self, protocol):
|
|
||||||
return HANDLER_MAPPING.get(protocol, AWXNullHandler)
|
|
||||||
|
|
||||||
@cachetools.cached(cache=TTLCache(maxsize=1, ttl=3), key=lambda *args, **kw: 'get_handler')
|
|
||||||
def get_handler(self, custom_settings=None, force_create=False):
|
|
||||||
new_kwargs = {}
|
|
||||||
use_settings = custom_settings or settings
|
|
||||||
for field_name, setting_name in PARAM_NAMES.items():
|
|
||||||
val = getattr(use_settings, setting_name, None)
|
|
||||||
if val is None:
|
|
||||||
continue
|
|
||||||
new_kwargs[field_name] = val
|
|
||||||
if new_kwargs == self._old_kwargs and self._handler and (not force_create):
|
|
||||||
# avoids re-creating session objects, and other such things
|
|
||||||
return self._handler
|
|
||||||
self._old_kwargs = new_kwargs.copy()
|
|
||||||
# TODO: remove any kwargs no applicable to that particular handler
|
|
||||||
protocol = new_kwargs.pop('protocol', None)
|
|
||||||
HandlerClass = self.get_handler_class(protocol)
|
|
||||||
# cleanup old handler and make new one
|
|
||||||
if self._handler:
|
|
||||||
self._handler.close()
|
|
||||||
logger.debug('Creating external log handler due to startup or settings change.')
|
|
||||||
self._handler = HandlerClass(**new_kwargs)
|
|
||||||
if self.formatter:
|
|
||||||
# self.format(record) is called inside of emit method
|
|
||||||
# so not safe to assume this can be handled within self
|
|
||||||
self._handler.setFormatter(self.formatter)
|
|
||||||
return self._handler
|
|
||||||
|
|
||||||
@cachetools.cached(cache=TTLCache(maxsize=1, ttl=3), key=lambda *args, **kw: 'should_audit')
|
|
||||||
def should_audit(self):
|
|
||||||
return settings.LOG_AGGREGATOR_AUDIT
|
|
||||||
|
|
||||||
def emit(self, record):
|
|
||||||
if AWXProxyHandler.thread_local.enabled:
|
|
||||||
actual_handler = self.get_handler()
|
|
||||||
if self.should_audit():
|
|
||||||
self.auditor.setLevel(settings.LOG_AGGREGATOR_LEVEL)
|
|
||||||
self.auditor.emit(record)
|
|
||||||
return actual_handler.emit(record)
|
|
||||||
|
|
||||||
def perform_test(self, custom_settings):
|
|
||||||
"""
|
|
||||||
Tests logging connectivity for given settings module.
|
|
||||||
@raises LoggingConnectivityException
|
|
||||||
"""
|
|
||||||
handler = self.get_handler(custom_settings=custom_settings, force_create=True)
|
|
||||||
handler.setFormatter(LogstashFormatter())
|
|
||||||
logger = logging.getLogger(__file__)
|
|
||||||
fn, lno, func, _ = logger.findCaller()
|
|
||||||
record = logger.makeRecord('awx', 10, fn, lno,
|
|
||||||
'AWX Connection Test', tuple(),
|
|
||||||
None, func)
|
|
||||||
futures = handler.emit(record)
|
|
||||||
for future in futures:
|
|
||||||
try:
|
|
||||||
resp = future.result()
|
|
||||||
if not resp.ok:
|
|
||||||
if isinstance(resp, SocketResult):
|
|
||||||
raise LoggingConnectivityException(
|
|
||||||
'Socket error: {}'.format(resp.reason or '')
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise LoggingConnectivityException(
|
|
||||||
': '.join([str(resp.status_code), resp.reason or ''])
|
|
||||||
)
|
|
||||||
except RequestException as e:
|
|
||||||
raise LoggingConnectivityException(str(e))
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def disable(cls):
|
|
||||||
cls.thread_local.enabled = False
|
|
||||||
|
|
||||||
|
|
||||||
AWXProxyHandler.thread_local.enabled = True
|
|
||||||
|
|
||||||
|
|
||||||
ColorHandler = logging.StreamHandler
|
ColorHandler = logging.StreamHandler
|
||||||
|
|
||||||
if settings.COLOR_LOGS is True:
|
if settings.COLOR_LOGS is True:
|
||||||
|
|||||||
@@ -11,18 +11,13 @@ from django.conf import settings
|
|||||||
logger = logging.getLogger('awx.main.utils.reload')
|
logger = logging.getLogger('awx.main.utils.reload')
|
||||||
|
|
||||||
|
|
||||||
def _supervisor_service_command(command, communicate=True):
|
def supervisor_service_command(command, service='*', communicate=True):
|
||||||
'''
|
'''
|
||||||
example use pattern of supervisorctl:
|
example use pattern of supervisorctl:
|
||||||
# supervisorctl restart tower-processes:receiver tower-processes:factcacher
|
# supervisorctl restart tower-processes:receiver tower-processes:factcacher
|
||||||
'''
|
'''
|
||||||
group_name = 'tower-processes'
|
|
||||||
if settings.DEBUG:
|
|
||||||
group_name = 'awx-processes'
|
|
||||||
args = ['supervisorctl']
|
args = ['supervisorctl']
|
||||||
if settings.DEBUG:
|
args.extend([command, ':'.join(['tower-processes', service])])
|
||||||
args.extend(['-c', '/supervisor.conf'])
|
|
||||||
args.extend([command, '{}:*'.format(group_name)])
|
|
||||||
logger.debug('Issuing command to {} services, args={}'.format(command, args))
|
logger.debug('Issuing command to {} services, args={}'.format(command, args))
|
||||||
supervisor_process = subprocess.Popen(args, stdin=subprocess.PIPE,
|
supervisor_process = subprocess.Popen(args, stdin=subprocess.PIPE,
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
@@ -41,4 +36,4 @@ def _supervisor_service_command(command, communicate=True):
|
|||||||
|
|
||||||
def stop_local_services(communicate=True):
|
def stop_local_services(communicate=True):
|
||||||
logger.warn('Stopping services on this node in response to user action')
|
logger.warn('Stopping services on this node in response to user action')
|
||||||
_supervisor_service_command(command='stop', communicate=communicate)
|
supervisor_service_command(command='stop', communicate=communicate)
|
||||||
|
|||||||
@@ -1011,8 +1011,9 @@ LOGGING = {
|
|||||||
'formatter': 'simple',
|
'formatter': 'simple',
|
||||||
},
|
},
|
||||||
'external_logger': {
|
'external_logger': {
|
||||||
'class': 'awx.main.utils.handlers.AWXProxyHandler',
|
'class': 'logging.handlers.SysLogHandler',
|
||||||
'formatter': 'json',
|
'formatter': 'json',
|
||||||
|
'address': ('localhost', 51414),
|
||||||
'filters': ['external_log_enabled', 'dynamic_level_filter'],
|
'filters': ['external_log_enabled', 'dynamic_level_filter'],
|
||||||
},
|
},
|
||||||
'tower_warnings': {
|
'tower_warnings': {
|
||||||
|
|||||||
@@ -78,7 +78,7 @@
|
|||||||
ngClick: 'vm.testLogging()',
|
ngClick: 'vm.testLogging()',
|
||||||
label: i18n._('Test'),
|
label: i18n._('Test'),
|
||||||
class: 'btn-primary',
|
class: 'btn-primary',
|
||||||
ngDisabled: 'configuration_logging_template_form.$invalid'
|
ngDisabled: 'configuration_logging_template_form.$pending'
|
||||||
},
|
},
|
||||||
cancel: {
|
cancel: {
|
||||||
ngClick: 'vm.formCancel()',
|
ngClick: 'vm.formCancel()',
|
||||||
|
|||||||
@@ -1,13 +0,0 @@
|
|||||||
Copyright 2013 Ross McFarland
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
@@ -42,7 +42,6 @@ social-auth-core==3.3.1 # see UPGRADE BLOCKERs
|
|||||||
social-auth-app-django==3.1.0 # see UPGRADE BLOCKERs
|
social-auth-app-django==3.1.0 # see UPGRADE BLOCKERs
|
||||||
redis
|
redis
|
||||||
requests
|
requests
|
||||||
requests-futures # see library notes
|
|
||||||
slackclient==1.1.2 # see UPGRADE BLOCKERs
|
slackclient==1.1.2 # see UPGRADE BLOCKERs
|
||||||
tacacs_plus==1.0 # UPGRADE BLOCKER: auth does not work with later versions
|
tacacs_plus==1.0 # UPGRADE BLOCKER: auth does not work with later versions
|
||||||
twilio
|
twilio
|
||||||
|
|||||||
@@ -102,6 +102,10 @@ RUN cd /usr/local/bin && \
|
|||||||
curl -L https://github.com/openshift/origin/releases/download/v3.9.0/openshift-origin-client-tools-v3.9.0-191fece-linux-64bit.tar.gz | \
|
curl -L https://github.com/openshift/origin/releases/download/v3.9.0/openshift-origin-client-tools-v3.9.0-191fece-linux-64bit.tar.gz | \
|
||||||
tar -xz --strip-components=1 --wildcards --no-anchored 'oc'
|
tar -xz --strip-components=1 --wildcards --no-anchored 'oc'
|
||||||
|
|
||||||
|
ADD tools/docker-compose/rsyslog.repo /etc/yum.repos.d/
|
||||||
|
RUN yum install -y rsyslog-omhttp
|
||||||
|
RUN echo '$IncludeConfig /var/lib/awx/rsyslog.conf' >> /etc/rsyslog.conf
|
||||||
|
|
||||||
RUN dnf -y clean all && rm -rf /root/.cache
|
RUN dnf -y clean all && rm -rf /root/.cache
|
||||||
|
|
||||||
# https://github.com/ansible/awx/issues/5224
|
# https://github.com/ansible/awx/issues/5224
|
||||||
|
|||||||
7
tools/docker-compose/rsyslog.repo
Normal file
7
tools/docker-compose/rsyslog.repo
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
[rsyslog_v8]
|
||||||
|
name=Adiscon CentOS-$releasever - local packages for $basearch
|
||||||
|
baseurl=http://rpms.adiscon.com/v8-stable/epel-$releasever/$basearch
|
||||||
|
enabled=1
|
||||||
|
gpgcheck=0
|
||||||
|
gpgkey=http://rpms.adiscon.com/RPM-GPG-KEY-Adiscon
|
||||||
|
protect=1
|
||||||
@@ -71,8 +71,20 @@ redirect_stderr=true
|
|||||||
stdout_logfile=/dev/fd/1
|
stdout_logfile=/dev/fd/1
|
||||||
stdout_logfile_maxbytes=0
|
stdout_logfile_maxbytes=0
|
||||||
|
|
||||||
|
[program:awx-rsyslogd]
|
||||||
|
command = rsyslogd -nd -i /awx_devel/rsyslog.pid
|
||||||
|
autostart = true
|
||||||
|
autorestart = true
|
||||||
|
stopwaitsecs = 1
|
||||||
|
stopsignal=KILL
|
||||||
|
stopasgroup=true
|
||||||
|
killasgroup=true
|
||||||
|
redirect_stderr=true
|
||||||
|
stdout_logfile=/tmp/ryan
|
||||||
|
stdout_logfile_maxbytes=0
|
||||||
|
|
||||||
[group:tower-processes]
|
[group:tower-processes]
|
||||||
programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsbroadcast
|
programs=awx-dispatcher,awx-receiver,awx-runworker,awx-uwsgi,awx-daphne,awx-nginx,awx-wsbroadcast,awx-rsyslogd
|
||||||
priority=5
|
priority=5
|
||||||
|
|
||||||
[unix_http_server]
|
[unix_http_server]
|
||||||
|
|||||||
Reference in New Issue
Block a user