From a448cb17d985f6f94bc1daed04fc1da29da07a24 Mon Sep 17 00:00:00 2001
From: Jeff Bradberry
Date: Fri, 5 Mar 2021 15:18:49 -0500
Subject: [PATCH] Refactor analytics.gather

With the change to use pk-based interval slicing for the job events
table, we need analytics.gather to be the code that manages all of the
"expensive" collector slicing.  While we are at it, let's ship each
chunked tarball file as we produce it.
---
 awx/main/analytics/__init__.py                |   2 +-
 awx/main/analytics/collectors.py              |  41 ++-
 awx/main/analytics/core.py                    | 281 +++++++++---------
 awx/main/conf.py                              |   8 +
 .../management/commands/gather_analytics.py   |   8 +-
 awx/main/tasks.py                             |  62 +---
 awx/settings/defaults.py                      |   2 +
 7 files changed, 193 insertions(+), 211 deletions(-)

diff --git a/awx/main/analytics/__init__.py b/awx/main/analytics/__init__.py
index 6aee1cef91..4302dc9cf2 100644
--- a/awx/main/analytics/__init__.py
+++ b/awx/main/analytics/__init__.py
@@ -1 +1 @@
-from .core import all_collectors, expensive_collectors, register, gather, ship  # noqa
+from .core import all_collectors, register, gather, ship  # noqa
diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index abdeb88b6c..2244d65ac5 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -5,9 +5,9 @@ import platform
 import distro
 
 from django.db import connection
-from django.db.models import Count
+from django.db.models import Count, Max
 from django.conf import settings
-from django.utils.timezone import now
+from django.utils.timezone import now, timedelta
 from django.utils.translation import ugettext_lazy as _
 
 from awx.conf.license import get_license
@@ -33,6 +33,14 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
 '''
 
 
+def four_hour_slicing(key, since, until):
+    start, end = since, None
+    while start < until:
+        end = min(start + timedelta(hours=4), until)
+        yield (start, end)
+        start = end
+
+
 @register('config', '1.3', description=_('General platform configuration.'))
 def config(since, **kwargs):
     license_info = get_license()
@@ -270,7 +278,7 @@ class FileSplitter(io.StringIO):
 
     def write(self, s):
         if not self.header:
-            self.header = s[0 : s.index('\n')]
+            self.header = s[: s.index('\n')]
         self.counter += self.currentfile.write(s)
         if self.counter >= MAX_TABLE_SIZE:
             self.cycle_file()
@@ -284,7 +292,20 @@ def _copy_table(table, query, path):
     return file.file_list()
 
 
-@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=True)
+def events_slicing(key, since, until):
+    from awx.conf.models import Setting
+
+    step = 100000
+    last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
+    last_entries = last_entries.value if last_entries is not None else {}
+    previous_pk = last_entries.get(key) or 0
+    final_pk = models.JobEvent.objects.filter(created__lte=until).aggregate(Max('pk'))['pk__max']
+
+    for start in range(previous_pk, final_pk + 1, step):
+        yield (start, min(start + step, final_pk))
+
+
+@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing)
 def events_table(since, full_path, until, **kwargs):
     events_query = '''COPY (SELECT main_jobevent.id,
                                    main_jobevent.created,
                                    main_jobevent.modified,
                                    main_jobevent.uuid,
                                    main_jobevent.parent_uuid,
                                    main_jobevent.event,
                                    main_jobevent.task_action,
                                    main_jobevent.failed,
                                    main_jobevent.changed,
                                    main_jobevent.playbook,
                                    main_jobevent.play,
                                    main_jobevent.task,
                                    main_jobevent.role,
                                    main_jobevent.job_id,
                                    main_jobevent.host_id,
-                                   main_jobevent.host_name
-                                   , CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
+                                   main_jobevent.host_name,
+                                   CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
                                    CAST(main_jobevent.event_data::json->>'end' AS TIMESTAMP WITH TIME ZONE) AS end,
                                    main_jobevent.event_data::json->'duration' AS duration,
                                    main_jobevent.event_data::json->'res'->'warnings' AS warnings,
                                    main_jobevent.event_data::json->'res'->'deprecations' AS deprecations
                             FROM main_jobevent
-                            WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}')
+                            WHERE (main_jobevent.id > {} AND main_jobevent.id <= {})
                             ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER
                          '''.format(
-        since.isoformat(), until.isoformat()
+        since, until
     )
 
     return _copy_table(table='events', query=events_query, path=full_path)
 
 
-@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=True)
+@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)
 def unified_jobs_table(since, full_path, until, **kwargs):
     unified_job_query = '''COPY (SELECT main_unifiedjob.id,
                                    main_unifiedjob.polymorphic_ctype_id,
@@ -381,7 +402,7 @@ def unified_job_template_table(since, full_path, **kwargs):
     return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
 
 
-@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True)
+@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=four_hour_slicing)
 def workflow_job_node_table(since, full_path, until, **kwargs):
     workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
                                    main_workflowjobnode.created,
diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py
index 69b992a1c2..cb3deee215 100644
--- a/awx/main/analytics/core.py
+++ b/awx/main/analytics/core.py
@@ -1,20 +1,24 @@
 import inspect
+import io
 import json
 import logging
 import os
 import os.path
-import tempfile
+import pathlib
 import shutil
-import requests
+import tarfile
+import tempfile
 
 from django.conf import settings
 from django.utils.timezone import now, timedelta
 from rest_framework.exceptions import PermissionDenied
+import requests
 
 from awx.conf.license import get_license
 from awx.main.models import Job
 from awx.main.access import access_registry
 from awx.main.utils import get_awx_http_client_headers, set_environ
+from awx.main.utils.pglock import advisory_lock
 
 __all__ = ['register', 'gather', 'ship']
 
@@ -36,29 +40,18 @@ def _valid_license():
 
 def all_collectors():
     from awx.main.analytics import collectors
 
-    collector_dict = {}
-    module = collectors
-    for name, func in inspect.getmembers(module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
-            key = func.__awx_analytics_key__
-            desc = func.__awx_analytics_description__ or ''
-            version = func.__awx_analytics_version__
-            collector_dict[key] = {'name': key, 'version': version, 'description': desc}
-    return collector_dict
+    return {
+        func.__awx_analytics_key__: {
+            'name': func.__awx_analytics_key__,
+            'version': func.__awx_analytics_version__,
+            'description': func.__awx_analytics_description__ or '',
+        }
+        for name, func in inspect.getmembers(collectors)
+        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__')
+    }
 
 
-def expensive_collectors():
-    from awx.main.analytics import collectors
-
-    ret = []
-    module = collectors
-    for name, func in inspect.getmembers(module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__:
-            ret.append(func.__awx_analytics_key__)
-    return ret
-
-
-def register(key, version, description=None, format='json', expensive=False):
+def register(key, version, description=None, format='json', expensive=None):
     """
     A decorator used to register a function as a metric collector.
@@ -82,136 +75,154 @@ def register(key, version, description=None, format='json', expensive=None):
     return decorate
 
 
-def gather(dest=None, module=None, subset=None, since=None, until=now(), collection_type='scheduled'):
+def package(target, data, timestamp):
+    try:
+        tarname_base = f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}'
+        path = pathlib.Path(target)
+        index = len(list(path.glob(f'{tarname_base}-*.*')))
+        tarname = f'{tarname_base}-{index}.tar.gz'
+
+        manifest = {}
+        with tarfile.open(target.joinpath(tarname), 'w:gz') as f:
+            for name, (item, version) in data.items():
+                manifest[name] = version
+                if isinstance(item, str):
+                    info = f.gettarinfo(item, arcname=name)
+                    with open(item, 'rb') as datafile:
+                        f.addfile(info, fileobj=datafile)
+                else:
+                    info = tarfile.TarInfo(name)
+                    fileobj = io.BytesIO(json.dumps(item).encode('utf-8'))
+                    info.size = len(fileobj.getvalue())
+                    f.addfile(info, fileobj=fileobj)
+
+            info = tarfile.TarInfo('manifest.json')
+            fileobj = io.BytesIO(json.dumps(manifest).encode('utf-8'))
+            info.size = len(fileobj.getvalue())
+            f.addfile(info, fileobj=fileobj)
+
+        return f.name
+    except Exception:
+        logger.exception("Failed to write analytics archive file")
+        return None
+
+
+def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'):
     """
     Gather all defined metrics and write them as JSON files in a .tgz
 
-    :param dest:   the (optional) absolute path to write a compressed tarball
+    :param dest:    the (optional) absolute path to write a compressed tarball
     :param module:  the module to search for registered analytic collector
-                    functions; defaults to awx.main.analytics.collectors
+                     functions; defaults to awx.main.analytics.collectors
     """
 
-    def _write_manifest(destdir, manifest):
-        path = os.path.join(destdir, 'manifest.json')
-        with open(path, 'w', encoding='utf-8') as f:
-            try:
-                json.dump(manifest, f)
-            except Exception:
-                f.close()
-                os.remove(f.name)
-                logger.exception("Could not generate manifest.json")
-
-    last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4))
-    logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
-
-    if _valid_license() is False:
-        logger.exception("Invalid License provided, or No License Provided")
+    if not _valid_license():
+        logger.error("Invalid License provided, or No License Provided")
         return None
 
-    if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
-        logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
-        return None
+    if collection_type != 'dry-run':
+        if not settings.INSIGHTS_TRACKING_STATE:
+            logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
+            return None
 
-    collector_list = []
-    if module:
-        collector_module = module
-    else:
-        from awx.main.analytics import collectors
+        if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
+            logger.debug('Not gathering analytics, configuration is invalid')
+            return None
 
-        collector_module = collectors
-    for name, func in inspect.getmembers(collector_module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset):
-            collector_list.append((name, func))
+    with advisory_lock('gather_analytics_lock', wait=False) as acquired:
+        if not acquired:
+            logger.debug('Not gathering analytics, another task holds lock')
+            return None
 
-    manifest = dict()
-    dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
-    gather_dir = os.path.join(dest, 'stage')
-    os.mkdir(gather_dir, 0o700)
-    num_splits = 1
-    for name, func in collector_list:
-        if func.__awx_analytics_type__ == 'json':
+        from awx.conf.models import Setting
+        from awx.main.signals import disable_activity_stream
+
+        if until is None:
+            until = now()
+        last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (until - timedelta(weeks=4))
+        last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
+        last_entries = last_entries.value if last_entries is not None else {}
+        logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
+
+        if module:
+            collector_module = module
+        else:
+            from awx.main.analytics import collectors
+
+            collector_module = collectors
+        if subset:  # ensure that the config collector is added to any explicit subset of our builtins
+            subset = set(subset) | {
+                'config',
+            }
+
+        collector_list = [
+            func
+            for name, func in inspect.getmembers(collector_module)
+            if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset)
+        ]
+        json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json']
+        csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv']
+
+        dest = pathlib.Path(dest or tempfile.mkdtemp(prefix='awx_analytics'))
+        gather_dir = dest.joinpath('stage')
+        gather_dir.mkdir(mode=0o700)
+        tarfiles = []
+
+        # These json collectors are pretty compact, so collect all of them before shipping to analytics.
+        data = {}
+        for func in json_collectors:
             key = func.__awx_analytics_key__
-            path = '{}.json'.format(os.path.join(gather_dir, key))
-            with open(path, 'w', encoding='utf-8') as f:
-                try:
-                    json.dump(func(last_run, collection_type=collection_type, until=until), f)
-                    manifest['{}.json'.format(key)] = func.__awx_analytics_version__
-                except Exception:
-                    logger.exception("Could not generate metric {}.json".format(key))
-                    f.close()
-                    os.remove(f.name)
-        elif func.__awx_analytics_type__ == 'csv':
+            filename = f'{key}.json'
+            try:
+                data[filename] = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__)
+            except Exception:
+                logger.exception("Could not generate metric {}".format(filename))
+        if data:
+            tgzfile = package(dest.parent, data, until)
+            if tgzfile is not None:
+                tarfiles.append(tgzfile)
+
+                if collection_type != 'dry-run':
+                    ship(tgzfile)
+                    with disable_activity_stream():
+                        for filename in data:
+                            last_entries[filename.replace('.json', '')] = until
+                        settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries
+
+        for func in csv_collectors:
             key = func.__awx_analytics_key__
+            filename = f'{key}.csv'
             try:
-                files = func(last_run, full_path=gather_dir, until=until)
-                if files:
-                    manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
-                    if len(files) > num_splits:
-                        num_splits = len(files)
+                slices = [(last_run, until)]
+                if func.__awx_expensive__:
+                    slices = func.__awx_expensive__(key, last_run, until)  # it's ok if this returns a generator
+                for start, end in slices:
+                    files = func(start, full_path=gather_dir, until=end)
+
+                    if not files:
+                        continue
+                    for fpath in files:
+                        tgzfile = package(dest.parent, {filename: (fpath, func.__awx_analytics_version__)}, until)
+                        if tgzfile is not None:
+                            tarfiles.append(tgzfile)
+
+                            if collection_type != 'dry-run':
+                                ship(tgzfile)
+                                with disable_activity_stream():
+                                    last_entries[key] = end
+                                    settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries
             except Exception:
-                logger.exception("Could not generate metric {}.csv".format(key))
+                logger.exception("Could not generate metric {}".format(filename))
 
-    if not manifest:
-        # No data was collected
-        logger.warning("No data from {} to {}".format(last_run, until))
-        shutil.rmtree(dest)
-        return None
+        with disable_activity_stream():
+            settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
 
-    # Always include config.json if we're using our collectors
-    if 'config.json' not in manifest.keys() and not module:
-        from awx.main.analytics import collectors
+        shutil.rmtree(dest, ignore_errors=True)  # clean up individual artifact files
+        if not tarfiles:
+            # No data was collected
+            logger.warning("No data from {} to {}".format(last_run, until))
+            return None
 
-        config = collectors.config
-        path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__))
-        with open(path, 'w', encoding='utf-8') as f:
-            try:
-                json.dump(collectors.config(last_run), f)
-                manifest['config.json'] = config.__awx_analytics_version__
-            except Exception:
-                logger.exception("Could not generate metric {}.json".format(key))
-                f.close()
-                os.remove(f.name)
-                shutil.rmtree(dest)
-                return None
-
-    stage_dirs = [gather_dir]
-    if num_splits > 1:
-        for i in range(0, num_splits):
-            split_path = os.path.join(dest, 'split{}'.format(i))
-            os.mkdir(split_path, 0o700)
-            filtered_manifest = {}
-            shutil.copy(os.path.join(gather_dir, 'config.json'), split_path)
-            filtered_manifest['config.json'] = manifest['config.json']
-            suffix = '_split{}'.format(i)
-            for file in os.listdir(gather_dir):
-                if file.endswith(suffix):
-                    old_file = os.path.join(gather_dir, file)
-                    new_filename = file.replace(suffix, '')
-                    new_file = os.path.join(split_path, new_filename)
-                    shutil.move(old_file, new_file)
-                    filtered_manifest[new_filename] = manifest[new_filename]
-            _write_manifest(split_path, filtered_manifest)
-            stage_dirs.append(split_path)
-
-    for item in list(manifest.keys()):
-        if not os.path.exists(os.path.join(gather_dir, item)):
-            manifest.pop(item)
-    _write_manifest(gather_dir, manifest)
-
-    tarfiles = []
-    try:
-        for i in range(0, len(stage_dirs)):
-            stage_dir = stage_dirs[i]
-            # can't use isoformat() since it has colons, which GNU tar doesn't like
-            tarname = '_'.join([settings.SYSTEM_UUID, until.strftime('%Y-%m-%d-%H%M%S%z'), str(i)])
-            tgz = shutil.make_archive(os.path.join(os.path.dirname(dest), tarname), 'gztar', stage_dir)
-            tarfiles.append(tgz)
-    except Exception:
-        shutil.rmtree(stage_dir, ignore_errors=True)
-        logger.exception("Failed to write analytics archive file")
-    finally:
-        shutil.rmtree(dest, ignore_errors=True)
-    return tarfiles
+        return tarfiles
 
 
 def ship(path):
diff --git a/awx/main/conf.py b/awx/main/conf.py
index 5cfd2977f7..3699d3f685 100644
--- a/awx/main/conf.py
+++ b/awx/main/conf.py
@@ -10,6 +10,7 @@ from rest_framework.fields import FloatField
 
 # Tower
 from awx.conf import fields, register, register_validate
+from awx.main.fields import JSONField
 from awx.main.models import ExecutionEnvironment
 
 
@@ -778,6 +779,13 @@ register(
     category=_('System'),
     category_slug='system',
 )
+register(
+    'AUTOMATION_ANALYTICS_LAST_ENTRIES',
+    field_class=JSONField,
+    # label=_('Last gathered entries for expensive Automation Analytics collectors.'),
+    category=_('System'),
+    category_slug='system',
+)
 
 
 register(
diff --git a/awx/main/management/commands/gather_analytics.py b/awx/main/management/commands/gather_analytics.py
index 5099d4d0d1..25fffb4dec 100644
--- a/awx/main/management/commands/gather_analytics.py
+++ b/awx/main/management/commands/gather_analytics.py
@@ -1,6 +1,6 @@
 import logging
 
-from awx.main.analytics import gather, ship
+from awx.main import analytics
 from dateutil import parser
 from django.core.management.base import BaseCommand
 from django.utils.timezone import now
@@ -48,13 +48,9 @@ class Command(BaseCommand):
         if opt_ship and opt_dry_run:
             self.logger.error('Both --ship and --dry-run cannot be processed at the same time.')
             return
-        tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
+        tgzfiles = analytics.gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
         if tgzfiles:
             for tgz in tgzfiles:
                 self.logger.info(tgz)
         else:
             self.logger.error('No analytics collected')
-        if opt_ship:
-            if tgzfiles:
-                for tgz in tgzfiles:
-                    ship(tgz)
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 992620fbd8..dfd63569ec 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -57,7 +57,6 @@ from receptorctl.socket_interface import ReceptorControl
 from awx import __version__ as awx_application_version
 from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
 from awx.main.access import access_registry
-from awx.main.analytics import all_collectors, expensive_collectors
 from awx.main.redact import UriCleaner
 from awx.main.models import (
     Schedule,
@@ -377,70 +376,15 @@ def send_notifications(notification_list, job_id=None):
 
 @task(queue=get_local_queuename)
 def gather_analytics():
-    def _gather_and_ship(subset, since, until):
-        tgzfiles = []
-        try:
-            tgzfiles = analytics.gather(subset=subset, since=since, until=until)
-            # empty analytics without raising an exception is not an error
-            if not tgzfiles:
-                return True
-            logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
-            for tgz in tgzfiles:
-                analytics.ship(tgz)
-        except Exception:
-            logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
-            return False
-        finally:
-            if tgzfiles:
-                for tgz in tgzfiles:
-                    if os.path.exists(tgz):
-                        os.remove(tgz)
-        return True
-
     from awx.conf.models import Setting
     from rest_framework.fields import DateTimeField
-    from awx.main.signals import disable_activity_stream
 
-    if not settings.INSIGHTS_TRACKING_STATE:
-        return
-    if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
-        logger.debug('Not gathering analytics, configuration is invalid')
-        return
     last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first()
-    if last_gather:
-        last_time = DateTimeField().to_internal_value(last_gather.value)
-    else:
-        last_time = None
+    last_time = DateTimeField().to_internal_value(last_gather.value) if last_gather else None
     gather_time = now()
+
     if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
-        with advisory_lock('gather_analytics_lock', wait=False) as acquired:
-            if acquired is False:
-                logger.debug('Not gathering analytics, another task holds lock')
-                return
-            subset = list(all_collectors().keys())
-            incremental_collectors = []
-            for collector in expensive_collectors():
-                if collector in subset:
-                    subset.remove(collector)
-                    incremental_collectors.append(collector)
-
-            # Cap gathering at 4 weeks of data if there has been no data gathering
-            since = last_time or (gather_time - timedelta(weeks=4))
-
-            if incremental_collectors:
-                start = since
-                until = None
-                while start < gather_time:
-                    until = start + timedelta(hours=4)
-                    if until > gather_time:
-                        until = gather_time
-                    if not _gather_and_ship(incremental_collectors, since=start, until=until):
-                        break
-                    start = until
-                    with disable_activity_stream():
-                        settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
-            if subset:
-                _gather_and_ship(subset, since=since, until=gather_time)
+        analytics.gather()
 
 
 @task(queue=get_local_queuename)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index b6a3966647..1a91d392c7 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -620,6 +620,8 @@ INSIGHTS_TRACKING_STATE = False
 
 # Last gather date for Analytics
 AUTOMATION_ANALYTICS_LAST_GATHER = None
+# Last gathered entries for expensive Analytics
+AUTOMATION_ANALYTICS_LAST_ENTRIES = None
 
 # Default list of modules allowed for ad hoc commands.
 # Note: This setting may be overridden by database settings.
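
A minimal sketch (not part of the patch) of how the slicing hook introduced above is meant to be used: the callable passed to expensive= yields (start, end) slices, and analytics.gather() now invokes the collector once per slice, packaging each chunk into its own tarball as it goes. The collector name my_table below is hypothetical; register, four_hour_slicing and analytics.gather are the helpers from this patch.

    from django.utils.timezone import now, timedelta

    from awx.main import analytics
    from awx.main.analytics import register
    from awx.main.analytics.collectors import four_hour_slicing


    # Hypothetical collector: gather() calls this once per (since, until) slice
    # yielded by four_hour_slicing(key, since, until).
    @register('my_table', '1.0', format='csv', description='example sliced collector', expensive=four_hour_slicing)
    def my_table(since, full_path, until, **kwargs):
        # Write CSV file(s) covering [since, until) under full_path and return their paths.
        return []


    # Local dry run over the last day: each chunk is packaged into its own tarball
    # and the paths are returned; nothing is shipped to Automation Analytics.
    tarballs = analytics.gather(since=now() - timedelta(days=1), collection_type='dry-run')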