From 589d27c88c4d514026c1425f4f7fa8cda188156e Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 23 Oct 2019 23:54:47 -0400 Subject: [PATCH] POC: replace our external log aggregation feature with rsyslog - this change adds rsyslog (https://github.com/rsyslog/rsyslog) as a new service that runs on every AWX node (managed by supervisord); in particular, this feature requires a recent version (v8.38+) of rsyslog that supports the omhttp module (https://github.com/rsyslog/rsyslog-doc/pull/750) - the "external_logger" handler in AWX is now a SysLogHandler that ships logs to the local UDP port where rsyslog is configured to listen (by default, 51414) - every time a LOG_AGGREGATOR_* setting is changed, every AWX node reconfigures and restarts its local instance of rsyslog so that its forwarding settings match what has been configured in AWX - unlike the prior implementation, if the external logging aggregator (Splunk/Logstash) goes temporarily offline, rsyslog will retain the messages and ship them when the log aggregator is back online - 4xx- or 5xx-level errors are recorded at /var/log/tower/external.err --- awx/conf/views.py | 38 +- .../management/commands/run_dispatcher.py | 6 - awx/main/tasks.py | 9 +- awx/main/tests/unit/utils/test_handlers.py | 393 ----------------- awx/main/utils/external_logging.py | 65 +++ awx/main/utils/handlers.py | 396 ------------------ awx/main/utils/reload.py | 11 +- awx/settings/defaults.py | 3 +- .../sub-forms/system-logging.form.js | 2 +- docs/licenses/requests-futures.txt | 13 - requirements/requirements.in | 1 - tools/docker-compose/Dockerfile | 4 + tools/docker-compose/rsyslog.repo | 7 + tools/docker-compose/supervisor.conf | 14 +- 14 files changed, 106 insertions(+), 856 deletions(-) delete mode 100644 awx/main/tests/unit/utils/test_handlers.py create mode 100644 awx/main/utils/external_logging.py delete mode 100644 docs/licenses/requests-futures.txt create mode 100644 tools/docker-compose/rsyslog.repo diff --git a/awx/conf/views.py b/awx/conf/views.py index 13b72a926f..7a2a21a713 100644 --- a/awx/conf/views.py +++ b/awx/conf/views.py @@ -3,6 +3,7 @@ # Python import collections +import logging import sys # Django @@ -26,7 +27,6 @@ from awx.api.generics import ( from awx.api.permissions import IsSuperUser from awx.api.versioning import reverse from awx.main.utils import camelcase_to_underscore -from awx.main.utils.handlers import AWXProxyHandler, LoggingConnectivityException from awx.main.tasks import handle_setting_changes from awx.conf.models import Setting from awx.conf.serializers import SettingCategorySerializer, SettingSingletonSerializer @@ -161,40 +161,8 @@ class SettingLoggingTest(GenericAPIView): filter_backends = [] def post(self, request, *args, **kwargs): - defaults = dict() - for key in settings_registry.get_registered_settings(category_slug='logging'): - try: - defaults[key] = settings_registry.get_setting_field(key).get_default() - except serializers.SkipField: - defaults[key] = None - obj = type('Settings', (object,), defaults)() - serializer = self.get_serializer(obj, data=request.data) - serializer.is_valid(raise_exception=True) - # Special validation specific to logging test. - errors = {} - for key in ['LOG_AGGREGATOR_TYPE', 'LOG_AGGREGATOR_HOST']: - if not request.data.get(key, ''): - errors[key] = 'This field is required.'
- if errors: - raise ValidationError(errors) - - if request.data.get('LOG_AGGREGATOR_PASSWORD', '').startswith('$encrypted$'): - serializer.validated_data['LOG_AGGREGATOR_PASSWORD'] = getattr( - settings, 'LOG_AGGREGATOR_PASSWORD', '' - ) - - try: - class MockSettings: - pass - mock_settings = MockSettings() - for k, v in serializer.validated_data.items(): - setattr(mock_settings, k, v) - AWXProxyHandler().perform_test(custom_settings=mock_settings) - if mock_settings.LOG_AGGREGATOR_PROTOCOL.upper() == 'UDP': - return Response(status=status.HTTP_201_CREATED) - except LoggingConnectivityException as e: - return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - return Response(status=status.HTTP_200_OK) + logging.getLogger('awx').info('AWX Connection Test') + return Response(status=status.HTTP_202_ACCEPTED) # Create view functions for all of the class-based views to simplify inclusion diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index 5f7db4f106..e22baa379f 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -7,7 +7,6 @@ from django.core.cache import cache as django_cache from django.core.management.base import BaseCommand from django.db import connection as django_connection -from awx.main.utils.handlers import AWXProxyHandler from awx.main.dispatch import get_local_queuename, reaper from awx.main.dispatch.control import Control from awx.main.dispatch.pool import AutoscalePool @@ -56,11 +55,6 @@ class Command(BaseCommand): reaper.reap() consumer = None - # don't ship external logs inside the dispatcher's parent process - # this exists to work around a race condition + deadlock bug on fork - # in cpython itself: - # https://bugs.python.org/issue37429 - AWXProxyHandler.disable() try: queues = ['tower_broadcast_all', get_local_queuename()] consumer = AWXConsumerPG( diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 6374815769..3116351644 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -72,7 +72,8 @@ from awx.main.utils import (get_ssh_version, update_scm_url, ignore_inventory_group_removal, extract_ansible_vars, schedule_task_manager, get_awx_version) from awx.main.utils.ansible import read_ansible_config -from awx.main.utils.common import _get_ansible_version, get_custom_venv_choices +from awx.main.utils.common import get_ansible_version, _get_ansible_version, get_custom_venv_choices +from awx.main.utils.external_logging import reconfigure_rsyslog from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock @@ -280,6 +281,12 @@ def handle_setting_changes(setting_keys): logger.debug('cache delete_many(%r)', cache_keys) cache.delete_many(cache_keys) + if any([ + setting.startswith('LOG_AGGREGATOR') + for setting in setting_keys + ]): + reconfigure_rsyslog() + @task(queue='tower_broadcast_all') def delete_project_files(project_path): diff --git a/awx/main/tests/unit/utils/test_handlers.py b/awx/main/tests/unit/utils/test_handlers.py deleted file mode 100644 index 6fa9b1f992..0000000000 --- a/awx/main/tests/unit/utils/test_handlers.py +++ /dev/null @@ -1,393 +0,0 @@ -# -*- coding: utf-8 -*- -import base64 -import logging -import socket -import datetime -from dateutil.tz import tzutc -from io import StringIO -from uuid import uuid4 - -from unittest import mock - -from django.conf import LazySettings -from django.utils.encoding 
import smart_str -import pytest -import requests -from requests_futures.sessions import FuturesSession - -from awx.main.utils.handlers import (BaseHandler, BaseHTTPSHandler as HTTPSHandler, - TCPHandler, UDPHandler, _encode_payload_for_socket, - PARAM_NAMES, LoggingConnectivityException, - AWXProxyHandler) -from awx.main.utils.formatters import LogstashFormatter - - -@pytest.fixture() -def https_adapter(): - class FakeHTTPSAdapter(requests.adapters.HTTPAdapter): - requests = [] - status = 200 - reason = None - - def send(self, request, **kwargs): - self.requests.append(request) - resp = requests.models.Response() - resp.status_code = self.status - resp.reason = self.reason - resp.request = request - return resp - - return FakeHTTPSAdapter() - - -@pytest.fixture() -def connection_error_adapter(): - class ConnectionErrorAdapter(requests.adapters.HTTPAdapter): - - def send(self, request, **kwargs): - err = requests.packages.urllib3.exceptions.SSLError() - raise requests.exceptions.ConnectionError(err, request=request) - - return ConnectionErrorAdapter() - - -@pytest.fixture -def fake_socket(tmpdir_factory, request): - sok = socket.socket - sok.send = mock.MagicMock() - sok.connect = mock.MagicMock() - sok.setblocking = mock.MagicMock() - sok.close = mock.MagicMock() - return sok - - -def test_https_logging_handler_requests_async_implementation(): - handler = HTTPSHandler() - assert isinstance(handler.session, FuturesSession) - - -def test_https_logging_handler_has_default_http_timeout(): - handler = TCPHandler() - assert handler.tcp_timeout == 5 - - -@pytest.mark.parametrize('param', ['host', 'port', 'indv_facts']) -def test_base_logging_handler_defaults(param): - handler = BaseHandler() - assert hasattr(handler, param) and getattr(handler, param) is None - - -@pytest.mark.parametrize('param', ['host', 'port', 'indv_facts']) -def test_base_logging_handler_kwargs(param): - handler = BaseHandler(**{param: 'EXAMPLE'}) - assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE' - - -@pytest.mark.parametrize('params', [ - { - 'LOG_AGGREGATOR_HOST': 'https://server.invalid', - 'LOG_AGGREGATOR_PORT': 22222, - 'LOG_AGGREGATOR_TYPE': 'loggly', - 'LOG_AGGREGATOR_USERNAME': 'foo', - 'LOG_AGGREGATOR_PASSWORD': 'bar', - 'LOG_AGGREGATOR_INDIVIDUAL_FACTS': True, - 'LOG_AGGREGATOR_TCP_TIMEOUT': 96, - 'LOG_AGGREGATOR_VERIFY_CERT': False, - 'LOG_AGGREGATOR_PROTOCOL': 'https' - }, - { - 'LOG_AGGREGATOR_HOST': 'https://server.invalid', - 'LOG_AGGREGATOR_PORT': 22222, - 'LOG_AGGREGATOR_PROTOCOL': 'udp' - } -]) -def test_real_handler_from_django_settings(params): - settings = LazySettings() - settings.configure(**params) - handler = AWXProxyHandler().get_handler(custom_settings=settings) - # need the _reverse_ dictionary from PARAM_NAMES - attr_lookup = {} - for attr_name, setting_name in PARAM_NAMES.items(): - attr_lookup[setting_name] = attr_name - for setting_name, val in params.items(): - attr_name = attr_lookup[setting_name] - if attr_name == 'protocol': - continue - assert hasattr(handler, attr_name) - - -def test_invalid_kwarg_to_real_handler(): - settings = LazySettings() - settings.configure(**{ - 'LOG_AGGREGATOR_HOST': 'https://server.invalid', - 'LOG_AGGREGATOR_PORT': 22222, - 'LOG_AGGREGATOR_PROTOCOL': 'udp', - 'LOG_AGGREGATOR_VERIFY_CERT': False # setting not valid for UDP handler - }) - handler = AWXProxyHandler().get_handler(custom_settings=settings) - assert not hasattr(handler, 'verify_cert') - - -def test_protocol_not_specified(): - settings = LazySettings() - settings.configure(**{ 
- 'LOG_AGGREGATOR_HOST': 'https://server.invalid', - 'LOG_AGGREGATOR_PORT': 22222, - 'LOG_AGGREGATOR_PROTOCOL': None # awx/settings/defaults.py - }) - handler = AWXProxyHandler().get_handler(custom_settings=settings) - assert isinstance(handler, logging.NullHandler) - - -def test_base_logging_handler_emit_system_tracking(dummy_log_record): - handler = BaseHandler(host='127.0.0.1', indv_facts=True) - handler.setFormatter(LogstashFormatter()) - dummy_log_record.name = 'awx.analytics.system_tracking' - dummy_log_record.msg = None - dummy_log_record.inventory_id = 11 - dummy_log_record.host_name = 'my_lucky_host' - dummy_log_record.job_id = 777 - dummy_log_record.ansible_facts = { - "ansible_kernel": "4.4.66-boot2docker", - "ansible_machine": "x86_64", - "ansible_swapfree_mb": 4663, - } - dummy_log_record.ansible_facts_modified = datetime.datetime.now(tzutc()).isoformat() - sent_payloads = handler.emit(dummy_log_record) - - assert len(sent_payloads) == 1 - assert sent_payloads[0]['ansible_facts'] == dummy_log_record.ansible_facts - assert sent_payloads[0]['ansible_facts_modified'] == dummy_log_record.ansible_facts_modified - assert sent_payloads[0]['level'] == 'INFO' - assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking' - assert sent_payloads[0]['job_id'] == dummy_log_record.job_id - assert sent_payloads[0]['inventory_id'] == dummy_log_record.inventory_id - assert sent_payloads[0]['host_name'] == dummy_log_record.host_name - - -@pytest.mark.parametrize('host, port, normalized, hostname_only', [ - ('http://localhost', None, 'http://localhost', False), - ('http://localhost', 8080, 'http://localhost:8080', False), - ('https://localhost', 443, 'https://localhost:443', False), - ('ftp://localhost', 443, 'ftp://localhost:443', False), - ('https://localhost:550', 443, 'https://localhost:550', False), - ('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar', False), - ('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar', False), - ('http://splunk.server:8088/services/collector/event', 80, - 'http://splunk.server:8088/services/collector/event', False), - ('http://splunk.server/services/collector/event', 8088, - 'http://splunk.server:8088/services/collector/event', False), - ('splunk.server:8088/services/collector/event', 80, - 'http://splunk.server:8088/services/collector/event', False), - ('splunk.server/services/collector/event', 8088, - 'http://splunk.server:8088/services/collector/event', False), - ('localhost', None, 'http://localhost', False), - ('localhost', 8080, 'http://localhost:8080', False), - ('localhost', 4399, 'localhost', True), - ('tcp://localhost:4399/foo/bar', 4399, 'localhost', True), -]) -def test_base_logging_handler_host_format(host, port, normalized, hostname_only): - handler = BaseHandler(host=host, port=port) - assert handler._get_host(scheme='http', hostname_only=hostname_only) == normalized - - -@pytest.mark.parametrize( - 'status, reason, exc', - [(200, '200 OK', None), (404, 'Not Found', LoggingConnectivityException)] -) -@pytest.mark.parametrize('protocol', ['http', 'https', None]) -def test_https_logging_handler_connectivity_test(https_adapter, status, reason, exc, protocol): - host = 'example.org' - if protocol: - host = '://'.join([protocol, host]) - https_adapter.status = status - https_adapter.reason = reason - settings = LazySettings() - settings.configure(**{ - 'LOG_AGGREGATOR_HOST': host, - 'LOG_AGGREGATOR_PORT': 8080, - 'LOG_AGGREGATOR_TYPE': 'logstash', - 'LOG_AGGREGATOR_USERNAME': 'user', - 
'LOG_AGGREGATOR_PASSWORD': 'password', - 'LOG_AGGREGATOR_LOGGERS': ['awx', 'activity_stream', 'job_events', 'system_tracking'], - 'LOG_AGGREGATOR_PROTOCOL': 'https', - 'CLUSTER_HOST_ID': '', - 'LOG_AGGREGATOR_TOWER_UUID': str(uuid4()), - 'LOG_AGGREGATOR_LEVEL': 'DEBUG', - }) - - class FakeHTTPSHandler(HTTPSHandler): - - def __init__(self, *args, **kwargs): - super(FakeHTTPSHandler, self).__init__(*args, **kwargs) - self.session.mount('{}://'.format(protocol or 'https'), https_adapter) - - def emit(self, record): - return super(FakeHTTPSHandler, self).emit(record) - - with mock.patch.object(AWXProxyHandler, 'get_handler_class') as mock_get_class: - mock_get_class.return_value = FakeHTTPSHandler - if exc: - with pytest.raises(exc) as e: - AWXProxyHandler().perform_test(settings) - assert str(e).endswith('%s: %s' % (status, reason)) - else: - assert AWXProxyHandler().perform_test(settings) is None - - -def test_https_logging_handler_logstash_auth_info(): - handler = HTTPSHandler(message_type='logstash', username='bob', password='ansible') - handler._add_auth_information() - assert isinstance(handler.session.auth, requests.auth.HTTPBasicAuth) - assert handler.session.auth.username == 'bob' - assert handler.session.auth.password == 'ansible' - - -def test_https_logging_handler_splunk_auth_info(): - handler = HTTPSHandler(message_type='splunk', password='ansible') - handler._add_auth_information() - assert handler.session.headers['Authorization'] == 'Splunk ansible' - assert handler.session.headers['Content-Type'] == 'application/json' - - -def test_https_logging_handler_connection_error(connection_error_adapter, - dummy_log_record): - handler = HTTPSHandler(host='127.0.0.1', message_type='logstash') - handler.setFormatter(LogstashFormatter()) - handler.session.mount('http://', connection_error_adapter) - - buff = StringIO() - logging.getLogger('awx.main.utils.handlers').addHandler( - logging.StreamHandler(buff) - ) - - async_futures = handler.emit(dummy_log_record) - with pytest.raises(requests.exceptions.ConnectionError): - [future.result() for future in async_futures] - assert 'failed to emit log to external aggregator\nTraceback' in buff.getvalue() - - # we should only log failures *periodically*, so causing *another* - # immediate failure shouldn't report a second ConnectionError - buff.truncate(0) - async_futures = handler.emit(dummy_log_record) - with pytest.raises(requests.exceptions.ConnectionError): - [future.result() for future in async_futures] - assert buff.getvalue() == '' - - -@pytest.mark.parametrize('message_type', ['logstash', 'splunk']) -def test_https_logging_handler_emit_without_cred(https_adapter, dummy_log_record, - message_type): - handler = HTTPSHandler(host='127.0.0.1', message_type=message_type) - handler.setFormatter(LogstashFormatter()) - handler.session.mount('https://', https_adapter) - async_futures = handler.emit(dummy_log_record) - [future.result() for future in async_futures] - - assert len(https_adapter.requests) == 1 - request = https_adapter.requests[0] - assert request.url == 'https://127.0.0.1/' - assert request.method == 'POST' - - if message_type == 'logstash': - # A username + password weren't used, so this header should be missing - assert 'Authorization' not in request.headers - - if message_type == 'splunk': - assert request.headers['Authorization'] == 'Splunk None' - - -def test_https_logging_handler_emit_logstash_with_creds(https_adapter, - dummy_log_record): - handler = HTTPSHandler(host='127.0.0.1', - username='user', password='pass', - 
message_type='logstash') - handler.setFormatter(LogstashFormatter()) - handler.session.mount('https://', https_adapter) - async_futures = handler.emit(dummy_log_record) - [future.result() for future in async_futures] - - assert len(https_adapter.requests) == 1 - request = https_adapter.requests[0] - assert request.headers['Authorization'] == 'Basic %s' % smart_str(base64.b64encode(b"user:pass")) - - -def test_https_logging_handler_emit_splunk_with_creds(https_adapter, - dummy_log_record): - handler = HTTPSHandler(host='127.0.0.1', - password='pass', message_type='splunk') - handler.setFormatter(LogstashFormatter()) - handler.session.mount('https://', https_adapter) - async_futures = handler.emit(dummy_log_record) - [future.result() for future in async_futures] - - assert len(https_adapter.requests) == 1 - request = https_adapter.requests[0] - assert request.headers['Authorization'] == 'Splunk pass' - - -@pytest.mark.parametrize('payload, encoded_payload', [ - ('foobar', 'foobar'), - ({'foo': 'bar'}, '{"foo": "bar"}'), - ({u'测试键': u'测试值'}, '{"测试键": "测试值"}'), -]) -def test_encode_payload_for_socket(payload, encoded_payload): - assert _encode_payload_for_socket(payload).decode('utf-8') == encoded_payload - - -def test_udp_handler_create_socket_at_init(): - handler = UDPHandler(host='127.0.0.1', port=4399) - assert hasattr(handler, 'socket') - assert isinstance(handler.socket, socket.socket) - assert handler.socket.family == socket.AF_INET - assert handler.socket.type == socket.SOCK_DGRAM - - -def test_udp_handler_send(dummy_log_record): - handler = UDPHandler(host='127.0.0.1', port=4399) - handler.setFormatter(LogstashFormatter()) - with mock.patch('awx.main.utils.handlers._encode_payload_for_socket', return_value="des") as encode_mock,\ - mock.patch.object(handler, 'socket') as socket_mock: - handler.emit(dummy_log_record) - encode_mock.assert_called_once_with(handler.format(dummy_log_record)) - socket_mock.sendto.assert_called_once_with("des", ('127.0.0.1', 4399)) - - -def test_tcp_handler_send(fake_socket, dummy_log_record): - handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5) - handler.setFormatter(LogstashFormatter()) - with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\ - mock.patch('select.select', return_value=([], [fake_socket], [])): - handler.emit(dummy_log_record) - sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM) - fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399)) - fake_socket.setblocking.assert_called_once_with(0) - fake_socket.send.assert_called_once_with(handler.format(dummy_log_record)) - fake_socket.close.assert_called_once() - - -def test_tcp_handler_return_if_socket_unavailable(fake_socket, dummy_log_record): - handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5) - handler.setFormatter(LogstashFormatter()) - with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\ - mock.patch('select.select', return_value=([], [], [])): - handler.emit(dummy_log_record) - sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM) - fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399)) - fake_socket.setblocking.assert_called_once_with(0) - assert not fake_socket.send.called - fake_socket.close.assert_called_once() - - -def test_tcp_handler_log_exception(fake_socket, dummy_log_record): - handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5) - handler.setFormatter(LogstashFormatter()) - with mock.patch('socket.socket', return_value=fake_socket) 
as sok_init_mock,\ - mock.patch('select.select', return_value=([], [], [])),\ - mock.patch('awx.main.utils.handlers.logger') as logger_mock: - fake_socket.connect.side_effect = Exception("foo") - handler.emit(dummy_log_record) - sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM) - logger_mock.exception.assert_called_once() - fake_socket.close.assert_called_once() - assert not fake_socket.send.called diff --git a/awx/main/utils/external_logging.py b/awx/main/utils/external_logging.py new file mode 100644 index 0000000000..26c13b195b --- /dev/null +++ b/awx/main/utils/external_logging.py @@ -0,0 +1,65 @@ +import urllib.parse as urlparse + +from django.conf import settings + +from awx.main.utils.reload import supervisor_service_command + + +def reconfigure_rsyslog(): + tmpl = '' + if settings.LOG_AGGREGATOR_ENABLED: + host = getattr(settings, 'LOG_AGGREGATOR_HOST', '') + port = getattr(settings, 'LOG_AGGREGATOR_PORT', '') + protocol = getattr(settings, 'LOG_AGGREGATOR_PROTOCOL', '') + + if protocol.startswith('http'): + scheme = 'https' + # urlparse requires '//' to be provided if scheme is not specified + original_parsed = urlparse.urlsplit(host) + if (not original_parsed.scheme and not host.startswith('//')) or original_parsed.hostname is None: + host = '%s://%s' % (scheme, host) if scheme else '//%s' % host + parsed = urlparse.urlsplit(host) + + host = parsed.hostname + try: + port = parsed.port or settings.LOG_AGGREGATOR_PORT + except ValueError: + port = settings.LOG_AGGREGATOR_PORT + + parts = [] + parts.extend([ + '$ModLoad imudp', + '$UDPServerRun 51414', + 'template(name="awx" type="string" string="%msg%")', + ]) + if protocol.startswith('http'): + # https://github.com/rsyslog/rsyslog-doc/blob/master/source/configuration/modules/omhttp.rst + ssl = "on" if parsed.scheme == 'https' else "off" + skip_verify = "off" if settings.LOG_AGGREGATOR_VERIFY_CERT else "on" + params = [ + 'type="omhttp"', + f'server="{host}"', + f'serverport="{port}"', + f'usehttps="{ssl}"', + f'skipverifyhost="{skip_verify}"', + 'action.resumeRetryCount="-1"', + 'template="awx"', + 'errorfile="/var/log/tower/external.err"', + ] + username = getattr(settings, 'LOG_AGGREGATOR_USERNAME', '') + password = getattr(settings, 'LOG_AGGREGATOR_PASSWORD', '') + if username: + params.append(f'uid="{username}"') + if password: + params.append(f'pwd="{password}"') + params = ' '.join(params) + parts.extend(['module(load="omhttp")', f'action({params})']) + else: + parts.append( + f'action(type="omfwd" target="{host}" port="{port}" protocol="{protocol}" action.resumeRetryCount="-1" template="awx")' # noqa + ) + tmpl = '\n'.join(parts) + + with open('/var/lib/awx/rsyslog.conf', 'w') as f: + f.write(tmpl + '\n') + supervisor_service_command(command='restart', service='awx-rsyslogd') diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index 515d410eb9..0640c14921 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -3,406 +3,10 @@ # Python import logging -import json -import os -import requests -import time -import threading -import socket -import select -from urllib import parse as urlparse -from concurrent.futures import ThreadPoolExecutor -from requests.exceptions import RequestException # Django from django.conf import settings -# requests futures, a dependency used by these handlers -from requests_futures.sessions import FuturesSession -import cachetools - -# AWX -from awx.main.utils.formatters import LogstashFormatter - - -__all__ = ['BaseHTTPSHandler', 
'TCPHandler', 'UDPHandler', - 'AWXProxyHandler'] - - -logger = logging.getLogger('awx.main.utils.handlers') - - -# Translation of parameter names to names in Django settings -# logging settings category, only those related to handler / log emission -PARAM_NAMES = { - 'host': 'LOG_AGGREGATOR_HOST', - 'port': 'LOG_AGGREGATOR_PORT', - 'message_type': 'LOG_AGGREGATOR_TYPE', - 'username': 'LOG_AGGREGATOR_USERNAME', - 'password': 'LOG_AGGREGATOR_PASSWORD', - 'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS', - 'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT', - 'verify_cert': 'LOG_AGGREGATOR_VERIFY_CERT', - 'protocol': 'LOG_AGGREGATOR_PROTOCOL' -} - - -def unused_callback(sess, resp): - pass - - -class LoggingConnectivityException(Exception): - pass - - -class VerboseThreadPoolExecutor(ThreadPoolExecutor): - - last_log_emit = 0 - - def submit(self, func, *args, **kwargs): - def _wrapped(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception: - # If an exception occurs in a concurrent thread worker (like - # a ConnectionError or a read timeout), periodically log - # that failure. - # - # This approach isn't really thread-safe, so we could - # potentially log once per thread every 10 seconds, but it - # beats logging *every* failed HTTP request in a scenario where - # you've typo'd your log aggregator hostname. - now = time.time() - if now - self.last_log_emit > 10: - logger.exception('failed to emit log to external aggregator') - self.last_log_emit = now - raise - return super(VerboseThreadPoolExecutor, self).submit(_wrapped, *args, - **kwargs) - - -class SocketResult: - ''' - A class to be the return type of methods that send data over a socket - allows object to be used in the same way as a request futures object - ''' - def __init__(self, ok, reason=None): - self.ok = ok - self.reason = reason - - def result(self): - return self - - -class BaseHandler(logging.Handler): - def __init__(self, host=None, port=None, indv_facts=None, **kwargs): - super(BaseHandler, self).__init__() - self.host = host - self.port = port - self.indv_facts = indv_facts - - def _send(self, payload): - """Actually send message to log aggregator. - """ - return payload - - def _format_and_send_record(self, record): - if self.indv_facts: - return [self._send(json.loads(self.format(record)))] - return [self._send(self.format(record))] - - def emit(self, record): - """ - Emit a log record. Returns a list of zero or more - implementation-specific objects for tests. - """ - try: - return self._format_and_send_record(record) - except (KeyboardInterrupt, SystemExit): - raise - except Exception: - self.handleError(record) - - def _get_host(self, scheme='', hostname_only=False): - """Return the host name of log aggregator. 
- """ - host = self.host or '' - # urlparse requires '//' to be provided if scheme is not specified - original_parsed = urlparse.urlsplit(host) - if (not original_parsed.scheme and not host.startswith('//')) or original_parsed.hostname is None: - host = '%s://%s' % (scheme, host) if scheme else '//%s' % host - parsed = urlparse.urlsplit(host) - - if hostname_only: - return parsed.hostname - - try: - port = parsed.port or self.port - except ValueError: - port = self.port - netloc = parsed.netloc if port is None else '%s:%s' % (parsed.hostname, port) - - url_components = list(parsed) - url_components[1] = netloc - ret = urlparse.urlunsplit(url_components) - return ret.lstrip('/') - - -class BaseHTTPSHandler(BaseHandler): - ''' - Originally derived from python-logstash library - Non-blocking request accomplished by FuturesSession, similar - to the loggly-python-handler library - ''' - def _add_auth_information(self): - if self.message_type == 'logstash': - if not self.username: - # Logstash authentication not enabled - return - logstash_auth = requests.auth.HTTPBasicAuth(self.username, self.password) - self.session.auth = logstash_auth - elif self.message_type == 'splunk': - auth_header = "Splunk %s" % self.password - headers = { - "Authorization": auth_header, - "Content-Type": "application/json" - } - self.session.headers.update(headers) - - def __init__(self, fqdn=False, message_type=None, username=None, password=None, - tcp_timeout=5, verify_cert=True, **kwargs): - self.fqdn = fqdn - self.message_type = message_type - self.username = username - self.password = password - self.tcp_timeout = tcp_timeout - self.verify_cert = verify_cert - super(BaseHTTPSHandler, self).__init__(**kwargs) - self.session = FuturesSession(executor=VerboseThreadPoolExecutor( - max_workers=2 # this is the default used by requests_futures - )) - self._add_auth_information() - - def _get_post_kwargs(self, payload_input): - if self.message_type == 'splunk': - # Splunk needs data nested under key "event" - if not isinstance(payload_input, dict): - payload_input = json.loads(payload_input) - payload_input = {'event': payload_input} - if isinstance(payload_input, dict): - payload_str = json.dumps(payload_input) - else: - payload_str = payload_input - kwargs = dict(data=payload_str, background_callback=unused_callback, - timeout=self.tcp_timeout) - if self.verify_cert is False: - kwargs['verify'] = False - return kwargs - - - def _send(self, payload): - """See: - https://docs.python.org/3/library/concurrent.futures.html#future-objects - http://pythonhosted.org/futures/ - """ - return self.session.post(self._get_host(scheme='https'), - **self._get_post_kwargs(payload)) - - -def _encode_payload_for_socket(payload): - encoded_payload = payload - if isinstance(encoded_payload, dict): - encoded_payload = json.dumps(encoded_payload, ensure_ascii=False) - if isinstance(encoded_payload, str): - encoded_payload = encoded_payload.encode('utf-8') - return encoded_payload - - -class TCPHandler(BaseHandler): - def __init__(self, tcp_timeout=5, **kwargs): - self.tcp_timeout = tcp_timeout - super(TCPHandler, self).__init__(**kwargs) - - def _send(self, payload): - payload = _encode_payload_for_socket(payload) - sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - sok.connect((self._get_host(hostname_only=True), self.port or 0)) - sok.setblocking(0) - _, ready_to_send, _ = select.select([], [sok], [], float(self.tcp_timeout)) - if len(ready_to_send) == 0: - ret = SocketResult(False, "Socket currently busy, failed to send 
message") - logger.warning(ret.reason) - else: - sok.send(payload) - ret = SocketResult(True) # success! - except Exception as e: - ret = SocketResult(False, "Error sending message from %s: %s" % - (TCPHandler.__name__, - ' '.join(str(arg) for arg in e.args))) - logger.exception(ret.reason) - finally: - sok.close() - return ret - - -class UDPHandler(BaseHandler): - message = "Cannot determine if UDP messages are received." - - def __init__(self, **kwargs): - super(UDPHandler, self).__init__(**kwargs) - self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - - def _send(self, payload): - payload = _encode_payload_for_socket(payload) - self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0)) - return SocketResult(True, reason=self.message) - - -class AWXNullHandler(logging.NullHandler): - ''' - Only additional this does is accept arbitrary __init__ params because - the proxy handler does not (yet) work with arbitrary handler classes - ''' - def __init__(self, *args, **kwargs): - super(AWXNullHandler, self).__init__() - - -HANDLER_MAPPING = { - 'https': BaseHTTPSHandler, - 'tcp': TCPHandler, - 'udp': UDPHandler, -} - - -TTLCache = cachetools.TTLCache - -if 'py.test' in os.environ.get('_', ''): - # don't cache settings in unit tests - class TTLCache(TTLCache): - - def __getitem__(self, item): - raise KeyError() - - -class AWXProxyHandler(logging.Handler): - ''' - Handler specific to the AWX external logging feature - - Will dynamically create a handler specific to the configured - protocol, and will create a new one automatically on setting change - - Managing parameters: - All parameters will get their value from settings as a default - if the parameter was either provided on init, or set manually, - this value will take precedence. - Parameters match same parameters in the actualized handler classes. 
- ''' - - thread_local = threading.local() - _auditor = None - - def __init__(self, **kwargs): - # TODO: process 'level' kwarg - super(AWXProxyHandler, self).__init__(**kwargs) - self._handler = None - self._old_kwargs = {} - - @property - def auditor(self): - if not self._auditor: - self._auditor = logging.handlers.RotatingFileHandler( - filename='/var/log/tower/external.log', - maxBytes=1024 * 1024 * 50, # 50 MB - backupCount=5, - ) - - class WritableLogstashFormatter(LogstashFormatter): - @classmethod - def serialize(cls, message): - return json.dumps(message) - - self._auditor.setFormatter(WritableLogstashFormatter()) - return self._auditor - - def get_handler_class(self, protocol): - return HANDLER_MAPPING.get(protocol, AWXNullHandler) - - @cachetools.cached(cache=TTLCache(maxsize=1, ttl=3), key=lambda *args, **kw: 'get_handler') - def get_handler(self, custom_settings=None, force_create=False): - new_kwargs = {} - use_settings = custom_settings or settings - for field_name, setting_name in PARAM_NAMES.items(): - val = getattr(use_settings, setting_name, None) - if val is None: - continue - new_kwargs[field_name] = val - if new_kwargs == self._old_kwargs and self._handler and (not force_create): - # avoids re-creating session objects, and other such things - return self._handler - self._old_kwargs = new_kwargs.copy() - # TODO: remove any kwargs no applicable to that particular handler - protocol = new_kwargs.pop('protocol', None) - HandlerClass = self.get_handler_class(protocol) - # cleanup old handler and make new one - if self._handler: - self._handler.close() - logger.debug('Creating external log handler due to startup or settings change.') - self._handler = HandlerClass(**new_kwargs) - if self.formatter: - # self.format(record) is called inside of emit method - # so not safe to assume this can be handled within self - self._handler.setFormatter(self.formatter) - return self._handler - - @cachetools.cached(cache=TTLCache(maxsize=1, ttl=3), key=lambda *args, **kw: 'should_audit') - def should_audit(self): - return settings.LOG_AGGREGATOR_AUDIT - - def emit(self, record): - if AWXProxyHandler.thread_local.enabled: - actual_handler = self.get_handler() - if self.should_audit(): - self.auditor.setLevel(settings.LOG_AGGREGATOR_LEVEL) - self.auditor.emit(record) - return actual_handler.emit(record) - - def perform_test(self, custom_settings): - """ - Tests logging connectivity for given settings module. 
- @raises LoggingConnectivityException - """ - handler = self.get_handler(custom_settings=custom_settings, force_create=True) - handler.setFormatter(LogstashFormatter()) - logger = logging.getLogger(__file__) - fn, lno, func, _ = logger.findCaller() - record = logger.makeRecord('awx', 10, fn, lno, - 'AWX Connection Test', tuple(), - None, func) - futures = handler.emit(record) - for future in futures: - try: - resp = future.result() - if not resp.ok: - if isinstance(resp, SocketResult): - raise LoggingConnectivityException( - 'Socket error: {}'.format(resp.reason or '') - ) - else: - raise LoggingConnectivityException( - ': '.join([str(resp.status_code), resp.reason or '']) - ) - except RequestException as e: - raise LoggingConnectivityException(str(e)) - - @classmethod - def disable(cls): - cls.thread_local.enabled = False - - -AWXProxyHandler.thread_local.enabled = True - - ColorHandler = logging.StreamHandler if settings.COLOR_LOGS is True: diff --git a/awx/main/utils/reload.py b/awx/main/utils/reload.py index bdfcc0dcc9..7c8ff4c999 100644 --- a/awx/main/utils/reload.py +++ b/awx/main/utils/reload.py @@ -11,18 +11,13 @@ from django.conf import settings logger = logging.getLogger('awx.main.utils.reload') -def _supervisor_service_command(command, communicate=True): +def supervisor_service_command(command, service='*', communicate=True): ''' example use pattern of supervisorctl: # supervisorctl restart tower-processes:receiver tower-processes:factcacher ''' - group_name = 'tower-processes' - if settings.DEBUG: - group_name = 'awx-processes' args = ['supervisorctl'] - if settings.DEBUG: - args.extend(['-c', '/supervisor.conf']) - args.extend([command, '{}:*'.format(group_name)]) + args.extend([command, ':'.join(['tower-processes', service])]) logger.debug('Issuing command to {} services, args={}'.format(command, args)) supervisor_process = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -41,4 +36,4 @@ def _supervisor_service_command(command, communicate=True): def stop_local_services(communicate=True): logger.warn('Stopping services on this node in response to user action') - _supervisor_service_command(command='stop', communicate=communicate) + supervisor_service_command(command='stop', communicate=communicate) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 53a72f369f..a53e65bfad 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1011,8 +1011,9 @@ LOGGING = { 'formatter': 'simple', }, 'external_logger': { - 'class': 'awx.main.utils.handlers.AWXProxyHandler', + 'class': 'logging.handlers.SysLogHandler', 'formatter': 'json', + 'address': ('localhost', 51414), 'filters': ['external_log_enabled', 'dynamic_level_filter'], }, 'tower_warnings': { diff --git a/awx/ui/client/src/configuration/forms/system-form/sub-forms/system-logging.form.js b/awx/ui/client/src/configuration/forms/system-form/sub-forms/system-logging.form.js index 388cfb00f8..dc57e3ece5 100644 --- a/awx/ui/client/src/configuration/forms/system-form/sub-forms/system-logging.form.js +++ b/awx/ui/client/src/configuration/forms/system-form/sub-forms/system-logging.form.js @@ -78,7 +78,7 @@ ngClick: 'vm.testLogging()', label: i18n._('Test'), class: 'btn-primary', - ngDisabled: 'configuration_logging_template_form.$invalid' + ngDisabled: 'configuration_logging_template_form.$pending' }, cancel: { ngClick: 'vm.formCancel()', diff --git a/docs/licenses/requests-futures.txt b/docs/licenses/requests-futures.txt deleted file mode 100644 index 
2a2a0c6288..0000000000 --- a/docs/licenses/requests-futures.txt +++ /dev/null @@ -1,13 +0,0 @@ -Copyright 2013 Ross McFarland - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/requirements/requirements.in b/requirements/requirements.in index a1869c8978..c847496844 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -42,7 +42,6 @@ social-auth-core==3.3.1 # see UPGRADE BLOCKERs social-auth-app-django==3.1.0 # see UPGRADE BLOCKERs redis requests -requests-futures # see library notes slackclient==1.1.2 # see UPGRADE BLOCKERs tacacs_plus==1.0 # UPGRADE BLOCKER: auth does not work with later versions twilio diff --git a/tools/docker-compose/Dockerfile b/tools/docker-compose/Dockerfile index 0ec22e499d..f59f466a6f 100644 --- a/tools/docker-compose/Dockerfile +++ b/tools/docker-compose/Dockerfile @@ -102,6 +102,10 @@ RUN cd /usr/local/bin && \ curl -L https://github.com/openshift/origin/releases/download/v3.9.0/openshift-origin-client-tools-v3.9.0-191fece-linux-64bit.tar.gz | \ tar -xz --strip-components=1 --wildcards --no-anchored 'oc' +ADD tools/docker-compose/rsyslog.repo /etc/yum.repos.d/ +RUN yum install -y rsyslog-omhttp +RUN echo '$IncludeConfig /var/lib/awx/rsyslog.conf' >> /etc/rsyslog.conf + RUN dnf -y clean all && rm -rf /root/.cache # https://github.com/ansible/awx/issues/5224 diff --git a/tools/docker-compose/rsyslog.repo b/tools/docker-compose/rsyslog.repo new file mode 100644 index 0000000000..4cc2a35a42 --- /dev/null +++ b/tools/docker-compose/rsyslog.repo @@ -0,0 +1,7 @@ +[rsyslog_v8] +name=Adiscon CentOS-$releasever - local packages for $basearch +baseurl=http://rpms.adiscon.com/v8-stable/epel-$releasever/$basearch +enabled=1 +gpgcheck=0 +gpgkey=http://rpms.adiscon.com/RPM-GPG-KEY-Adiscon +protect=1 diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 6831d80203..2a39e0fba6 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -71,8 +71,20 @@ redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 +[program:awx-rsyslogd] +command = rsyslogd -nd -i /awx_devel/rsyslog.pid +autostart = true +autorestart = true +stopwaitsecs = 1 +stopsignal=KILL +stopasgroup=true +killasgroup=true +redirect_stderr=true +stdout_logfile=/tmp/ryan +stdout_logfile_maxbytes=0 + [group:tower-processes] -programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsbroadcast +programs=awx-dispatcher,awx-receiver,awx-runworker,awx-uwsgi,awx-daphne,awx-nginx,awx-wsbroadcast,awx-rsyslogd priority=5 [unix_http_server]
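
For reference, a sketch of the rsyslog configuration that reconfigure_rsyslog() writes to /var/lib/awx/rsyslog.conf when an HTTPS-based aggregator is enabled (the server, port, and credentials below are hypothetical placeholders, not values from this patch):

    $ModLoad imudp
    $UDPServerRun 51414
    template(name="awx" type="string" string="%msg%")
    module(load="omhttp")
    action(type="omhttp" server="splunk.example.org" serverport="8088" usehttps="on" skipverifyhost="off" action.resumeRetryCount="-1" template="awx" errorfile="/var/log/tower/external.err" uid="awx" pwd="secret")

For a plain TCP or UDP aggregator the action is an omfwd forward instead, e.g. action(type="omfwd" target="logstash.example.org" port="5000" protocol="tcp" action.resumeRetryCount="-1" template="awx"). In both cases, action.resumeRetryCount="-1" is what lets rsyslog retry indefinitely and retain messages while the aggregator is offline.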
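
And a minimal sketch of the new delivery path from the Python side, mirroring the 'external_logger' handler added to awx/settings/defaults.py (this assumes rsyslog is listening locally on UDP 51414, the default configured above):

    import logging
    import logging.handlers

    # Ship a record to the local UDP port where rsyslog listens; retrying
    # delivery to the external aggregator is now rsyslog's job rather than
    # the Python process's.
    handler = logging.handlers.SysLogHandler(address=('localhost', 51414))
    logger = logging.getLogger('awx')
    logger.addHandler(handler)
    logger.info('AWX Connection Test')  # the same record the new logging test endpoint emits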