mirror of
https://github.com/ansible/awx.git
synced 2026-03-16 08:27:29 -02:30
Refactor log handler and support TCP/UDP communications
This commit is contained in:
@@ -1,16 +1,21 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import base64
|
||||
import cStringIO
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
from uuid import uuid4
|
||||
|
||||
import mock
|
||||
|
||||
from django.conf import settings
|
||||
from django.conf import LazySettings
|
||||
import pytest
|
||||
import requests
|
||||
from requests_futures.sessions import FuturesSession
|
||||
|
||||
from awx.main.utils.handlers import (BaseHTTPSHandler as HTTPSHandler,
|
||||
from awx.main.utils.handlers import (BaseHandler, BaseHTTPSHandler as HTTPSHandler,
|
||||
TCPHandler, UDPHandler, _encode_payload_for_socket,
|
||||
PARAM_NAMES, LoggingConnectivityException)
|
||||
from awx.main.utils.formatters import LogstashFormatter
|
||||
|
||||
@@ -57,6 +62,16 @@ def connection_error_adapter():
|
||||
return ConnectionErrorAdapter()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_socket(tmpdir_factory, request):
|
||||
sok = socket._socketobject
|
||||
sok.send = mock.MagicMock()
|
||||
sok.connect = mock.MagicMock()
|
||||
sok.setblocking = mock.MagicMock()
|
||||
sok.close = mock.MagicMock()
|
||||
return sok
|
||||
|
||||
|
||||
def test_https_logging_handler_requests_async_implementation():
|
||||
handler = HTTPSHandler()
|
||||
assert isinstance(handler.session, FuturesSession)
|
||||
@@ -64,31 +79,121 @@ def test_https_logging_handler_requests_async_implementation():
|
||||
|
||||
def test_https_logging_handler_has_default_http_timeout():
|
||||
handler = HTTPSHandler.from_django_settings(settings)
|
||||
assert handler.http_timeout == 5
|
||||
assert handler.tcp_timeout == 5
|
||||
|
||||
|
||||
@pytest.mark.parametrize('param', PARAM_NAMES.keys())
|
||||
def test_https_logging_handler_defaults(param):
|
||||
handler = HTTPSHandler()
|
||||
def test_base_logging_handler_defaults(param):
|
||||
handler = BaseHandler()
|
||||
assert hasattr(handler, param) and getattr(handler, param) is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize('param', PARAM_NAMES.keys())
|
||||
def test_https_logging_handler_kwargs(param):
|
||||
handler = HTTPSHandler(**{param: 'EXAMPLE'})
|
||||
def test_base_logging_handler_kwargs(param):
|
||||
handler = BaseHandler(**{param: 'EXAMPLE'})
|
||||
assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('param, django_settings_name', PARAM_NAMES.items())
|
||||
def test_https_logging_handler_from_django_settings(param, django_settings_name):
|
||||
def test_base_logging_handler_from_django_settings(param, django_settings_name):
|
||||
settings = LazySettings()
|
||||
settings.configure(**{
|
||||
django_settings_name: 'EXAMPLE'
|
||||
})
|
||||
handler = HTTPSHandler.from_django_settings(settings)
|
||||
handler = BaseHandler.from_django_settings(settings)
|
||||
assert hasattr(handler, param) and getattr(handler, param) == 'EXAMPLE'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('params, logger_name, expected', [
|
||||
# skip all records if enabled_flag = False
|
||||
({'enabled_flag': False}, 'awx.main', True),
|
||||
# skip all records if the host is undefined
|
||||
({'host': '', 'enabled_flag': True}, 'awx.main', True),
|
||||
# skip all records if underlying logger is used by handlers themselves
|
||||
({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main.utils.handlers', True),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main', False),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', True),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', False),
|
||||
])
|
||||
def test_base_logging_handler_skip_log(params, logger_name, expected):
|
||||
handler = BaseHandler(**params)
|
||||
assert handler._skip_log(logger_name) is expected
|
||||
|
||||
|
||||
def test_base_logging_handler_emit(dummy_log_record):
|
||||
handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type='logstash',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
sent_payloads = handler.emit(dummy_log_record)
|
||||
|
||||
assert len(sent_payloads) == 1
|
||||
body = json.loads(sent_payloads[0])
|
||||
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx'
|
||||
assert body['message'] == 'User joe logged in'
|
||||
|
||||
|
||||
def test_base_logging_handler_emit_one_record_per_fact():
|
||||
handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type='logstash', indv_facts=True,
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
record = logging.LogRecord(
|
||||
'awx.analytics.system_tracking', # logger name
|
||||
20, # loglevel INFO
|
||||
'./awx/some/module.py', # pathname
|
||||
100, # lineno
|
||||
None, # msg
|
||||
tuple(), # args,
|
||||
None # exc_info
|
||||
)
|
||||
record.module_name = 'packages'
|
||||
record.facts_data = [{
|
||||
"name": "ansible",
|
||||
"version": "2.2.1.0"
|
||||
}, {
|
||||
"name": "ansible-tower",
|
||||
"version": "3.1.0"
|
||||
}]
|
||||
sent_payloads = handler.emit(record)
|
||||
|
||||
assert len(sent_payloads) == 2
|
||||
sent_payloads.sort(key=lambda payload: payload['version'])
|
||||
|
||||
assert sent_payloads[0]['level'] == 'INFO'
|
||||
assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert sent_payloads[0]['name'] == 'ansible'
|
||||
assert sent_payloads[0]['version'] == '2.2.1.0'
|
||||
assert sent_payloads[1]['level'] == 'INFO'
|
||||
assert sent_payloads[1]['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert sent_payloads[1]['name'] == 'ansible-tower'
|
||||
assert sent_payloads[1]['version'] == '3.1.0'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('host, port, normalized, hostname_only', [
|
||||
('localhost', None, 'http://localhost', False),
|
||||
('localhost', 8080, 'http://localhost:8080', False),
|
||||
('http://localhost', None, 'http://localhost', False),
|
||||
('http://localhost', 8080, 'http://localhost:8080', False),
|
||||
('https://localhost', 443, 'https://localhost:443', False),
|
||||
('ftp://localhost', 443, 'ftp://localhost:443', False),
|
||||
('https://localhost:550', 443, 'https://localhost:550', False),
|
||||
('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar', False),
|
||||
('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar', False),
|
||||
('http://splunk.server:8088/services/collector/event', 80,
|
||||
'http://splunk.server:8088/services/collector/event', False),
|
||||
('http://splunk.server/services/collector/event', 8088,
|
||||
'http://splunk.server:8088/services/collector/event', False),
|
||||
('localhost', 4399, 'localhost', True),
|
||||
('tcp://localhost:4399/foo/bar', 4399, 'localhost', True),
|
||||
])
|
||||
def test_base_logging_handler_host_format(host, port, normalized, hostname_only):
|
||||
handler = BaseHandler(host=host, port=port)
|
||||
assert handler._get_host(scheme='http', hostname_only=hostname_only) == normalized
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'status, reason, exc',
|
||||
[(200, '200 OK', None), (404, 'Not Found', LoggingConnectivityException)]
|
||||
@@ -127,7 +232,7 @@ def test_https_logging_handler_connectivity_test(http_adapter, status, reason, e
|
||||
|
||||
def test_https_logging_handler_logstash_auth_info():
|
||||
handler = HTTPSHandler(message_type='logstash', username='bob', password='ansible')
|
||||
handler.add_auth_information()
|
||||
handler._add_auth_information()
|
||||
assert isinstance(handler.session.auth, requests.auth.HTTPBasicAuth)
|
||||
assert handler.session.auth.username == 'bob'
|
||||
assert handler.session.auth.password == 'ansible'
|
||||
@@ -135,47 +240,11 @@ def test_https_logging_handler_logstash_auth_info():
|
||||
|
||||
def test_https_logging_handler_splunk_auth_info():
|
||||
handler = HTTPSHandler(message_type='splunk', password='ansible')
|
||||
handler.add_auth_information()
|
||||
handler._add_auth_information()
|
||||
assert handler.session.headers['Authorization'] == 'Splunk ansible'
|
||||
assert handler.session.headers['Content-Type'] == 'application/json'
|
||||
|
||||
|
||||
@pytest.mark.parametrize('host, port, normalized', [
|
||||
('localhost', None, 'http://localhost'),
|
||||
('localhost', 80, 'http://localhost'),
|
||||
('localhost', 8080, 'http://localhost:8080'),
|
||||
('http://localhost', None, 'http://localhost'),
|
||||
('http://localhost', 80, 'http://localhost'),
|
||||
('http://localhost', 8080, 'http://localhost:8080'),
|
||||
('https://localhost', 443, 'https://localhost:443'),
|
||||
('ftp://localhost', 443, 'ftp://localhost:443'),
|
||||
('https://localhost:550', 443, 'https://localhost:550'),
|
||||
('https://localhost:yoho/foobar', 443, 'https://localhost:443/foobar'),
|
||||
('https://localhost:yoho/foobar', None, 'https://localhost:yoho/foobar'),
|
||||
('http://splunk.server:8088/services/collector/event', 80,
|
||||
'http://splunk.server:8088/services/collector/event'),
|
||||
('http://splunk.server/services/collector/event', 80,
|
||||
'http://splunk.server/services/collector/event'),
|
||||
('http://splunk.server/services/collector/event', 8088,
|
||||
'http://splunk.server:8088/services/collector/event'),
|
||||
])
|
||||
def test_https_logging_handler_http_host_format(host, port, normalized):
|
||||
handler = HTTPSHandler(host=host, port=port)
|
||||
assert handler.get_http_host() == normalized
|
||||
|
||||
|
||||
@pytest.mark.parametrize('params, logger_name, expected', [
|
||||
({'enabled_flag': False}, 'awx.main', True), # skip all records if enabled_flag = False
|
||||
({'host': '', 'enabled_flag': True}, 'awx.main', True), # skip all records if the host is undefined
|
||||
({'host': '127.0.0.1', 'enabled_flag': True}, 'awx.main', False),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['abc']}, 'awx.analytics.xyz', True),
|
||||
({'host': '127.0.0.1', 'enabled_flag': True, 'enabled_loggers': ['xyz']}, 'awx.analytics.xyz', False),
|
||||
])
|
||||
def test_https_logging_handler_skip_log(params, logger_name, expected):
|
||||
handler = HTTPSHandler(**params)
|
||||
assert handler.skip_log(logger_name) is expected
|
||||
|
||||
|
||||
def test_https_logging_handler_connection_error(connection_error_adapter,
|
||||
dummy_log_record):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
@@ -204,8 +273,8 @@ def test_https_logging_handler_connection_error(connection_error_adapter,
|
||||
|
||||
|
||||
@pytest.mark.parametrize('message_type', ['logstash', 'splunk'])
|
||||
def test_https_logging_handler_emit(http_adapter, dummy_log_record,
|
||||
message_type):
|
||||
def test_https_logging_handler_emit_without_cred(http_adapter, dummy_log_record,
|
||||
message_type):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type=message_type,
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
@@ -218,21 +287,14 @@ def test_https_logging_handler_emit(http_adapter, dummy_log_record,
|
||||
request = http_adapter.requests[0]
|
||||
assert request.url == 'http://127.0.0.1/'
|
||||
assert request.method == 'POST'
|
||||
body = json.loads(request.body)
|
||||
|
||||
if message_type == 'logstash':
|
||||
# A username + password weren't used, so this header should be missing
|
||||
assert 'Authorization' not in request.headers
|
||||
|
||||
if message_type == 'splunk':
|
||||
# splunk messages are nested under the 'event' key
|
||||
body = body['event']
|
||||
assert request.headers['Authorization'] == 'Splunk None'
|
||||
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx'
|
||||
assert body['message'] == 'User joe logged in'
|
||||
|
||||
|
||||
def test_https_logging_handler_emit_logstash_with_creds(http_adapter,
|
||||
dummy_log_record):
|
||||
@@ -265,49 +327,78 @@ def test_https_logging_handler_emit_splunk_with_creds(http_adapter,
|
||||
assert request.headers['Authorization'] == 'Splunk pass'
|
||||
|
||||
|
||||
def test_https_logging_handler_emit_one_record_per_fact(http_adapter):
|
||||
handler = HTTPSHandler(host='127.0.0.1', enabled_flag=True,
|
||||
message_type='logstash', indv_facts=True,
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
@pytest.mark.parametrize('payload, encoded_payload', [
|
||||
('foobar', 'foobar'),
|
||||
({'foo': 'bar'}, '{"foo": "bar"}'),
|
||||
({u'测试键': u'测试值'}, '{"测试键": "测试值"}'),
|
||||
])
|
||||
def test_encode_payload_for_socket(payload, encoded_payload):
|
||||
assert _encode_payload_for_socket(payload) == encoded_payload
|
||||
|
||||
|
||||
def test_udp_handler_create_socket_at_init():
|
||||
handler = UDPHandler(host='127.0.0.1', port=4399,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
assert hasattr(handler, 'socket')
|
||||
assert isinstance(handler.socket, socket.socket)
|
||||
assert handler.socket.family == socket.AF_INET
|
||||
assert handler.socket.type == socket.SOCK_DGRAM
|
||||
|
||||
|
||||
def test_udp_handler_send(dummy_log_record):
|
||||
handler = UDPHandler(host='127.0.0.1', port=4399,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
handler.session.mount('http://', http_adapter)
|
||||
record = logging.LogRecord(
|
||||
'awx.analytics.system_tracking', # logger name
|
||||
20, # loglevel INFO
|
||||
'./awx/some/module.py', # pathname
|
||||
100, # lineno
|
||||
None, # msg
|
||||
tuple(), # args,
|
||||
None # exc_info
|
||||
)
|
||||
record.module_name = 'packages'
|
||||
record.facts_data = [{
|
||||
"name": "ansible",
|
||||
"version": "2.2.1.0"
|
||||
}, {
|
||||
"name": "ansible-tower",
|
||||
"version": "3.1.0"
|
||||
}]
|
||||
async_futures = handler.emit(record)
|
||||
[future.result() for future in async_futures]
|
||||
with mock.patch('awx.main.utils.handlers._encode_payload_for_socket', return_value="des") as encode_mock,\
|
||||
mock.patch.object(handler, 'socket') as socket_mock:
|
||||
handler.emit(dummy_log_record)
|
||||
encode_mock.assert_called_once_with(handler.format(dummy_log_record))
|
||||
socket_mock.sendto.assert_called_once_with("des", ('127.0.0.1', 4399))
|
||||
|
||||
assert len(http_adapter.requests) == 2
|
||||
requests = sorted(http_adapter.requests, key=lambda request: json.loads(request.body)['version'])
|
||||
|
||||
request = requests[0]
|
||||
assert request.url == 'http://127.0.0.1/'
|
||||
assert request.method == 'POST'
|
||||
body = json.loads(request.body)
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert body['name'] == 'ansible'
|
||||
assert body['version'] == '2.2.1.0'
|
||||
def test_tcp_handler_send(fake_socket, dummy_log_record):
|
||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
||||
mock.patch('select.select', return_value=([], [fake_socket], [])):
|
||||
handler.emit(dummy_log_record)
|
||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
||||
fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399))
|
||||
fake_socket.setblocking.assert_called_once_with(0)
|
||||
fake_socket.send.assert_called_once_with(handler.format(dummy_log_record))
|
||||
fake_socket.close.assert_called_once()
|
||||
|
||||
request = requests[1]
|
||||
assert request.url == 'http://127.0.0.1/'
|
||||
assert request.method == 'POST'
|
||||
body = json.loads(request.body)
|
||||
assert body['level'] == 'INFO'
|
||||
assert body['logger_name'] == 'awx.analytics.system_tracking'
|
||||
assert body['name'] == 'ansible-tower'
|
||||
assert body['version'] == '3.1.0'
|
||||
|
||||
def test_tcp_handler_return_if_socket_unavailable(fake_socket, dummy_log_record):
|
||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
||||
mock.patch('select.select', return_value=([], [], [])):
|
||||
handler.emit(dummy_log_record)
|
||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
||||
fake_socket.connect.assert_called_once_with(('127.0.0.1', 4399))
|
||||
fake_socket.setblocking.assert_called_once_with(0)
|
||||
assert not fake_socket.send.called
|
||||
fake_socket.close.assert_called_once()
|
||||
|
||||
|
||||
def test_tcp_handler_log_exception(fake_socket, dummy_log_record):
|
||||
handler = TCPHandler(host='127.0.0.1', port=4399, tcp_timeout=5,
|
||||
enabled_flag=True, message_type='splunk',
|
||||
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
with mock.patch('socket.socket', return_value=fake_socket) as sok_init_mock,\
|
||||
mock.patch('select.select', return_value=([], [], [])),\
|
||||
mock.patch('awx.main.utils.handlers.logger') as logger_mock:
|
||||
fake_socket.connect.side_effect = Exception("foo")
|
||||
handler.emit(dummy_log_record)
|
||||
sok_init_mock.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM)
|
||||
logger_mock.exception.assert_called_once()
|
||||
fake_socket.close.assert_called_once()
|
||||
assert not fake_socket.send.called
|
||||
|
||||
Reference in New Issue
Block a user