diff --git a/awx/conf/apps.py b/awx/conf/apps.py
index a77cc84209..8f9980dde9 100644
--- a/awx/conf/apps.py
+++ b/awx/conf/apps.py
@@ -2,6 +2,8 @@
 from django.apps import AppConfig
 # from django.core import checks
 from django.utils.translation import ugettext_lazy as _
+from django.utils.log import configure_logging
+from django.conf import settings
 
 
 class ConfConfig(AppConfig):
@@ -13,4 +15,5 @@ class ConfConfig(AppConfig):
         self.module.autodiscover()
         from .settings import SettingsWrapper
         SettingsWrapper.initialize()
+        configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
         # checks.register(SettingsWrapper._check_settings)
diff --git a/awx/conf/signals.py b/awx/conf/signals.py
index 2b9d17c227..f2ae1baf16 100644
--- a/awx/conf/signals.py
+++ b/awx/conf/signals.py
@@ -7,6 +7,7 @@ from django.core.cache import cache
 from django.core.signals import setting_changed
 from django.db.models.signals import post_save, pre_delete, post_delete
 from django.dispatch import receiver
+from django.utils.log import configure_logging
 
 # Tower
 import awx.main.signals
@@ -42,8 +43,9 @@ def handle_setting_change(key, for_delete=False):
         )
         # TODO: Move logic to task to run on all cluster nodes
         if setting_key.startswith('LOG_AGGREGATOR_'):
-            settings.LOGGING_CONFIG = None
-            logging.config.dictConfig(settings.LOGGING)
+            configure_logging(settings.LOGGING_CONFIG, settings.LOGGING)
+            # settings.LOGGING_CONFIG = None
+            # logging.config.dictConfig(settings.LOGGING)
 
 
 @receiver(post_save, sender=Setting)
diff --git a/awx/main/conf.py b/awx/main/conf.py
index 9ac555bcb7..527f262334 100644
--- a/awx/main/conf.py
+++ b/awx/main/conf.py
@@ -285,3 +285,14 @@ register(
     category=_('Logging'),
     category_slug='logging',
 )
+register(
+    'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
+    field_class=fields.BooleanField,
+    default=False,
+    label=_('Flag denoting to send individual messages for each fact in system tracking'),
+    help_text=_('If not set, the data from system tracking will be sent inside '
+                'of a single dictionary, but if set, separate requests will be sent '
+                'for each package, service, etc. that is found in the scan.'),
+    category=_('Logging'),
+    category_slug='logging',
+)
diff --git a/awx/main/consumers.py b/awx/main/consumers.py
index 2e74f05114..a8c56a264d 100644
--- a/awx/main/consumers.py
+++ b/awx/main/consumers.py
@@ -11,6 +11,7 @@ from awx.main.models.organization import AuthToken
 
 logger = logging.getLogger('awx.main.consumers')
 
+
 def discard_groups(message):
     if 'groups' in message.channel_session:
         for group in message.channel_session['groups']:
diff --git a/awx/main/log_utils/formatters.py b/awx/main/log_utils/formatters.py
index 17fafca063..94f9b5c0f0 100644
--- a/awx/main/log_utils/formatters.py
+++ b/awx/main/log_utils/formatters.py
@@ -3,50 +3,108 @@ from logstash.formatter import LogstashFormatterVersion1
 
 from django.conf import settings
+from copy import copy
+import json
+import time
 
-# # Loggly example
-# 'json': {
-# 'format': '{
-# "loggerName":"%(name)s",
-# "asciTime":"%(asctime)s",
-# "fileName":"%(filename)s",
-# "logRecordCreationTime":"%(created)f",
-# "functionName":"%(funcName)s",
-# "levelNo":"%(levelno)s",
-# "lineNo":"%(lineno)d",
-# "time":"%(msecs)d",
-# "levelName":"%(levelname)s",
-# "message":"%(message)s"}',
-# },
-
 
 class LogstashFormatter(LogstashFormatterVersion1):
     def __init__(self, **kwargs):
         ret = super(LogstashFormatter, self).__init__(**kwargs)
         self.host_id = settings.CLUSTER_HOST_ID
         return ret
 
+    def reformat_data_for_log(self, raw_data, kind=None):
+        '''
+        Process dictionaries from various contexts (job events, activity stream
+        changes, etc.) to give meaningful information
+        Output a dictionary which will be passed in logstash or syslog format
+        to the logging receiver
+        '''
+        if kind == 'activity_stream':
+            return raw_data
+        rename_fields = set((
+            'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
+            'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
+            'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
+            'processName', 'relativeCreated', 'thread', 'threadName', 'extra',
+            'auth_token', 'tags', 'host', 'host_id', 'level', 'port', 'uuid'))
+        if kind == 'system_tracking':
+            data = copy(raw_data['facts_data'])
+        elif kind == 'job_events':
+            data = copy(raw_data['event_model_data'])
+        else:
+            data = copy(raw_data)
+        if isinstance(data, basestring):
+            data = json.loads(data)
+        skip_fields = ('res', 'password', 'event_data', 'stdout')
+        data_for_log = {}
+
+        def index_by_name(alist):
+            """Takes a list of dictionaries with `name` as a key in each dict
+            and returns a dictionary indexed by those names"""
+            adict = {}
+            for item in alist:
+                subdict = copy(item)
+                if 'name' in subdict:
+                    name = subdict.get('name', None)
+                elif 'path' in subdict:
+                    name = subdict.get('path', None)
+                if name:
+                    # Logstash v2 can not accept '.' in a name
+                    name = name.replace('.', '_')
+                adict[name] = subdict
+            return adict
+
+        if kind == 'job_events':
+            data.update(data.get('event_data', {}))
+            for fd in data:
+                if fd in skip_fields:
+                    continue
+                key = fd
+                if fd in rename_fields:
+                    key = 'event_%s' % fd
+                val = data[fd]
+                if key.endswith('created'):
+                    time_float = time.mktime(data[fd].timetuple())
+                    val = self.format_timestamp(time_float)
+                data_for_log[key] = val
+        elif kind == 'system_tracking':
+            module_name = raw_data['module_name']
+            if module_name in ['services', 'packages', 'files']:
+                data_for_log[module_name] = index_by_name(data)
+            elif module_name == 'ansible':
+                data_for_log['ansible'] = data
+                # Remove sub-keys with data type conflicts in elastic search
+                data_for_log['ansible'].pop('ansible_python_version', None)
+                data_for_log['ansible']['ansible_python'].pop('version_info', None)
+            else:
+                data_for_log['facts'] = data
+                data_for_log['module_name'] = module_name
+        return data_for_log
+
     def get_extra_fields(self, record):
         fields = super(LogstashFormatter, self).get_extra_fields(record)
-        fields['cluster_host_id'] = self.host_id
+        if record.name.startswith('awx.analytics'):
+            log_kind = record.name.split('.')[-1]
+            fields = self.reformat_data_for_log(fields, kind=log_kind)
         return fields
 
     def format(self, record):
-        # Create message dict
-        # message = record.getMessage()
-        # print ' message ' + str(message)
         message = {
+            # Fields not included, but exist in related logs
+            # 'path': record.pathname
+            # '@version': '1', # from python-logstash
+            # 'tags': self.tags,
             '@timestamp': self.format_timestamp(record.created),
-            '@version': '1',
             'message': record.getMessage(),
             'host': self.host,
-            'path': record.pathname,
-            'tags': self.tags,
             'type': self.message_type,
 
             # Extra Fields
             'level': record.levelname,
             'logger_name': record.name,
+            'cluster_host_id': self.host_id
         }
 
         # Add extra fields
diff --git a/awx/main/log_utils/handlers.py b/awx/main/log_utils/handlers.py
index 614d2aeb66..55027dc7d9 100644
--- a/awx/main/log_utils/handlers.py
+++ b/awx/main/log_utils/handlers.py
@@ -24,12 +24,11 @@ from requests_futures.sessions import FuturesSession
 from logstash import formatter
 
 # custom
+from copy import copy
 from requests.auth import HTTPBasicAuth
 from django.conf import settings as django_settings
 
 
-ENABLED_LOGS = ['ansible']
-
 # Logstash
 # https://github.com/vklochan/python-logstash
 class TCPLogstashHandler(logging.handlers.SocketHandler, object):
@@ -64,14 +63,16 @@ PARAM_NAMES = {
     'message_type': 'LOG_AGGREGATOR_TYPE',
     'username': 'LOG_AGGREGATOR_USERNAME',
     'password': 'LOG_AGGREGATOR_PASSWORD',
+    'enabled_loggers': 'LOG_AGGREGATOR_LOGGERS',
+    'indv_facts': 'LOG_AGGREGATOR_INDIVIDUAL_FACTS',
 }
 # TODO: figure out what to do with LOG_AGGREGATOR_LOGGERS (if anything)
 
 
-def bg_cb(sess, resp):
-    """ Don't do anything with the response """
+def unused_callback(sess, resp):
     pass
+
 
 class HTTPSHandler(logging.Handler):
     def __init__(self, fqdn=False, **kwargs):
         super(HTTPSHandler, self).__init__()
@@ -83,7 +84,6 @@ class HTTPSHandler(logging.Handler):
             if settings_val:
                 setattr(self, fd, settings_val)
             elif fd in kwargs:
-                attr_name = fd
                 setattr(self, fd, kwargs[fd])
             else:
                 setattr(self, fd, None)
@@ -100,37 +100,53 @@ class HTTPSHandler(logging.Handler):
         if self.message_type == 'logstash':
             if not self.username:
                 # Logstash authentication not enabled
-                return kwargs
+                return
             logstash_auth = HTTPBasicAuth(self.username, self.password)
             self.session.auth = logstash_auth
         elif self.message_type == 'splunk':
-            ## Auth used by Splunk logger library
-            # self.session.auth = ('x', self.access_token)
-            # self.session.headers.update({'Content-Encoding': 'gzip'})
-            auth_header = "Splunk %s" % self.token
-            headers = dict(Authorization=auth_header)
+            auth_header = "Splunk %s" % self.password
+            headers = {
+                "Authorization": auth_header,
+                "Content-Type": "application/json"
+            }
             self.session.headers.update(headers)
 
     def emit(self, record):
+        if (self.host == '' or self.enabled_loggers is None or
+                record.name.split('.')[-1] not in self.enabled_loggers):
+            return
         try:
             payload = self.format(record)
-            # TODO: move this enablement logic to rely on individual loggers once
-            # the enablement config variable is hooked up
-            payload_data = json.loads(payload)
-            if payload_data['logger_name'].startswith('awx.analytics.system_tracking'):
-                st_type = None
-                for fd in ['services', 'packages', 'files', 'ansible']:
-                    if fd in payload_data:
-                        st_type = fd
-                        break
-                if st_type not in ENABLED_LOGS:
-                    return
             host = self.host
             if not host.startswith('http'):
                 host = 'http://%s' % self.host
-            if self.port != 80:
+            if self.port != 80 and self.port is not None:
                 host = '%s:%s' % (host, str(self.port))
-            kwargs = dict(data=payload, background_callback=bg_cb)
+
+            # Special action for System Tracking, queue up multiple log messages
+            if self.indv_facts:
+                payload_data = json.loads(payload)
+                if record.name.startswith('awx.analytics.system_tracking'):
+                    module_name = payload_data['module_name']
+                    if module_name in ['services', 'packages', 'files']:
+                        facts_dict = payload_data.pop(module_name)
+                        for key in facts_dict:
+                            fact_payload = copy(payload_data)
+                            fact_payload.update(facts_dict[key])
+                            kwargs = dict(data=json.dumps(fact_payload), background_callback=unused_callback)
+                            self.session.post(host, **kwargs)
+                        return
+
+            kwargs = dict(data=payload, background_callback=unused_callback)
+            # # splunk doesn't take "@" in the keys
+            # if self.message_type == 'splunk':
+            #     payload_dict = json.loads(payload)
+            #     new_dict = {}
+            #     for key in payload_dict:
+            #         new_key = key.replace('@', '')
+            #         new_dict[new_key] = payload_dict[key]
+            #     new_payload = json.dumps(new_dict)
+            #     kwargs['data'] = json.dumps(new_dict)
             self.session.post(host, **kwargs)
         except (KeyboardInterrupt, SystemExit):
             raise
@@ -162,9 +178,7 @@ class SplunkLogger(logging.Handler):
     def _set_auth(self, access_token, project_id, api_domain):
         # The access token and project id passed as parameter override the ones
         # configured in the .splunk_logger file.
-        if access_token is not None\
-           and project_id is not None\
-           and api_domain is not None:
+        if access_token is not None and project_id is not None and api_domain is not None:
             self.project_id = project_id
             self.access_token = access_token
             self.api_domain = api_domain
@@ -173,9 +187,7 @@ class SplunkLogger(logging.Handler):
             # Try to get the credentials form the configuration file
             self.project_id, self.access_token, self.api_domain = parse_config_file()
 
-            if self.project_id is None\
-               or self.access_token is None\
-               or self.api_domain is None:
+            if self.project_id is None or self.access_token is None or self.api_domain is None:
                 # Try to get the credentials form the environment variables
                 self.project_id, self.access_token, self.api_domain = get_config_from_env()
 
diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py
index a0082416c2..5a111c6fa2 100644
--- a/awx/main/management/commands/run_fact_cache_receiver.py
+++ b/awx/main/management/commands/run_fact_cache_receiver.py
@@ -16,10 +16,9 @@ from django.utils import timezone
 # AWX
 from awx.main.models.fact import Fact
 from awx.main.models.inventory import Host
-from awx.main.utils import format_for_log
 
 logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
-data_logger = logging.getLogger('awx.analytics.system_tracking')
+analytics_logger = logging.getLogger('awx.analytics.system_tracking')
 
 
 class FactBrokerWorker(ConsumerMixin):
@@ -83,7 +82,8 @@ class FactBrokerWorker(ConsumerMixin):
 
         # Create new Fact entry
         fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts)
         logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name))
-        data_logger.info('Received message with fact data', extra=format_for_log({module_name: facts}, kind="fact"))
+        analytics_logger.info('Received message with fact data', extra=dict(
+            module_name=module_name, facts_data=facts))
         return fact_obj
 
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index fb4cd10638..9671575f30 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -32,7 +32,6 @@ from awx.main.models.notifications import (
 from awx.main.utils import (
     ignore_inventory_computed_fields,
     parse_yaml_or_json,
-    format_for_log
 )
 from awx.main.redact import PlainTextCleaner
 from awx.main.fields import ImplicitRoleField
@@ -44,7 +43,7 @@ from awx.main.consumers import emit_channel_notification
 
 logger = logging.getLogger('awx.main.models.jobs')
 
-event_logger = logging.getLogger('awx.analytics.job_events')
+analytics_logger = logging.getLogger('awx.analytics.job_events')
 
 __all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent', 'SystemJobOptions',
            'SystemJobTemplate', 'SystemJob']
@@ -1188,8 +1187,7 @@ class JobEvent(CreatedModifiedModel):
         if parent_id:
             kwargs['parent_id'] = parent_id
 
-        # event_logger.info('Body: {}'.format(str(data_for_log)), extra=data_for_log)
-        event_logger.info('Job event data saved.', extra=format_for_log(kwargs, kind='event'))
+        analytics_logger.info('Job event data saved.', extra=dict(event_model_data=kwargs))
 
         job_event = JobEvent.objects.create(**kwargs)
 
@@ -1201,6 +1199,7 @@ class JobEvent(CreatedModifiedModel):
         # Save artifact data to parent job (if provided).
         if artifact_data:
             artifact_dict = json.loads(artifact_data)
+            event_data = kwargs.get('event_data', None)
             if event_data and isinstance(event_data, dict):
                 res = event_data.get('res', None)
                 if res and isinstance(res, dict):
diff --git a/awx/main/signals.py b/awx/main/signals.py
index 477a9f2257..ceda8899b1 100644
--- a/awx/main/signals.py
+++ b/awx/main/signals.py
@@ -20,7 +20,7 @@ from crum.signals import current_user_getter
 
 from awx.main.models import * # noqa
 from awx.api.serializers import * # noqa
 from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore
-from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates, format_for_log
+from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
 from awx.main.tasks import update_inventory_computed_fields
 from awx.main.consumers import emit_channel_notification
@@ -28,7 +28,6 @@ from awx.main.consumers import emit_channel_notification
 __all__ = []
 
 logger = logging.getLogger('awx.main.signals')
-analytics_logger = logging.getLogger('awx.analytics.activity_stream')
 
 # Update has_active_failures for inventory/groups when a Host/Group is deleted,
 # when a Host-Group or Group-Group relationship is updated, or when a Job is deleted
@@ -370,15 +369,11 @@ def activity_stream_create(sender, instance, created, **kwargs):
     if type(instance) == Job:
         if 'extra_vars' in changes:
             changes['extra_vars'] = instance.display_extra_vars()
-    changes_dict = json.dumps(changes)
     activity_entry = ActivityStream(
         operation='create',
         object1=object1,
-        changes=changes_dict)
+        changes=json.dumps(changes))
     activity_entry.save()
-    # analytics_logger.info('Activity Stream create entry for %s' % str(object1),
-    #                       extra=format_for_log(changes, kind='activity_stream',
-    #                       actor=activity_entry.actor, operation='update', object1=object1))
     #TODO: Weird situation where cascade SETNULL doesn't work
     # it might actually be a good idea to remove all of these FK references since
     # we don't really use them anyway.
@@ -406,9 +401,6 @@ def activity_stream_update(sender, instance, **kwargs):
         object1=object1,
         changes=json.dumps(changes))
     activity_entry.save()
-    # analytics_logger.info('Activity Stream update entry for %s' % str(object1),
-    #                       extra=format_for_log(changes, kind='activity_stream',
-    #                       actor=activity_entry.actor, operation='update', object1=object1))
     if instance._meta.model_name != 'setting':  # Is not conf.Setting instance
         getattr(activity_entry, object1).add(instance)
 
diff --git a/awx/main/tests/unit/test_log_formatter.py b/awx/main/tests/unit/test_log_formatter.py
index afa2a799e4..1d3b46328b 100644
--- a/awx/main/tests/unit/test_log_formatter.py
+++ b/awx/main/tests/unit/test_log_formatter.py
@@ -6,12 +6,13 @@ event_data = {
     'stdout': u'\x1b[0;36mskipping: [host1]\x1b[0m\r\n',
     u'uuid': u'ffe4858c-ac38-4cab-9192-b07bdbe80502', u'created': datetime.datetime(2016, 11, 10, 14, 59, 16, 376051), 'counter': 17,
     u'job_id': 209, u'event': u'runner_on_skipped', 'parent_id': 1937, 'end_line': 24, 'start_line': 23,
-    u'event_data': {u'play_pattern': u'all', u'play': u'all', u'task': u'Scan files (Windows)',
-                    u'task_args': u'paths={{ scan_file_paths }}, recursive={{ scan_use_recursive }}, get_checksum={{ scan_use_checksum }}',
-                    u'remote_addr': u'host1', u'pid': 1427, u'play_uuid': u'da784361-3811-4ea7-9cc8-46ec758fde66',
-                    u'task_uuid': u'4f9525fd-bc25-4ace-9eb2-adad9fa21a94', u'event_loop': None,
-                    u'playbook_uuid': u'653fd95e-f718-428e-9df0-3f279df9f07e', u'playbook': u'scan_facts.yml',
-                    u'task_action': u'win_scan_files', u'host': u'host1', u'task_path': None}}
+    u'event_data': {
+        u'play_pattern': u'all', u'play': u'all', u'task': u'Scan files (Windows)',
+        u'task_args': u'paths={{ scan_file_paths }}, recursive={{ scan_use_recursive }}, get_checksum={{ scan_use_checksum }}',
+        u'remote_addr': u'host1', u'pid': 1427, u'play_uuid': u'da784361-3811-4ea7-9cc8-46ec758fde66',
+        u'task_uuid': u'4f9525fd-bc25-4ace-9eb2-adad9fa21a94', u'event_loop': None,
+        u'playbook_uuid': u'653fd95e-f718-428e-9df0-3f279df9f07e', u'playbook': u'scan_facts.yml',
+        u'task_action': u'win_scan_files', u'host': u'host1', u'task_path': None}}
 
 event_stats = {
     'stdout': u'asdf', u'created': datetime.datetime(2016, 11, 10, 14, 59, 16, 385416),
diff --git a/awx/main/utils.py b/awx/main/utils.py
index 13ece206a9..00937a84c1 100644
--- a/awx/main/utils.py
+++ b/awx/main/utils.py
@@ -17,7 +17,6 @@ import urlparse
 import threading
 import contextlib
 import tempfile
-from copy import copy
 
 # Decorator
 from decorator import decorator
@@ -825,64 +824,3 @@ class OutputEventFilter(object):
             self._current_event_data = next_event_data
         else:
             self._current_event_data = None
-
-def format_for_log(raw_data, kind=None, **kwargs):
-    '''
-    Process dictionaries from various contexts (job events, activity stream
-    changes, etc.) to give meaningful information
-    Output a dictionary which will be passed in logstash or syslog format
-    to the logging receiver
-    '''
-    rename_fields = set((
-        'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
-        'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
-        'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
-        'processName', 'relativeCreated', 'thread', 'threadName', 'extra',
-        'auth_token', 'tags', 'host', 'host_id', 'level', 'port', 'uuid'))
-    data = copy(raw_data)
-    if isinstance(data, basestring):
-        data = json.loads(data)
-    data.update(kwargs)
-    skip_fields = ('res', 'password', 'event_data', 'stdout')
-    data_for_log = {}
-
-    def index_by_name(alist):
-        """Takes a list of dictionaries with `name` as a key in each dict
-        and returns a dictionary indexed by those names"""
-        adict = {}
-        for item in alist:
-            subdict = copy(item)
-            name = subdict.pop('name', None)
-            if name:
-                # Logstash v2 can not accept '.' in a name
-                name = name.replace('.', '_')
-            adict[name] = subdict
-        return adict
-
-    if kind == 'event':
-        data.update(data.get('event_data', {}))
-        for fd in data:
-            if fd in skip_fields:
-                continue
-            key = fd
-            if fd in rename_fields:
-                key = 'event_%s' % fd
-            if type(data[fd]) is dict:
-                data_for_log[key] = len(data[fd])
-            else:
-                data_for_log[key] = data[fd]
-    elif kind == 'fact':
-        if 'services' in data:
-            data_for_log['services'] = index_by_name(data['services'])
-        elif 'packages' in data:
-            data_for_log['packages'] = index_by_name(data['packages'])
-        elif 'files' in data:
-            data_for_log['files'] = index_by_name(data['files'])
-        elif 'ansible' in data:
-            data_for_log['ansible'] = data['ansible']
-            # Remove sub-keys with data type conflicts in elastic search
-            data_for_log['ansible'].pop('ansible_python_version', None)
-            data_for_log['ansible']['ansible_python'].pop('version_info', None)
-    else:
-        data_for_log['facts'] = data
-    return data_for_log
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 89afcc0f26..1be3f2ca89 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -842,19 +842,9 @@ LOGGING = {
         'simple': {
             'format': '%(asctime)s %(levelname)-8s %(name)s %(message)s',
         },
-        'logstash': {
-            '()': 'awx.main.log_utils.formatters.LogstashFormatter'
-        },
         'json': {
             '()': 'awx.main.log_utils.formatters.LogstashFormatter'
         }
-        # From loggly examples
-        # 'json': {
-        #     'format': '{ "loggerName":"%(name)s", "asciTime":"%(asctime)s", "fileName":"%(filename)s", "logRecordCreationTime":"%(created)f", "functionName":"%(funcName)s", "levelNo":"%(levelno)s", "lineNo":"%(lineno)d", "time":"%(msecs)d", "levelName":"%(levelname)s", "message":"%(message)s"}',
-        # },
-        # 'json': {
-        #     'format': '{"message": %(message)s}',
-        # },
     },
     'handlers': {
         'console': {
@@ -882,20 +872,6 @@ LOGGING = {
             'formatter': 'json',
             'host': '',
         },
-        'logstash': {
-            'level': 'INFO',
-            'class': 'awx.main.log_utils.handlers.HTTPSHandler',
-            'host': 'logstash',  # IP/name of our Logstash EC2 instance
-            'port': 8085,
-            # 'port': 5000,
-            # 'version': 1,
-            'message_type': 'logstash',
-            'fqdn': True,
-            # 'tags': ['tower'],
-            'formatter': 'json',
-            'username': 'awx_logger',
-            'password': 'workflows',
-        },
         'mail_admins': {
            'level': 'ERROR',
            'filters': ['require_debug_false'],
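Editor's note, not part of the patch: the snippet below is a minimal sketch of how the analytics loggers touched above are meant to be driven. The logger name and the extra keys mirror run_fact_cache_receiver.py and formatters.py from this diff; the package data is invented for illustration, and LOG_AGGREGATOR_LOGGERS must include 'system_tracking' for HTTPSHandler.emit() to forward the record at all.

    import logging

    # Mirrors the call added in run_fact_cache_receiver.py; the fact values are made up.
    analytics_logger = logging.getLogger('awx.analytics.system_tracking')
    analytics_logger.info('Received message with fact data', extra=dict(
        module_name='packages',
        facts_data=[{'name': 'openssl', 'version': '1.0.2k'},
                    {'name': 'python', 'version': '2.7.12'}]))

    # LogstashFormatter.reformat_data_for_log() indexes the list by each item's
    # 'name'/'path' key; with LOG_AGGREGATOR_INDIVIDUAL_FACTS enabled,
    # HTTPSHandler.emit() then POSTs one message per package instead of a
    # single combined dictionary.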