mirror of https://github.com/ansible/awx.git
synced 2026-01-17 12:41:19 -03:30
Merge pull request #1630 from AlanCoding/no_restart_try2
Replace service restart with new handler changing dynamically with settings
This commit is contained in:
commit 72655961e4

Makefile (2 changed lines)
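The pattern behind the change: rather than restarting uwsgi and celery whenever a LOG_AGGREGATOR_* setting changes, a single long-lived logging.Handler delegates each record to a concrete handler that it rebuilds whenever the relevant settings differ from its last snapshot. A minimal sketch of that idea (simplified from the AWXProxyHandler introduced below; _read_settings and _build_handler are hypothetical stand-ins for the real PARAM_NAMES/HANDLER_MAPPING lookups):

    import logging

    class SettingsDrivenProxyHandler(logging.Handler):
        def __init__(self):
            super(SettingsDrivenProxyHandler, self).__init__()
            self._delegate = None
            self._old_kwargs = {}

        def emit(self, record):
            kwargs = self._read_settings()  # hypothetical: snapshot the relevant settings
            if self._delegate is None or kwargs != self._old_kwargs:
                # settings changed since the last emit: rebuild the real handler
                self._delegate = self._build_handler(kwargs)  # hypothetical factory
                self._old_kwargs = kwargs
            return self._delegate.emit(record)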
@@ -296,7 +296,7 @@ uwsgi: collectstatic
 	@if [ "$(VENV_BASE)" ]; then \
 		. $(VENV_BASE)/awx/bin/activate; \
 	fi; \
-	uwsgi -b 32768 --socket 127.0.0.1:8050 --module=awx.wsgi:application --home=/venv/awx --chdir=/awx_devel/ --vacuum --processes=5 --harakiri=120 --master --no-orphans --py-autoreload 1 --max-requests=1000 --stats /tmp/stats.socket --master-fifo=/awxfifo --lazy-apps --logformat "%(addr) %(method) %(uri) - %(proto) %(status)" --hook-accepting1-once="exec:/bin/sh -c '[ -f /tmp/celery_pid ] && kill -1 `cat /tmp/celery_pid` || true'"
+	uwsgi -b 32768 --socket 127.0.0.1:8050 --module=awx.wsgi:application --home=/venv/awx --chdir=/awx_devel/ --vacuum --processes=5 --harakiri=120 --master --no-orphans --py-autoreload 1 --max-requests=1000 --stats /tmp/stats.socket --lazy-apps --logformat "%(addr) %(method) %(uri) - %(proto) %(status)" --hook-accepting1-once="exec:/bin/sh -c '[ -f /tmp/celery_pid ] && kill -1 `cat /tmp/celery_pid` || true'"
 
 daphne:
 	@if [ "$(VENV_BASE)" ]; then \
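With the reload-by-FIFO path gone, the development uwsgi invocation no longer needs --master-fifo=/awxfifo. For context, the removed mechanism (the _uwsgi_fifo_command helper deleted from awx.main.utils.reload later in this diff) amounted to writing a one-character command to uWSGI's master FIFO:

    # old restart path, removed by this commit: 'c' asks the uWSGI master
    # for a chain reload of its workers
    with open('/awxfifo', 'w') as awxfifo:
        awxfifo.write('c')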
@@ -2,8 +2,6 @@
 from django.apps import AppConfig
 # from django.core import checks
 from django.utils.translation import ugettext_lazy as _
-from awx.main.utils.handlers import configure_external_logger
-from django.conf import settings
 
 
 class ConfConfig(AppConfig):
@@ -15,4 +13,3 @@ class ConfConfig(AppConfig):
         self.module.autodiscover()
         from .settings import SettingsWrapper
         SettingsWrapper.initialize()
-        configure_external_logger(settings)
@@ -338,13 +338,14 @@ def test_setting_singleton_delete_no_read_only_fields(api_request, dummy_setting
 
 @pytest.mark.django_db
 def test_setting_logging_test(api_request):
-    with mock.patch('awx.conf.views.BaseHTTPSHandler.perform_test') as mock_func:
+    with mock.patch('awx.conf.views.AWXProxyHandler.perform_test') as mock_func:
         api_request(
             'post',
             reverse('api:setting_logging_test'),
             data={'LOG_AGGREGATOR_HOST': 'http://foobar', 'LOG_AGGREGATOR_TYPE': 'logstash'}
         )
-        test_arguments = mock_func.call_args[0][0]
-        assert test_arguments.LOG_AGGREGATOR_HOST == 'http://foobar'
-        assert test_arguments.LOG_AGGREGATOR_TYPE == 'logstash'
-        assert test_arguments.LOG_AGGREGATOR_LEVEL == 'DEBUG'
+        call = mock_func.call_args_list[0]
+        args, kwargs = call
+        given_settings = kwargs['custom_settings']
+        assert given_settings.LOG_AGGREGATOR_HOST == 'http://foobar'
+        assert given_settings.LOG_AGGREGATOR_TYPE == 'logstash'
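The assertions change shape because perform_test is now invoked with a keyword argument (custom_settings=mock_settings) instead of a positional settings object. As a reminder of how mock records calls, each entry of call_args_list unpacks into an (args, kwargs) pair, for example:

    import mock

    m = mock.Mock()
    m('pos', custom_settings='cfg')
    args, kwargs = m.call_args_list[0]
    assert args == ('pos',)
    assert kwargs == {'custom_settings': 'cfg'}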
@@ -21,7 +21,7 @@ from awx.api.generics import * # noqa
 from awx.api.permissions import IsSuperUser
 from awx.api.versioning import reverse, get_request_version
 from awx.main.utils import * # noqa
-from awx.main.utils.handlers import BaseHTTPSHandler, UDPHandler, LoggingConnectivityException
+from awx.main.utils.handlers import AWXProxyHandler, LoggingConnectivityException
 from awx.main.tasks import handle_setting_changes
 from awx.conf.license import get_licensed_features
 from awx.conf.models import Setting
@@ -198,12 +198,9 @@ class SettingLoggingTest(GenericAPIView):
             mock_settings = MockSettings()
             for k, v in serializer.validated_data.items():
                 setattr(mock_settings, k, v)
             mock_settings.LOG_AGGREGATOR_LEVEL = 'DEBUG'
-            if mock_settings.LOG_AGGREGATOR_PROTOCOL.upper() == 'UDP':
-                UDPHandler.perform_test(mock_settings)
-                return Response(status=status.HTTP_201_CREATED)
-            else:
-                BaseHTTPSHandler.perform_test(mock_settings)
+            AWXProxyHandler().perform_test(custom_settings=mock_settings)
         except LoggingConnectivityException as e:
             return Response({'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
         return Response(status=status.HTTP_200_OK)
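The protocol branch disappears from the view because protocol routing now lives in the handler layer: AWXProxyHandler.get_handler() (added below) picks the concrete class out of HANDLER_MAPPING. One behavioral consequence: the old code answered HTTP 201 for UDP, since UDP delivery cannot be confirmed; the new UDPHandler._send instead returns a SocketResult marked ok with an explanatory reason, so the endpoint answers 200 for every protocol. A sketch of that duck-typed result object, as defined later in this diff:

    result = SocketResult(True, reason="Cannot determine if UDP messages are received.")
    assert result.ok
    assert result.result() is result   # mimics a requests-futures future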
@@ -29,7 +29,7 @@ except Exception:
 
 # Celery
 from celery import Task, shared_task, Celery
-from celery.signals import celeryd_init, worker_process_init, worker_shutdown, worker_ready, celeryd_after_setup
+from celery.signals import celeryd_init, worker_shutdown, worker_ready, celeryd_after_setup
 
 # Django
 from django.conf import settings
@@ -59,10 +59,9 @@ from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field,
                             wrap_args_with_proot, OutputEventFilter, OutputVerboseFilter, ignore_inventory_computed_fields,
                             ignore_inventory_group_removal, get_type_for_model, extract_ansible_vars)
 from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
-from awx.main.utils.reload import restart_local_services, stop_local_services
+from awx.main.utils.reload import stop_local_services
 from awx.main.utils.pglock import advisory_lock
 from awx.main.utils.ha import update_celery_worker_routes, register_celery_worker_queues
-from awx.main.utils.handlers import configure_external_logger
 from awx.main.consumers import emit_channel_notification
 from awx.conf import settings_registry
@@ -117,15 +116,6 @@ def celery_startup(conf=None, **kwargs):
             logger.exception(six.text_type("Failed to rebuild schedule {}.").format(sch))
 
 
-@worker_process_init.connect
-def task_set_logger_pre_run(*args, **kwargs):
-    try:
-        cache.close()
-        configure_external_logger(settings, is_startup=False)
-    except Exception:
-        logger.exception('Encountered error on initial log configuration.')
-
-
 @worker_shutdown.connect
 def inform_cluster_of_shutdown(*args, **kwargs):
     try:
@@ -200,10 +190,6 @@ def handle_setting_changes(self, setting_keys):
     cache_keys = set(setting_keys)
     logger.debug('cache delete_many(%r)', cache_keys)
     cache.delete_many(cache_keys)
-    for key in cache_keys:
-        if key.startswith('LOG_AGGREGATOR_'):
-            restart_local_services(['uwsgi', 'celery', 'beat', 'callback'])
-            break
 
 
 @shared_task(bind=True, exchange='tower_broadcast_all')
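This is the heart of the PR: changing a LOG_AGGREGATOR_* setting no longer restarts uwsgi, celery, beat, or the callback receiver. Only the cache invalidation above is needed, because AWXProxyHandler re-reads settings on every emit and rebuilds its delegate when they differ. Roughly (proxy and record are illustrative names):

    proxy = AWXProxyHandler()
    proxy.emit(record)   # builds e.g. a TCPHandler from current settings
    # ... LOG_AGGREGATOR_* values change in the database, cache keys deleted ...
    proxy.emit(record)   # kwargs snapshot differs: old handler is closed and
                         # a new one is created -- no service restart involved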
@@ -14,7 +14,7 @@ import mock
 # AWX
 from awx.api.versioning import reverse
 from awx.conf.models import Setting
-from awx.main.utils.handlers import BaseHTTPSHandler, LoggingConnectivityException
+from awx.main.utils.handlers import AWXProxyHandler, LoggingConnectivityException
 
 import six
@@ -217,7 +217,7 @@ def test_logging_aggregrator_connection_test_bad_request(get, post, admin, key):
 
 @pytest.mark.django_db
 def test_logging_aggregrator_connection_test_valid(mocker, get, post, admin):
-    with mock.patch.object(BaseHTTPSHandler, 'perform_test') as perform_test:
+    with mock.patch.object(AWXProxyHandler, 'perform_test') as perform_test:
         url = reverse('api:setting_logging_test')
         user_data = {
             'LOG_AGGREGATOR_TYPE': 'logstash',
@@ -227,7 +227,8 @@ def test_logging_aggregrator_connection_test_valid(mocker, get, post, admin):
             'LOG_AGGREGATOR_PASSWORD': 'mcstash'
         }
         post(url, user_data, user=admin, expect=200)
-        create_settings = perform_test.call_args[0][0]
+        args, kwargs = perform_test.call_args_list[0]
+        create_settings = kwargs['custom_settings']
         for k, v in user_data.items():
             assert hasattr(create_settings, k)
             assert getattr(create_settings, k) == v
@@ -238,7 +239,7 @@ def test_logging_aggregrator_connection_test_with_masked_password(mocker, patch,
     url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'logging'})
     patch(url, user=admin, data={'LOG_AGGREGATOR_PASSWORD': 'password123'}, expect=200)
 
-    with mock.patch.object(BaseHTTPSHandler, 'perform_test') as perform_test:
+    with mock.patch.object(AWXProxyHandler, 'perform_test') as perform_test:
         url = reverse('api:setting_logging_test')
         user_data = {
             'LOG_AGGREGATOR_TYPE': 'logstash',
@@ -248,13 +249,14 @@ def test_logging_aggregrator_connection_test_with_masked_password(mocker, patch,
             'LOG_AGGREGATOR_PASSWORD': '$encrypted$'
         }
         post(url, user_data, user=admin, expect=200)
-        create_settings = perform_test.call_args[0][0]
+        args, kwargs = perform_test.call_args_list[0]
+        create_settings = kwargs['custom_settings']
         assert getattr(create_settings, 'LOG_AGGREGATOR_PASSWORD') == 'password123'
 
 
 @pytest.mark.django_db
 def test_logging_aggregrator_connection_test_invalid(mocker, get, post, admin):
-    with mock.patch.object(BaseHTTPSHandler, 'perform_test') as perform_test:
+    with mock.patch.object(AWXProxyHandler, 'perform_test') as perform_test:
         perform_test.side_effect = LoggingConnectivityException('404: Not Found')
         url = reverse('api:setting_logging_test')
         resp = post(url, {
@@ -1,4 +1,5 @@
 import pytest
+import logging
 
 from mock import PropertyMock
@@ -7,3 +8,16 @@ from mock import PropertyMock
 def _disable_database_settings(mocker):
     m = mocker.patch('awx.conf.settings.SettingsWrapper.all_supported_settings', new_callable=PropertyMock)
     m.return_value = []
+
+
+@pytest.fixture()
+def dummy_log_record():
+    return logging.LogRecord(
+        'awx',  # logger name
+        20,  # loglevel INFO
+        './awx/some/module.py',  # pathname
+        100,  # lineno
+        'User joe logged in',  # msg
+        tuple(),  # args,
+        None  # exc_info
+    )
@@ -5,7 +5,7 @@ import mock
 from collections import namedtuple
 
 # AWX
-from awx.main.utils.filters import SmartFilter
+from awx.main.utils.filters import SmartFilter, ExternalLoggerEnabled
 
 # Django
 from django.db.models import Q
@@ -13,6 +13,37 @@ from django.db.models import Q
 import six
 
 
+@pytest.mark.parametrize('params, logger_name, expected', [
+    # skip all records if enabled_flag = False
+    ({'enabled_flag': False}, 'awx.main', False),
+    # skip all records if the host is undefined
+    ({'enabled_flag': True}, 'awx.main', False),
+    # skip all records if underlying logger is used by handlers themselves
+    ({'enabled_flag': True}, 'awx.main.utils.handlers', False),
+    ({'enabled_flag': True, 'enabled_loggers': ['awx']}, 'awx.main', True),
+    ({'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', False),
+    ({'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', True),
+])
+def test_base_logging_handler_skip_log(params, logger_name, expected, dummy_log_record):
+    filter = ExternalLoggerEnabled(**params)
+    dummy_log_record.name = logger_name
+    assert filter.filter(dummy_log_record) is expected, (params, logger_name)
+
+
+@pytest.mark.parametrize('level, expect', [
+    (30, True),   # warning
+    (20, False)   # info
+])
+def test_log_configurable_severity(level, expect, dummy_log_record):
+    dummy_log_record.levelno = level
+    filter = ExternalLoggerEnabled(
+        enabled_flag=True,
+        enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'],
+        lvl='WARNING'
+    )
+    assert filter.filter(dummy_log_record) is expect
+
+
 Field = namedtuple('Field', 'name')
 Meta = namedtuple('Meta', 'fields')
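These tests exercise ExternalLoggerEnabled directly as a logging.Filter: returning False drops the record before any handler formats or sends it. A standalone usage sketch (assumes Django settings are configured, since unset fields fall back to settings via FieldFromSettings; lvl is passed explicitly here to avoid that lookup):

    import logging

    flt = ExternalLoggerEnabled(enabled_flag=True, enabled_loggers=['awx'], lvl='DEBUG')
    rec = logging.LogRecord('awx.main', logging.INFO, 'module.py', 1, 'hi', (), None)
    assert flt.filter(rec) is True            # 'awx' is an enabled base logger
    rec.name = 'awx.main.utils.handlers'
    assert flt.filter(rec) is False           # blacklisted to prevent recursion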
@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 import base64
-import cStringIO
 import json
 import logging
 import socket
 import datetime
@@ -10,7 +9,6 @@ from uuid import uuid4
 
 import mock
-
-from django.conf import settings
+from django.conf import LazySettings
 import pytest
 import requests
@@ -18,23 +16,11 @@ from requests_futures.sessions import FuturesSession
 
 from awx.main.utils.handlers import (BaseHandler, BaseHTTPSHandler as HTTPSHandler,
                                      TCPHandler, UDPHandler, _encode_payload_for_socket,
-                                     PARAM_NAMES, LoggingConnectivityException)
+                                     PARAM_NAMES, LoggingConnectivityException,
+                                     AWXProxyHandler)
 from awx.main.utils.formatters import LogstashFormatter
 
 
-@pytest.fixture()
-def dummy_log_record():
-    return logging.LogRecord(
-        'awx',  # logger name
-        20,  # loglevel INFO
-        './awx/some/module.py',  # pathname
-        100,  # lineno
-        'User joe logged in',  # msg
-        tuple(),  # args,
-        None  # exc_info
-    )
-
-
 @pytest.fixture()
 def http_adapter():
     class FakeHTTPAdapter(requests.adapters.HTTPAdapter):
@@ -80,105 +66,91 @@ def test_https_logging_handler_requests_async_implementation():
 
 
 def test_https_logging_handler_has_default_http_timeout():
-    handler = HTTPSHandler.from_django_settings(settings)
+    handler = TCPHandler()
     assert handler.tcp_timeout == 5
 
 
-@pytest.mark.parametrize('param', PARAM_NAMES.keys())
+@pytest.mark.parametrize('param', ['host', 'port', 'indv_facts'])
 def test_base_logging_handler_defaults(param):
     handler = BaseHandler()
     assert hasattr(handler, param) and getattr(handler, param) is None
 
 
-@pytest.mark.parametrize('param', PARAM_NAMES.keys())
+@pytest.mark.parametrize('param', ['host', 'port', 'indv_facts'])
 def test_base_logging_handler_kwargs(param):
     handler = BaseHandler(**{param: 'EXAMPLE'})
     assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE'
 
 
-@pytest.mark.parametrize('param, django_settings_name', PARAM_NAMES.items())
-def test_base_logging_handler_from_django_settings(param, django_settings_name):
-    settings = LazySettings()
-    settings.configure(**{
-        django_settings_name: 'EXAMPLE'
-    })
-    handler = BaseHandler.from_django_settings(settings)
-    assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE'
+@pytest.mark.parametrize('params', [
+    {
+        'LOG_AGGREGATOR_HOST': 'https://server.invalid',
+        'LOG_AGGREGATOR_PORT': 22222,
+        'LOG_AGGREGATOR_TYPE': 'loggly',
+        'LOG_AGGREGATOR_USERNAME': 'foo',
+        'LOG_AGGREGATOR_PASSWORD': 'bar',
+        'LOG_AGGREGATOR_INDIVIDUAL_FACTS': True,
+        'LOG_AGGREGATOR_TCP_TIMEOUT': 96,
+        'LOG_AGGREGATOR_VERIFY_CERT': False,
+        'LOG_AGGREGATOR_PROTOCOL': 'https'
+    },
+    {
+        'LOG_AGGREGATOR_HOST': 'https://server.invalid',
+        'LOG_AGGREGATOR_PORT': 22222,
+        'LOG_AGGREGATOR_PROTOCOL': 'udp'
+    }
+])
+def test_real_handler_from_django_settings(params):
+    settings = LazySettings()
+    settings.configure(**params)
+    handler = AWXProxyHandler().get_handler(custom_settings=settings)
+    # need the _reverse_ dictionary from PARAM_NAMES
+    attr_lookup = {}
+    for attr_name, setting_name in PARAM_NAMES.items():
+        attr_lookup[setting_name] = attr_name
+    for setting_name, val in params.items():
+        attr_name = attr_lookup[setting_name]
+        if attr_name == 'protocol':
+            continue
+        assert hasattr(handler, attr_name)
+
+
+def test_invalid_kwarg_to_real_handler():
+    settings = LazySettings()
+    settings.configure(**{
+        'LOG_AGGREGATOR_HOST': 'https://server.invalid',
+        'LOG_AGGREGATOR_PORT': 22222,
+        'LOG_AGGREGATOR_PROTOCOL': 'udp',
+        'LOG_AGGREGATOR_VERIFY_CERT': False  # setting not valid for UDP handler
+    })
+    handler = AWXProxyHandler().get_handler(custom_settings=settings)
+    assert not hasattr(handler, 'verify_cert')
 
 
-@pytest.mark.parametrize('params, logger_name, expected', [
-    # skip all records if enabled_flag = False
-    ({'enabled_flag': False}, 'awx.main', True),
-    # skip all records if the host is undefined
-    ({'host': '', 'enabled_flag': True}, 'awx.main', True),
-    # skip all records if underlying logger is used by handlers themselves
-    ({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main.utils.handlers', True),
-    ({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main', False),
-    ({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', True),
-    ({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', False),
-])
-def test_base_logging_handler_skip_log(params, logger_name, expected):
-    handler = BaseHandler(**params)
-    assert handler._skip_log(logger_name) is expected
-
-
-def test_base_logging_handler_emit(dummy_log_record):
-    handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
-                          message_type='logstash', lvl='INFO',
-                          enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
-    handler.setFormatter(LogstashFormatter())
-    sent_payloads = handler.emit(dummy_log_record)
-
-    assert len(sent_payloads) == 1
-    body = json.loads(sent_payloads[0])
-
-    assert body['level'] == 'INFO'
-    assert body['logger_name'] == 'awx'
-    assert body['message'] == 'User joe logged in'
-
-
-def test_base_logging_handler_ignore_low_severity_msg(dummy_log_record):
-    handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
-                          message_type='logstash', lvl='WARNING',
-                          enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
-    handler.setFormatter(LogstashFormatter())
-    sent_payloads = handler.emit(dummy_log_record)
-    assert len(sent_payloads) == 0
-
-
-def test_base_logging_handler_emit_system_tracking():
-    handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
-                          message_type='logstash', indv_facts=True, lvl='INFO',
-                          enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
-    handler.setFormatter(LogstashFormatter())
-    record = logging.LogRecord(
-        'awx.analytics.system_tracking',  # logger name
-        20,  # loglevel INFO
-        './awx/some/module.py',  # pathname
-        100,  # lineno
-        None,  # msg
-        tuple(),  # args,
-        None  # exc_info
-    )
-    record.inventory_id = 11
-    record.host_name = 'my_lucky_host'
-    record.job_id = 777
-    record.ansible_facts = {
+def test_base_logging_handler_emit_system_tracking(dummy_log_record):
+    handler = BaseHandler(host='127.0.0.1', indv_facts=True)
+    handler.setFormatter(LogstashFormatter())
+    dummy_log_record.name = 'awx.analytics.system_tracking'
+    dummy_log_record.msg = None
+    dummy_log_record.inventory_id = 11
+    dummy_log_record.host_name = 'my_lucky_host'
+    dummy_log_record.job_id = 777
+    dummy_log_record.ansible_facts = {
         "ansible_kernel": "4.4.66-boot2docker",
         "ansible_machine": "x86_64",
         "ansible_swapfree_mb": 4663,
     }
-    record.ansible_facts_modified = datetime.datetime.now(tzutc()).isoformat()
-    sent_payloads = handler.emit(record)
+    dummy_log_record.ansible_facts_modified = datetime.datetime.now(tzutc()).isoformat()
+    sent_payloads = handler.emit(dummy_log_record)
 
     assert len(sent_payloads) == 1
-    assert sent_payloads[0]['ansible_facts'] == record.ansible_facts
-    assert sent_payloads[0]['ansible_facts_modified'] == record.ansible_facts_modified
+    assert sent_payloads[0]['ansible_facts'] == dummy_log_record.ansible_facts
+    assert sent_payloads[0]['ansible_facts_modified'] == dummy_log_record.ansible_facts_modified
     assert sent_payloads[0]['level'] == 'INFO'
     assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking'
-    assert sent_payloads[0]['job_id'] == record.job_id
-    assert sent_payloads[0]['inventory_id'] == record.inventory_id
-    assert sent_payloads[0]['host_name'] == record.host_name
+    assert sent_payloads[0]['job_id'] == dummy_log_record.job_id
+    assert sent_payloads[0]['inventory_id'] == dummy_log_record.inventory_id
+    assert sent_payloads[0]['host_name'] == dummy_log_record.host_name
 
 
 @pytest.mark.parametrize('host, port, normalized, hostname_only', [
@@ -236,16 +208,18 @@ def test_https_logging_handler_connectivity_test(http_adapter, status, reason, e
         def emit(self, record):
             return super(FakeHTTPSHandler, self).emit(record)
 
-    if exc:
-        with pytest.raises(exc) as e:
-            FakeHTTPSHandler.perform_test(settings)
-        assert str(e).endswith('%s: %s' % (status, reason))
-    else:
-        assert FakeHTTPSHandler.perform_test(settings) is None
+    with mock.patch.object(AWXProxyHandler, 'get_handler_class') as mock_get_class:
+        mock_get_class.return_value = FakeHTTPSHandler
+        if exc:
+            with pytest.raises(exc) as e:
+                AWXProxyHandler().perform_test(settings)
+            assert str(e).endswith('%s: %s' % (status, reason))
+        else:
+            assert AWXProxyHandler().perform_test(settings) is None
 
 
 def test_https_logging_handler_logstash_auth_info():
-    handler = HTTPSHandler(message_type='logstash', username='bob', password='ansible', lvl='INFO')
+    handler = HTTPSHandler(message_type='logstash', username='bob', password='ansible')
     handler._add_auth_information()
     assert isinstance(handler.session.auth, requests.auth.HTTPBasicAuth)
     assert handler.session.auth.username == 'bob'
@@ -261,9 +235,7 @@ def test_https_logging_handler_splunk_auth_info():
 
 def test_https_logging_handler_connection_error(connection_error_adapter,
                                                 dummy_log_record):
-    handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
-                           message_type='logstash', lvl='INFO',
-                           enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = HTTPSHandler(host='127.0.0.1', message_type='logstash')
     handler.setFormatter(LogstashFormatter())
     handler.session.mount('http://', connection_error_adapter)
@@ -289,9 +261,7 @@ def test_https_logging_handler_connection_error(connection_error_adapter,
 @pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
 def test_https_logging_handler_emit_without_cred(http_adapter, dummy_log_record,
                                                  message_type):
-    handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
-                           message_type=message_type, lvl='INFO',
-                           enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = HTTPSHandler(host='127.0.0.1', message_type=message_type)
     handler.setFormatter(LogstashFormatter())
    handler.session.mount('http://', http_adapter)
     async_futures = handler.emit(dummy_log_record)
@@ -312,10 +282,9 @@ def test_https_logging_handler_emit_without_cred(http_adapter, dummy_log_record,
 
 def test_https_logging_handler_emit_logstash_with_creds(http_adapter,
                                                         dummy_log_record):
-    handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
+    handler = HTTPSHandler(host='127.0.0.1',
                            username='user', password='pass',
-                           message_type='logstash', lvl='INFO',
-                           enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+                           message_type='logstash')
     handler.setFormatter(LogstashFormatter())
     handler.session.mount('http://', http_adapter)
     async_futures = handler.emit(dummy_log_record)
@@ -328,9 +297,8 @@ def test_https_logging_handler_emit_logstash_with_creds(http_adapter,
 
 def test_https_logging_handler_emit_splunk_with_creds(http_adapter,
                                                       dummy_log_record):
-    handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
-                           password='pass', message_type='splunk', lvl='INFO',
-                           enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = HTTPSHandler(host='127.0.0.1',
+                           password='pass', message_type='splunk')
     handler.setFormatter(LogstashFormatter())
     handler.session.mount('http://', http_adapter)
     async_futures = handler.emit(dummy_log_record)
@@ -351,9 +319,7 @@ def test_encode_payload_for_socket(payload, encoded_payload):
 
 
 def test_udp_handler_create_socket_at_init():
-    handler = UDPHandler(host='127.0.0.1', port=4399,
-                         enabled_flag=True, message_type='splunk', lvl='INFO',
-                         enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = UDPHandler(host='127.0.0.1', port=4399)
     assert hasattr(handler, 'socket')
     assert isinstance(handler.socket, socket.socket)
     assert handler.socket.family == socket.AF_INET
@@ -361,9 +327,7 @@ def test_udp_handler_create_socket_at_init():
 
 
 def test_udp_handler_send(dummy_log_record):
-    handler = UDPHandler(host='127.0.0.1', port=4399,
-                         enabled_flag=True, message_type='splunk', lvl='INFO',
-                         enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = UDPHandler(host='127.0.0.1', port=4399)
     handler.setFormatter(LogstashFormatter())
     with mock.patch('awx.main.utils.handlers._encode_payload_for_socket', return_value="des") as encode_mock,\
             mock.patch.object(handler, 'socket') as socket_mock:
@@ -373,9 +337,7 @@ def test_udp_handler_send(dummy_log_record):
 
 
 def test_tcp_handler_send(fake_socket, dummy_log_record):
-    handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
-                         enabled_flag=True, message_type='splunk', lvl='INFO',
-                         enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5)
     handler.setFormatter(LogstashFormatter())
     with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
             mock.patch('select.select', return_value=([], [fake_socket], [])):
@@ -388,9 +350,7 @@ def test_tcp_handler_send(fake_socket, dummy_log_record):
 
 
 def test_tcp_handler_return_if_socket_unavailable(fake_socket, dummy_log_record):
-    handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
-                         enabled_flag=True, message_type='splunk', lvl='INFO',
-                         enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5)
     handler.setFormatter(LogstashFormatter())
     with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
             mock.patch('select.select', return_value=([], [], [])):
@@ -403,9 +363,7 @@ def test_tcp_handler_return_if_socket_unavailable(fake_socket, dummy_log_record)
 
 
 def test_tcp_handler_log_exception(fake_socket, dummy_log_record):
-    handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
-                         enabled_flag=True, message_type='splunk', lvl='INFO',
-                         enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
+    handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5)
     handler.setFormatter(LogstashFormatter())
     with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
             mock.patch('select.select', return_value=([], [], [])),\
@@ -13,31 +13,3 @@ def test_produce_supervisor_command(mocker):
         ['supervisorctl', 'restart', 'tower-processes:receiver',],
         stderr=-1, stdin=-1, stdout=-1)
-
-
-def test_routing_of_service_restarts_works(mocker):
-    '''
-    This tests that the parent restart method will call the appropriate
-    service restart methods, depending on which services are given in args
-    '''
-    with mocker.patch.object(reload, '_uwsgi_fifo_command'),\
-            mocker.patch.object(reload, '_reset_celery_thread_pool'),\
-            mocker.patch.object(reload, '_supervisor_service_command'):
-        reload.restart_local_services(['uwsgi', 'celery', 'flower', 'daphne'])
-        reload._uwsgi_fifo_command.assert_called_once_with(uwsgi_command="c")
-        reload._reset_celery_thread_pool.assert_called_once_with()
-        reload._supervisor_service_command.assert_called_once_with(['flower', 'daphne'], command="restart")
-
-
-def test_routing_of_service_restarts_diables(mocker):
-    '''
-    Test that methods are not called if not in the args
-    '''
-    with mocker.patch.object(reload, '_uwsgi_fifo_command'),\
-            mocker.patch.object(reload, '_reset_celery_thread_pool'),\
-            mocker.patch.object(reload, '_supervisor_service_command'):
-        reload.restart_local_services(['flower'])
-        reload._uwsgi_fifo_command.assert_not_called()
-        reload._reset_celery_thread_pool.assert_not_called()
-        reload._supervisor_service_command.assert_called_once_with(['flower'], command="restart")
@@ -8,14 +8,106 @@ from pyparsing import (
     CharsNotIn,
     ParseException,
 )
+from logging import Filter, _levelNames
 
 import six
 
-import django
+from django.apps import apps
+from django.db import models
+from django.conf import settings
 
 from awx.main.utils.common import get_search_fields
 
-__all__ = ['SmartFilter']
+__all__ = ['SmartFilter', 'ExternalLoggerEnabled']
 
 
+class FieldFromSettings(object):
+    """
+    Field interface - defaults to getting value from setting
+    if otherwise set, provided value will take precedence
+    over value in settings
+    """
+
+    def __init__(self, setting_name):
+        self.setting_name = setting_name
+
+    def __get__(self, instance, type=None):
+        if self.setting_name in getattr(instance, 'settings_override', {}):
+            return instance.settings_override[self.setting_name]
+        return getattr(settings, self.setting_name, None)
+
+    def __set__(self, instance, value):
+        if value is None:
+            if hasattr(instance, 'settings_override'):
+                instance.settings_override.pop('instance', None)
+        else:
+            if not hasattr(instance, 'settings_override'):
+                instance.settings_override = {}
+            instance.settings_override[self.setting_name] = value
+
+
+class ExternalLoggerEnabled(Filter):
+
+    # Prevents recursive logging loops from swamping the server
+    LOGGER_BLACKLIST = (
+        # loggers that may be called in process of emitting a log
+        'awx.main.utils.handlers',
+        'awx.main.utils.formatters',
+        'awx.main.utils.filters',
+        'awx.main.utils.encryption',
+        'awx.main.utils.log',
+        # loggers that may be called getting logging settings
+        'awx.conf'
+    )
+
+    lvl = FieldFromSettings('LOG_AGGREGATOR_LEVEL')
+    enabled_loggers = FieldFromSettings('LOG_AGGREGATOR_LOGGERS')
+    enabled_flag = FieldFromSettings('LOG_AGGREGATOR_ENABLED')
+
+    def __init__(self, **kwargs):
+        super(ExternalLoggerEnabled, self).__init__()
+        for field_name, field_value in kwargs.items():
+            if not isinstance(ExternalLoggerEnabled.__dict__.get(field_name, None), FieldFromSettings):
+                raise Exception('%s is not a valid kwarg' % field_name)
+            if field_value is None:
+                continue
+            setattr(self, field_name, field_value)
+
+    def filter(self, record):
+        """
+        Uses the database settings to determine if the current
+        external log configuration says that this particular record
+        should be sent to the external log aggregator
+
+        False - should not be logged
+        True - should be logged
+        """
+        # Logger exceptions
+        for logger_name in self.LOGGER_BLACKLIST:
+            if record.name.startswith(logger_name):
+                return False
+        # General enablement
+        if not self.enabled_flag:
+            return False
+
+        # Level enablement
+        if record.levelno < _levelNames[self.lvl]:
+            # logging._levelNames -> logging._nameToLevel in python 3
+            return False
+
+        # Logger type enablement
+        loggers = self.enabled_loggers
+        if not loggers:
+            return False
+        if record.name.startswith('awx.analytics'):
+            base_path, headline_name = record.name.rsplit('.', 1)
+            return bool(headline_name in loggers)
+        else:
+            if '.' in record.name:
+                base_name, trailing_path = record.name.split('.', 1)
+            else:
+                base_name = record.name
+            return bool(base_name in loggers)
 
 
 def string_to_type(t):
@@ -36,7 +128,7 @@ def string_to_type(t):
 
 
 def get_model(name):
-    return django.apps.apps.get_model('main', name)
+    return apps.get_model('main', name)
 
 
 class SmartFilter(object):
@@ -52,7 +144,7 @@ class SmartFilter(object):
                     search_kwargs = self._expand_search(k, v)
                     if search_kwargs:
                         kwargs.update(search_kwargs)
-                        q = reduce(lambda x, y: x | y, [django.db.models.Q(**{u'%s__contains' % _k:_v}) for _k, _v in kwargs.items()])
+                        q = reduce(lambda x, y: x | y, [models.Q(**{u'%s__contains' % _k:_v}) for _k, _v in kwargs.items()])
                         self.result = Host.objects.filter(q)
             else:
                 kwargs[k] = v
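FieldFromSettings makes each filter parameter a descriptor: reads fall through to the named Django setting unless an explicit per-instance override exists, so a filter instance tracks live setting changes without being recreated. Precedence in brief (illustrative):

    flt = ExternalLoggerEnabled()
    flt.enabled_flag          # -> settings.LOG_AGGREGATOR_ENABLED, looked up now
    flt.enabled_flag = True   # stored in flt.settings_override
    flt.enabled_flag          # -> True, regardless of the setting

One review note: in __set__, clearing with None pops the literal key 'instance' rather than self.setting_name, so assigning None does not actually remove an existing override.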
@@ -9,6 +9,8 @@ import logging
 
 import six
 
+from django.conf import settings
+
 
 class TimeFormatter(logging.Formatter):
     '''
@@ -20,15 +22,6 @@ class TimeFormatter(logging.Formatter):
 
 
 class LogstashFormatter(LogstashFormatterVersion1):
-    def __init__(self, **kwargs):
-        settings_module = kwargs.pop('settings_module', None)
-        ret = super(LogstashFormatter, self).__init__(**kwargs)
-        if settings_module:
-            self.host_id = getattr(settings_module, 'CLUSTER_HOST_ID', None)
-            if hasattr(settings_module, 'LOG_AGGREGATOR_TOWER_UUID'):
-                self.tower_uuid = settings_module.LOG_AGGREGATOR_TOWER_UUID
-            self.message_type = getattr(settings_module, 'LOG_AGGREGATOR_TYPE', 'other')
-        return ret
-
     def reformat_data_for_log(self, raw_data, kind=None):
         '''
@@ -147,6 +140,15 @@ class LogstashFormatter(LogstashFormatterVersion1):
         if record.name.startswith('awx.analytics'):
             log_kind = record.name[len('awx.analytics.'):]
             fields = self.reformat_data_for_log(fields, kind=log_kind)
+        # General AWX metadata
+        for log_name, setting_name in [
+                ('type', 'LOG_AGGREGATOR_TYPE'),
+                ('cluster_host_id', 'CLUSTER_HOST_ID'),
+                ('tower_uuid', 'LOG_AGGREGATOR_TOWER_UUID')]:
+            if hasattr(settings, setting_name):
+                fields[log_name] = getattr(settings, setting_name, None)
+            elif log_name == 'type':
+                fields[log_name] = 'other'
         return fields
 
     def format(self, record):
@@ -158,18 +160,12 @@ class LogstashFormatter(LogstashFormatterVersion1):
             '@timestamp': self.format_timestamp(record.created),
             'message': record.getMessage(),
             'host': self.host,
-            'type': self.message_type,
 
             # Extra Fields
             'level': record.levelname,
             'logger_name': record.name,
         }
 
-        if getattr(self, 'tower_uuid', None):
-            message['tower_uuid'] = self.tower_uuid
-        if getattr(self, 'host_id', None):
-            message['cluster_host_id'] = self.host_id
-
         # Add extra fields
         message.update(self.get_extra_fields(record))
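Same theme in the formatter: the type, cluster_host_id, and tower_uuid metadata used to be frozen into LogstashFormatter at construction time via a settings_module argument; get_extra_fields now reads them from django.conf.settings for every record. A formatter created once therefore emits up-to-date metadata after a settings change, which is what lets the proxy handler keep a single formatter around:

    fmt = LogstashFormatter()   # no settings_module argument any more
    # fields['type'] now reflects settings.LOG_AGGREGATOR_TYPE at format time,
    # falling back to 'other' when the setting is absent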
@@ -13,40 +13,35 @@ import six
 from concurrent.futures import ThreadPoolExecutor
 from requests.exceptions import RequestException
 
 # loggly
 import traceback
 
 # Django
 from django.conf import settings
 
 # requests futures, a dependency used by these handlers
 from requests_futures.sessions import FuturesSession
 
 # AWX
 from awx.main.utils.formatters import LogstashFormatter
 
 
-__all__ = ['HTTPSNullHandler', 'BaseHTTPSHandler', 'TCPHandler', 'UDPHandler',
-           'configure_external_logger']
+__all__ = ['BaseHTTPSHandler', 'TCPHandler', 'UDPHandler',
+           'AWXProxyHandler']
 
 
 logger = logging.getLogger('awx.main.utils.handlers')
 
-# AWX external logging handler, generally designed to be used
-# with the accompanying LogstashHandler, derives from python-logstash library
-# Non-blocking request accomplished by FuturesSession, similar
-# to the loggly-python-handler library (not used)
-
 # Translation of parameter names to names in Django settings
+# logging settings category, only those related to handler / log emission
 PARAM_NAMES = {
     'host': 'LOG_AGGREGATOR_HOST',
     'port': 'LOG_AGGREGATOR_PORT',
     'message_type': 'LOG_AGGREGATOR_TYPE',
     'username': 'LOG_AGGREGATOR_USERNAME',
     'password': 'LOG_AGGREGATOR_PASSWORD',
-    'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
     'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
-    'enabled_flag': 'LOG_AGGREGATOR_ENABLED',
     'tcp_timeout': 'LOG_AGGREGATOR_TCP_TIMEOUT',
     'verify_cert': 'LOG_AGGREGATOR_VERIFY_CERT',
-    'lvl': 'LOG_AGGREGATOR_LEVEL',
     'protocol': 'LOG_AGGREGATOR_PROTOCOL'
 }
@@ -58,13 +53,6 @@ class LoggingConnectivityException(Exception):
     pass
 
 
-class HTTPSNullHandler(logging.NullHandler):
-    "Placeholder null handler to allow loading without database access"
-
-    def __init__(self, *args, **kwargs):
-        return super(HTTPSNullHandler, self).__init__()
-
-
 class VerboseThreadPoolExecutor(ThreadPoolExecutor):
 
     last_log_emit = 0
@@ -91,32 +79,25 @@ class VerboseThreadPoolExecutor(ThreadPoolExecutor):
                             **kwargs)
 
 
-LEVEL_MAPPING = {
-    'DEBUG': logging.DEBUG,
-    'INFO': logging.INFO,
-    'WARNING': logging.WARNING,
-    'ERROR': logging.ERROR,
-    'CRITICAL': logging.CRITICAL,
-}
+class SocketResult:
+    '''
+    A class to be the return type of methods that send data over a socket
+    allows object to be used in the same way as a request futures object
+    '''
+    def __init__(self, ok, reason=None):
+        self.ok = ok
+        self.reason = reason
+
+    def result(self):
+        return self
 
 
 class BaseHandler(logging.Handler):
-    def __init__(self, **kwargs):
+    def __init__(self, host=None, port=None, indv_facts=None, **kwargs):
         super(BaseHandler, self).__init__()
-        for fd in PARAM_NAMES:
-            setattr(self, fd, kwargs.get(fd, None))
-
-    @classmethod
-    def from_django_settings(cls, settings, *args, **kwargs):
-        for param, django_setting_name in PARAM_NAMES.items():
-            kwargs[param] = getattr(settings, django_setting_name, None)
-        return cls(*args, **kwargs)
-
-    def get_full_message(self, record):
-        if record.exc_info:
-            return '\n'.join(traceback.format_exception(*record.exc_info))
-        else:
-            return record.getMessage()
+        self.host = host
+        self.port = port
+        self.indv_facts = indv_facts
 
     def _send(self, payload):
         """Actually send message to log aggregator.
@@ -128,26 +109,11 @@ class BaseHandler(logging.Handler):
             return [self._send(json.loads(self.format(record)))]
         return [self._send(self.format(record))]
 
-    def _skip_log(self, logger_name):
-        if self.host == '' or (not self.enabled_flag):
-            return True
-        # Don't send handler-related records.
-        if logger_name == logger.name:
-            return True
-        # AWX log emission is only turned off by enablement setting
-        if not logger_name.startswith('awx.analytics'):
-            return False
-        return self.enabled_loggers is None or logger_name[len('awx.analytics.'):] not in self.enabled_loggers
-
     def emit(self, record):
         """
         Emit a log record. Returns a list of zero or more
         implementation-specific objects for tests.
         """
-        if not record.name.startswith('awx.analytics') and record.levelno < LEVEL_MAPPING[self.lvl]:
-            return []
-        if self._skip_log(record.name):
-            return []
         try:
             return self._format_and_send_record(record)
         except (KeyboardInterrupt, SystemExit):
@@ -181,6 +147,11 @@ class BaseHandler(logging.Handler):
 
 
 class BaseHTTPSHandler(BaseHandler):
+    '''
+    Originally derived from python-logstash library
+    Non-blocking request accomplished by FuturesSession, similar
+    to the loggly-python-handler library
+    '''
     def _add_auth_information(self):
         if self.message_type == 'logstash':
             if not self.username:
@@ -196,39 +167,20 @@ class BaseHTTPSHandler(BaseHandler):
         }
         self.session.headers.update(headers)
 
-    def __init__(self, fqdn=False, **kwargs):
+    def __init__(self, fqdn=False, message_type=None, username=None, password=None,
+                 tcp_timeout=5, verify_cert=True, **kwargs):
         self.fqdn = fqdn
+        self.message_type = message_type
+        self.username = username
+        self.password = password
+        self.tcp_timeout = tcp_timeout
+        self.verify_cert = verify_cert
         super(BaseHTTPSHandler, self).__init__(**kwargs)
         self.session = FuturesSession(executor=VerboseThreadPoolExecutor(
             max_workers=2  # this is the default used by requests_futures
         ))
         self._add_auth_information()
 
-    @classmethod
-    def perform_test(cls, settings):
-        """
-        Tests logging connectivity for the current logging settings.
-        @raises LoggingConnectivityException
-        """
-        handler = cls.from_django_settings(settings)
-        handler.enabled_flag = True
-        handler.setFormatter(LogstashFormatter(settings_module=settings))
-        logger = logging.getLogger(__file__)
-        fn, lno, func = logger.findCaller()
-        record = logger.makeRecord('awx', 10, fn, lno,
-                                   'AWX Connection Test', tuple(),
-                                   None, func)
-        futures = handler.emit(record)
-        for future in futures:
-            try:
-                resp = future.result()
-                if not resp.ok:
-                    raise LoggingConnectivityException(
-                        ': '.join([str(resp.status_code), resp.reason or ''])
-                    )
-            except RequestException as e:
-                raise LoggingConnectivityException(str(e))
-
     def _get_post_kwargs(self, payload_input):
         if self.message_type == 'splunk':
             # Splunk needs data nested under key "event"
@@ -265,6 +217,10 @@ def _encode_payload_for_socket(payload):
 
 
 class TCPHandler(BaseHandler):
+    def __init__(self, tcp_timeout=5, **kwargs):
+        self.tcp_timeout = tcp_timeout
+        super(TCPHandler, self).__init__(**kwargs)
+
     def _send(self, payload):
         payload = _encode_payload_for_socket(payload)
         sok = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -273,39 +229,32 @@ class TCPHandler(BaseHandler):
             sok.setblocking(0)
             _, ready_to_send, _ = select.select([], [sok], [], float(self.tcp_timeout))
             if len(ready_to_send) == 0:
-                logger.warning("Socket currently busy, failed to send message")
-                sok.close()
-                return
-            sok.send(payload)
+                ret = SocketResult(False, "Socket currently busy, failed to send message")
+                logger.warning(ret.reason)
+            else:
+                sok.send(payload)
+                ret = SocketResult(True)  # success!
         except Exception as e:
-            logger.exception("Error sending message from %s: %s" %
-                             (TCPHandler.__name__, e.message))
-            sok.close()
+            ret = SocketResult(False, "Error sending message from %s: %s" %
+                               (TCPHandler.__name__,
+                                ' '.join(six.text_type(arg) for arg in e.args)))
+            logger.exception(ret.reason)
+        finally:
+            sok.close()
+        return ret
 
 
 class UDPHandler(BaseHandler):
+    message = "Cannot determine if UDP messages are received."
+
     def __init__(self, **kwargs):
         super(UDPHandler, self).__init__(**kwargs)
         self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
     def _send(self, payload):
         payload = _encode_payload_for_socket(payload)
-        return self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0))
-
-    @classmethod
-    def perform_test(cls, settings):
-        """
-        Tests logging connectivity for the current logging settings.
-        """
-        handler = cls.from_django_settings(settings)
-        handler.enabled_flag = True
-        handler.setFormatter(LogstashFormatter(settings_module=settings))
-        logger = logging.getLogger(__file__)
-        fn, lno, func = logger.findCaller()
-        record = logger.makeRecord('awx', 10, fn, lno,
-                                   'AWX Connection Test', tuple(),
-                                   None, func)
-        handler.emit(_encode_payload_for_socket(record))
+        self.socket.sendto(payload, (self._get_host(hostname_only=True), self.port or 0))
+        return SocketResult(True, reason=self.message)
 HANDLER_MAPPING = {
@@ -315,6 +264,88 @@ HANDLER_MAPPING = {
 }
 
 
+class AWXProxyHandler(logging.Handler):
+    '''
+    Handler specific to the AWX external logging feature
+
+    Will dynamically create a handler specific to the configured
+    protocol, and will create a new one automatically on setting change
+
+    Managing parameters:
+      All parameters will get their value from settings as a default
+      if the parameter was either provided on init, or set manually,
+      this value will take precedence.
+      Parameters match same parameters in the actualized handler classes.
+    '''
+
+    def __init__(self, **kwargs):
+        # TODO: process 'level' kwarg
+        super(AWXProxyHandler, self).__init__(**kwargs)
+        self._handler = None
+        self._old_kwargs = {}
+
+    def get_handler_class(self, protocol):
+        return HANDLER_MAPPING[protocol]
+
+    def get_handler(self, custom_settings=None, force_create=False):
+        new_kwargs = {}
+        use_settings = custom_settings or settings
+        for field_name, setting_name in PARAM_NAMES.items():
+            val = getattr(use_settings, setting_name, None)
+            if val is None:
+                continue
+            new_kwargs[field_name] = val
+        if new_kwargs == self._old_kwargs and self._handler and (not force_create):
+            # avoids re-creating session objects, and other such things
+            return self._handler
+        self._old_kwargs = new_kwargs.copy()
+        # TODO: remove any kwargs no applicable to that particular handler
+        protocol = new_kwargs.pop('protocol', None)
+        HandlerClass = self.get_handler_class(protocol)
+        # cleanup old handler and make new one
+        if self._handler:
+            self._handler.close()
+        logger.debug('Creating external log handler due to startup or settings change.')
+        self._handler = HandlerClass(**new_kwargs)
+        if self.formatter:
+            # self.format(record) is called inside of emit method
+            # so not safe to assume this can be handled within self
+            self._handler.setFormatter(self.formatter)
+        return self._handler
+
+    def emit(self, record):
+        actual_handler = self.get_handler()
+        return actual_handler.emit(record)
+
+    def perform_test(self, custom_settings):
+        """
+        Tests logging connectivity for given settings module.
+        @raises LoggingConnectivityException
+        """
+        handler = self.get_handler(custom_settings=custom_settings, force_create=True)
+        handler.setFormatter(LogstashFormatter())
+        logger = logging.getLogger(__file__)
+        fn, lno, func = logger.findCaller()
+        record = logger.makeRecord('awx', 10, fn, lno,
+                                   'AWX Connection Test', tuple(),
+                                   None, func)
+        futures = handler.emit(record)
+        for future in futures:
+            try:
+                resp = future.result()
+                if not resp.ok:
+                    if isinstance(resp, SocketResult):
+                        raise LoggingConnectivityException(
+                            'Socket error: {}'.format(resp.reason or '')
+                        )
+                    else:
+                        raise LoggingConnectivityException(
+                            ': '.join([str(resp.status_code), resp.reason or ''])
+                        )
+            except RequestException as e:
+                raise LoggingConnectivityException(str(e))
+
+
 ColorHandler = logging.StreamHandler
 
 if settings.COLOR_LOGS is True:
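perform_test is now unified across transports because both kinds of emit results satisfy the same tiny interface: requests-futures returns future objects whose .result() yields a Response with .ok and .reason, while the socket handlers return a SocketResult whose .result() returns itself. The loop above can therefore treat a UDP send and an HTTPS POST identically:

    for future in handler.emit(record):     # record: the synthetic test record
        resp = future.result()              # requests.Response or SocketResult
        if not resp.ok:
            ...                             # raise LoggingConnectivityException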
@@ -340,41 +371,3 @@ if settings.COLOR_LOGS is True:
     except ImportError:
         # logutils is only used for colored logs in the dev environment
         pass
-
-
-def _add_or_remove_logger(address, instance):
-    specific_logger = logging.getLogger(address)
-    for i, handler in enumerate(specific_logger.handlers):
-        if isinstance(handler, (HTTPSNullHandler, BaseHTTPSHandler)):
-            specific_logger.handlers[i] = instance or HTTPSNullHandler()
-            break
-    else:
-        if instance is not None:
-            specific_logger.handlers.append(instance)
-
-
-def configure_external_logger(settings_module, is_startup=True):
-    is_enabled = settings_module.LOG_AGGREGATOR_ENABLED
-    if is_startup and (not is_enabled):
-        # Pass-through if external logging not being used
-        return
-
-    instance = None
-    if is_enabled:
-        handler_class = HANDLER_MAPPING[settings_module.LOG_AGGREGATOR_PROTOCOL]
-        instance = handler_class.from_django_settings(settings_module)
-
-        # Obtain the Formatter class from settings to maintain customizations
-        configurator = logging.config.DictConfigurator(settings_module.LOGGING)
-        formatter_config = settings_module.LOGGING['formatters']['json'].copy()
-        formatter_config['settings_module'] = settings_module
-        formatter = configurator.configure_custom(formatter_config)
-
-        instance.setFormatter(formatter)
-
-    awx_logger_instance = instance
-    if is_enabled and 'awx' not in settings_module.LOG_AGGREGATOR_LOGGERS:
-        awx_logger_instance = None
-
-    _add_or_remove_logger('awx.analytics', instance)
-    _add_or_remove_logger('awx', awx_logger_instance)
@@ -8,29 +8,9 @@ import logging
 # Django
 from django.conf import settings
 
-# Celery
-from celery import Celery
-
 logger = logging.getLogger('awx.main.utils.reload')
 
 
-def _uwsgi_fifo_command(uwsgi_command):
-    # http://uwsgi-docs.readthedocs.io/en/latest/MasterFIFO.html#available-commands
-    logger.warn('Initiating uWSGI chain reload of server')
-    TRIGGER_COMMAND = uwsgi_command
-    with open(settings.UWSGI_FIFO_LOCATION, 'w') as awxfifo:
-        awxfifo.write(TRIGGER_COMMAND)
-
-
-def _reset_celery_thread_pool():
-    # Do not use current_app because of this outstanding issue:
-    # https://github.com/celery/celery/issues/4410
-    app = Celery('awx')
-    app.config_from_object('django.conf:settings')
-    app.control.broadcast('pool_restart', arguments={'reload': True},
-                          destination=['celery@{}'.format(settings.CLUSTER_HOST_ID)], reply=False)
-
-
 def _supervisor_service_command(service_internal_names, command, communicate=True):
     '''
     Service internal name options:
@@ -68,21 +48,6 @@ def _supervisor_service_command(service_internal_names, command, communicate=Tru
         logger.info('Submitted supervisorctl {} command, not waiting for result'.format(command))
 
 
-def restart_local_services(service_internal_names):
-    logger.warn('Restarting services {} on this node in response to user action'.format(service_internal_names))
-    if 'uwsgi' in service_internal_names:
-        _uwsgi_fifo_command(uwsgi_command='c')
-        service_internal_names.remove('uwsgi')
-    restart_celery = False
-    if 'celery' in service_internal_names:
-        restart_celery = True
-        service_internal_names.remove('celery')
-    _supervisor_service_command(service_internal_names, command='restart')
-    if restart_celery:
-        # Celery restarted last because this probably includes current process
-        _reset_celery_thread_pool()
-
-
 def stop_local_services(service_internal_names, communicate=True):
     logger.warn('Stopping services {} on this node in response to user action'.format(service_internal_names))
     _supervisor_service_command(service_internal_names, command='stop', communicate=communicate)
@@ -1001,6 +1001,9 @@ LOGGING = {
         'require_debug_true_or_test': {
             '()': 'awx.main.utils.RequireDebugTrueOrTest',
         },
+        'external_log_enabled': {
+            '()': 'awx.main.utils.filters.ExternalLoggerEnabled'
+        },
     },
     'formatters': {
         'simple': {
@@ -1034,11 +1037,10 @@ LOGGING = {
             'class': 'logging.NullHandler',
             'formatter': 'simple',
         },
-        'http_receiver': {
-            'class': 'awx.main.utils.handlers.HTTPSNullHandler',
-            'level': 'DEBUG',
+        'external_logger': {
+            'class': 'awx.main.utils.handlers.AWXProxyHandler',
             'formatter': 'json',
-            'host': '',
+            'filters': ['external_log_enabled'],
         },
         'mail_admins': {
             'level': 'ERROR',
@@ -1131,7 +1133,7 @@ LOGGING = {
             'handlers': ['console'],
         },
         'awx': {
-            'handlers': ['console', 'file', 'tower_warnings'],
+            'handlers': ['console', 'file', 'tower_warnings', 'external_logger'],
             'level': 'DEBUG',
         },
         'awx.conf': {
@@ -1179,7 +1181,7 @@ LOGGING = {
             'propagate': False,
         },
         'awx.analytics': {
-            'handlers': ['http_receiver'],
+            'handlers': ['external_logger'],
             'level': 'INFO',
             'propagate': False
         },
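Net effect in the Django LOGGING config: the external handler is always installed but inert until the filter lets records through. Condensed from the hunks above:

    LOGGING['filters']['external_log_enabled'] = {
        '()': 'awx.main.utils.filters.ExternalLoggerEnabled'}
    LOGGING['handlers']['external_logger'] = {
        'class': 'awx.main.utils.handlers.AWXProxyHandler',
        'formatter': 'json',
        'filters': ['external_log_enabled']}
    LOGGING['loggers']['awx']['handlers'].append('external_logger')
    LOGGING['loggers']['awx.analytics']['handlers'] = ['external_logger']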
@@ -149,8 +149,6 @@ SERVICE_NAME_DICT = {
     "uwsgi": "uwsgi",
     "daphne": "daphne",
     "nginx": "nginx"}
-# Used for sending commands in automatic restart
-UWSGI_FIFO_LOCATION = '/awxfifo'
 
 try:
     socket.gethostbyname('docker.for.mac.internal')
@@ -68,8 +68,6 @@ SERVICE_NAME_DICT = {
     "channels": "awx-channels-worker",
     "uwsgi": "awx-uwsgi",
     "daphne": "awx-daphne"}
-# Used for sending commands in automatic restart
-UWSGI_FIFO_LOCATION = '/var/lib/awx/awxfifo'
 
 # Store a snapshot of default settings at this point before loading any
 # customizable config files.