From a448cb17d985f6f94bc1daed04fc1da29da07a24 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 5 Mar 2021 15:18:49 -0500 Subject: [PATCH 01/15] Refactor analytics.gather With the change to use pk-based interval slicing for the job events table, we need analytics.gather to be the code that manages all of the "expensive" collector slicing. While we are at it, let's ship each chunked tarball file as we produce it. --- awx/main/analytics/__init__.py | 2 +- awx/main/analytics/collectors.py | 41 ++- awx/main/analytics/core.py | 281 +++++++++--------- awx/main/conf.py | 8 + .../management/commands/gather_analytics.py | 8 +- awx/main/tasks.py | 62 +--- awx/settings/defaults.py | 2 + 7 files changed, 193 insertions(+), 211 deletions(-) diff --git a/awx/main/analytics/__init__.py b/awx/main/analytics/__init__.py index 6aee1cef91..4302dc9cf2 100644 --- a/awx/main/analytics/__init__.py +++ b/awx/main/analytics/__init__.py @@ -1 +1 @@ -from .core import all_collectors, expensive_collectors, register, gather, ship # noqa +from .core import all_collectors, register, gather, ship # noqa diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index abdeb88b6c..2244d65ac5 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -5,9 +5,9 @@ import platform import distro from django.db import connection -from django.db.models import Count +from django.db.models import Count, Max from django.conf import settings -from django.utils.timezone import now +from django.utils.timezone import now, timedelta from django.utils.translation import ugettext_lazy as _ from awx.conf.license import get_license @@ -33,6 +33,14 @@ data _since_ the last report date - i.e., new data in the last 24 hours) ''' +def four_hour_slicing(key, since, until): + start, end = since, None + while start < until: + end = min(start + timedelta(hours=4), until) + yield (start, end) + start = end + + @register('config', '1.3', description=_('General platform configuration.')) def config(since, **kwargs): license_info = get_license() @@ -270,7 +278,7 @@ class FileSplitter(io.StringIO): def write(self, s): if not self.header: - self.header = s[0 : s.index('\n')] + self.header = s[: s.index('\n')] self.counter += self.currentfile.write(s) if self.counter >= MAX_TABLE_SIZE: self.cycle_file() @@ -284,7 +292,20 @@ def _copy_table(table, query, path): return file.file_list() -@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=True) +def events_slicing(key, since, until): + from awx.conf.models import Setting + + step = 100000 + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = last_entries.value if last_entries is not None else {} + previous_pk = last_entries.get(key) or 0 + final_pk = models.JobEvent.objects.filter(created_lte=until).aggregate(Max('pk'))['pk__max'] + + for start in range(previous_pk, final_pk + 1, step): + yield (start, min(start + step, final_pk)) + + +@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing) def events_table(since, full_path, until, **kwargs): events_query = '''COPY (SELECT main_jobevent.id, main_jobevent.created, @@ -302,22 +323,22 @@ def events_table(since, full_path, until, **kwargs): main_jobevent.role, main_jobevent.job_id, main_jobevent.host_id, - main_jobevent.host_name - , CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start, + main_jobevent.host_name, + CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start, CAST(main_jobevent.event_data::json->>'end' AS TIMESTAMP WITH TIME ZONE) AS end, main_jobevent.event_data::json->'duration' AS duration, main_jobevent.event_data::json->'res'->'warnings' AS warnings, main_jobevent.event_data::json->'res'->'deprecations' AS deprecations FROM main_jobevent - WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}') + WHERE (main_jobevent.id > {} AND main_jobevent.id <= {}) ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER '''.format( - since.isoformat(), until.isoformat() + since, until ) return _copy_table(table='events', query=events_query, path=full_path) -@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=True) +@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing) def unified_jobs_table(since, full_path, until, **kwargs): unified_job_query = '''COPY (SELECT main_unifiedjob.id, main_unifiedjob.polymorphic_ctype_id, @@ -381,7 +402,7 @@ def unified_job_template_table(since, full_path, **kwargs): return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) -@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True) +@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=four_hour_slicing) def workflow_job_node_table(since, full_path, until, **kwargs): workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, main_workflowjobnode.created, diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 69b992a1c2..cb3deee215 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -1,20 +1,24 @@ import inspect +import io import json import logging import os import os.path -import tempfile +import pathlib import shutil -import requests +import tarfile +import tempfile from django.conf import settings from django.utils.timezone import now, timedelta from rest_framework.exceptions import PermissionDenied +import requests from awx.conf.license import get_license from awx.main.models import Job from awx.main.access import access_registry from awx.main.utils import get_awx_http_client_headers, set_environ +from awx.main.utils.pglock import advisory_lock __all__ = ['register', 'gather', 'ship'] @@ -36,29 +40,18 @@ def _valid_license(): def all_collectors(): from awx.main.analytics import collectors - collector_dict = {} - module = collectors - for name, func in inspect.getmembers(module): - if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'): - key = func.__awx_analytics_key__ - desc = func.__awx_analytics_description__ or '' - version = func.__awx_analytics_version__ - collector_dict[key] = {'name': key, 'version': version, 'description': desc} - return collector_dict + return { + func.__awx_analytics_key__: { + 'name': func.__awx_analytics_key__, + 'version': func.__awx_analytics_version__, + 'description': func.__awx_analytics_description__ or '', + } + for name, func in inspect.getmembers(collectors) + if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') + } -def expensive_collectors(): - from awx.main.analytics import collectors - - ret = [] - module = collectors - for name, func in inspect.getmembers(module): - if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__: - ret.append(func.__awx_analytics_key__) - return ret - - -def register(key, version, description=None, format='json', expensive=False): +def register(key, version, description=None, format='json', expensive=None): """ A decorator used to register a function as a metric collector. @@ -82,136 +75,154 @@ def register(key, version, description=None, format='json', expensive=False): return decorate -def gather(dest=None, module=None, subset=None, since=None, until=now(), collection_type='scheduled'): +def package(target, data, timestamp): + try: + tarname_base = f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}' + path = pathlib.Path(target) + index = len(list(path.glob(f'{tarname_base}-*.*'))) + tarname = f'{tarname_base}-{index}.tar.gz' + + manifest = {} + with tarfile.open(target.joinpath(tarname), 'w:gz') as f: + for name, (item, version) in data.items(): + manifest[name] = version + if isinstance(item, str): + info = f.gettarinfo(item, arcname=name) + f.addfile(info) + else: + info = tarfile.TarInfo(name) + fileobj = io.BytesIO(json.dumps(item).encode('utf-8')) + fileobj.size = len(fileobj.getvalue()) + f.addfile(info, fileobj=fileobj) + + info = tarfile.TarInfo('manifest.json') + fileobj = io.BytesIO(json.dumps(manifest).encode('utf-8')) + fileobj.size = len(fileobj.getvalue()) + f.addfile(info, fileobj=fileobj) + + return f.name + except Exception: + logger.exception("Failed to write analytics archive file") + return None + + +def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'): """ Gather all defined metrics and write them as JSON files in a .tgz - :param dest: the (optional) absolute path to write a compressed tarball + :param dest: the (optional) absolute path to write a compressed tarball :param module: the module to search for registered analytic collector - functions; defaults to awx.main.analytics.collectors + functions; defaults to awx.main.analytics.collectors """ - def _write_manifest(destdir, manifest): - path = os.path.join(destdir, 'manifest.json') - with open(path, 'w', encoding='utf-8') as f: - try: - json.dump(manifest, f) - except Exception: - f.close() - os.remove(f.name) - logger.exception("Could not generate manifest.json") - - last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4)) - logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) - - if _valid_license() is False: - logger.exception("Invalid License provided, or No License Provided") + if not _valid_license(): + logger.error("Invalid License provided, or No License Provided") return None - if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE: - logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.") - return None + if collection_type != 'dry-run': + if not settings.INSIGHTS_TRACKING_STATE: + logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.") + return None - collector_list = [] - if module: - collector_module = module - else: - from awx.main.analytics import collectors + if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD): + logger.debug('Not gathering analytics, configuration is invalid') + return None - collector_module = collectors - for name, func in inspect.getmembers(collector_module): - if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset): - collector_list.append((name, func)) + with advisory_lock('gather_analytics_lock', wait=False) as acquired: + if not acquired: + logger.debug('Not gathering analytics, another task holds lock') + return None - manifest = dict() - dest = dest or tempfile.mkdtemp(prefix='awx_analytics') - gather_dir = os.path.join(dest, 'stage') - os.mkdir(gather_dir, 0o700) - num_splits = 1 - for name, func in collector_list: - if func.__awx_analytics_type__ == 'json': + from awx.conf.models import Setting + from awx.main.signals import disable_activity_stream + + if until is None: + until = now() + last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (until - timedelta(weeks=4)) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = last_entries.value if last_entries is not None else {} + logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) + + if module: + collector_module = module + else: + from awx.main.analytics import collectors + + collector_module = collectors + if subset: # ensure that the config collector is added to any explicit subset of our builtins + subset = set(subset) | { + 'config', + } + + collector_list = [ + func + for name, func in inspect.getmembers(collector_module) + if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset) + ] + json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json'] + csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv'] + + dest = pathlib.Path(dest or tempfile.mkdtemp(prefix='awx_analytics')) + gather_dir = dest.joinpath('stage') + gather_dir.mkdir(mode=0o700) + tarfiles = [] + + # These json collectors are pretty compact, so collect all of them before shipping to analytics. + data = {} + for func in json_collectors: key = func.__awx_analytics_key__ - path = '{}.json'.format(os.path.join(gather_dir, key)) - with open(path, 'w', encoding='utf-8') as f: - try: - json.dump(func(last_run, collection_type=collection_type, until=until), f) - manifest['{}.json'.format(key)] = func.__awx_analytics_version__ - except Exception: - logger.exception("Could not generate metric {}.json".format(key)) - f.close() - os.remove(f.name) - elif func.__awx_analytics_type__ == 'csv': + filename = f'{key}.json' + try: + data[filename] = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__) + except Exception: + logger.exception("Could not generate metric {}".format(filename)) + if data: + tgzfile = package(dest.parent, data, until) + if tgzfile is not None: + tarfiles.append(tgzfile) + + if collection_type != 'dry-run': + ship(tgzfile) + with disable_activity_stream(): + for filename in data: + last_entries[filename.replace('.json', '')] = until + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries + + for func in csv_collectors: key = func.__awx_analytics_key__ + filename = f'{key}.csv' try: - files = func(last_run, full_path=gather_dir, until=until) - if files: - manifest['{}.csv'.format(key)] = func.__awx_analytics_version__ - if len(files) > num_splits: - num_splits = len(files) + slices = [(last_run, until)] + if func.__awx_expensive__: + slices = func.__awx_expensive__(key, last_run, until) # it's ok if this returns a generator + for start, end in slices: + files = func(start, full_path=gather_dir, until=end) + + if not files: + continue + for fpath in files: + tgzfile = package(dest.parent, {filename: (fpath, func.__awx_analytics_version__)}, until) + if tgzfile is not None: + tarfiles.append(tgzfile) + + if collection_type != 'dry-run': + ship(tgzfile) + with disable_activity_stream(): + last_entries[key] = end + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries except Exception: - logger.exception("Could not generate metric {}.csv".format(key)) + logger.exception("Could not generate metric {}".format(filename)) - if not manifest: - # No data was collected - logger.warning("No data from {} to {}".format(last_run, until)) - shutil.rmtree(dest) - return None + with disable_activity_stream(): + settings.AUTOMATION_ANALYTICS_LAST_GATHER = until - # Always include config.json if we're using our collectors - if 'config.json' not in manifest.keys() and not module: - from awx.main.analytics import collectors + shutil.rmtree(dest, ignore_errors=True) # clean up individual artifact files + if not tarfiles: + # No data was collected + logger.warning("No data from {} to {}".format(last_run, until)) + return None - config = collectors.config - path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__)) - with open(path, 'w', encoding='utf-8') as f: - try: - json.dump(collectors.config(last_run), f) - manifest['config.json'] = config.__awx_analytics_version__ - except Exception: - logger.exception("Could not generate metric {}.json".format(key)) - f.close() - os.remove(f.name) - shutil.rmtree(dest) - return None - - stage_dirs = [gather_dir] - if num_splits > 1: - for i in range(0, num_splits): - split_path = os.path.join(dest, 'split{}'.format(i)) - os.mkdir(split_path, 0o700) - filtered_manifest = {} - shutil.copy(os.path.join(gather_dir, 'config.json'), split_path) - filtered_manifest['config.json'] = manifest['config.json'] - suffix = '_split{}'.format(i) - for file in os.listdir(gather_dir): - if file.endswith(suffix): - old_file = os.path.join(gather_dir, file) - new_filename = file.replace(suffix, '') - new_file = os.path.join(split_path, new_filename) - shutil.move(old_file, new_file) - filtered_manifest[new_filename] = manifest[new_filename] - _write_manifest(split_path, filtered_manifest) - stage_dirs.append(split_path) - - for item in list(manifest.keys()): - if not os.path.exists(os.path.join(gather_dir, item)): - manifest.pop(item) - _write_manifest(gather_dir, manifest) - - tarfiles = [] - try: - for i in range(0, len(stage_dirs)): - stage_dir = stage_dirs[i] - # can't use isoformat() since it has colons, which GNU tar doesn't like - tarname = '_'.join([settings.SYSTEM_UUID, until.strftime('%Y-%m-%d-%H%M%S%z'), str(i)]) - tgz = shutil.make_archive(os.path.join(os.path.dirname(dest), tarname), 'gztar', stage_dir) - tarfiles.append(tgz) - except Exception: - shutil.rmtree(stage_dir, ignore_errors=True) - logger.exception("Failed to write analytics archive file") - finally: - shutil.rmtree(dest, ignore_errors=True) - return tarfiles + return tarfiles def ship(path): diff --git a/awx/main/conf.py b/awx/main/conf.py index 5cfd2977f7..3699d3f685 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -10,6 +10,7 @@ from rest_framework.fields import FloatField # Tower from awx.conf import fields, register, register_validate +from awx.main.fields import JSONField from awx.main.models import ExecutionEnvironment @@ -778,6 +779,13 @@ register( category=_('System'), category_slug='system', ) +register( + 'AUTOMATION_ANALYTICS_LAST_ENTRIES', + field_class=JSONField, + # label=_('Last gathered entries for expensive Automation Analytics collectors.'), + category=_('System'), + category_slug='system', +) register( diff --git a/awx/main/management/commands/gather_analytics.py b/awx/main/management/commands/gather_analytics.py index 5099d4d0d1..25fffb4dec 100644 --- a/awx/main/management/commands/gather_analytics.py +++ b/awx/main/management/commands/gather_analytics.py @@ -1,6 +1,6 @@ import logging -from awx.main.analytics import gather, ship +from awx.main import analytics from dateutil import parser from django.core.management.base import BaseCommand from django.utils.timezone import now @@ -48,13 +48,9 @@ class Command(BaseCommand): if opt_ship and opt_dry_run: self.logger.error('Both --ship and --dry-run cannot be processed at the same time.') return - tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until) + tgzfiles = analytics.gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until) if tgzfiles: for tgz in tgzfiles: self.logger.info(tgz) else: self.logger.error('No analytics collected') - if opt_ship: - if tgzfiles: - for tgz in tgzfiles: - ship(tgz) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 992620fbd8..dfd63569ec 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -57,7 +57,6 @@ from receptorctl.socket_interface import ReceptorControl from awx import __version__ as awx_application_version from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV from awx.main.access import access_registry -from awx.main.analytics import all_collectors, expensive_collectors from awx.main.redact import UriCleaner from awx.main.models import ( Schedule, @@ -377,70 +376,15 @@ def send_notifications(notification_list, job_id=None): @task(queue=get_local_queuename) def gather_analytics(): - def _gather_and_ship(subset, since, until): - tgzfiles = [] - try: - tgzfiles = analytics.gather(subset=subset, since=since, until=until) - # empty analytics without raising an exception is not an error - if not tgzfiles: - return True - logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles)) - for tgz in tgzfiles: - analytics.ship(tgz) - except Exception: - logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until)) - return False - finally: - if tgzfiles: - for tgz in tgzfiles: - if os.path.exists(tgz): - os.remove(tgz) - return True - from awx.conf.models import Setting from rest_framework.fields import DateTimeField - from awx.main.signals import disable_activity_stream - if not settings.INSIGHTS_TRACKING_STATE: - return - if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD): - logger.debug('Not gathering analytics, configuration is invalid') - return last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first() - if last_gather: - last_time = DateTimeField().to_internal_value(last_gather.value) - else: - last_time = None + last_time = DateTimeField().to_internal_value(last_gather.value) if last_gather else None gather_time = now() + if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL): - with advisory_lock('gather_analytics_lock', wait=False) as acquired: - if acquired is False: - logger.debug('Not gathering analytics, another task holds lock') - return - subset = list(all_collectors().keys()) - incremental_collectors = [] - for collector in expensive_collectors(): - if collector in subset: - subset.remove(collector) - incremental_collectors.append(collector) - - # Cap gathering at 4 weeks of data if there has been no data gathering - since = last_time or (gather_time - timedelta(weeks=4)) - - if incremental_collectors: - start = since - until = None - while start < gather_time: - until = start + timedelta(hours=4) - if until > gather_time: - until = gather_time - if not _gather_and_ship(incremental_collectors, since=start, until=until): - break - start = until - with disable_activity_stream(): - settings.AUTOMATION_ANALYTICS_LAST_GATHER = until - if subset: - _gather_and_ship(subset, since=since, until=gather_time) + analytics.gather() @task(queue=get_local_queuename) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index b6a3966647..1a91d392c7 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -620,6 +620,8 @@ INSIGHTS_TRACKING_STATE = False # Last gather date for Analytics AUTOMATION_ANALYTICS_LAST_GATHER = None +# Last gathered entries for expensive Analytics +AUTOMATION_ANALYTICS_LAST_ENTRIES = None # Default list of modules allowed for ad hoc commands. # Note: This setting may be overridden by database settings. From 3e4e255d3f30e4114680877cc30e49d6e0838c95 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 8 Mar 2021 11:27:23 -0500 Subject: [PATCH 02/15] Require config collector data for all posts to analytics We need the cluster ID when consuming all incoming data. --- awx/main/analytics/core.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index cb3deee215..938c110daf 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -134,6 +134,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti return None from awx.conf.models import Setting + from awx.main.analytics import collectors from awx.main.signals import disable_activity_stream if until is None: @@ -143,22 +144,14 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti last_entries = last_entries.value if last_entries is not None else {} logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) - if module: - collector_module = module - else: - from awx.main.analytics import collectors - - collector_module = collectors - if subset: # ensure that the config collector is added to any explicit subset of our builtins - subset = set(subset) | { - 'config', - } - + collector_module = module if module else collectors collector_list = [ func for name, func in inspect.getmembers(collector_module) if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset) ] + if collection_type != 'dry-run' and not any(c.__awx_analytics_key__ == 'config' for c in collector_list): + collector_list.append(collectors.config) json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json'] csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv'] @@ -182,6 +175,9 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti tarfiles.append(tgzfile) if collection_type != 'dry-run': + if data.get('config.json') is None: + logger.error("'config' collector data is missing, and is required to ship.") + return None ship(tgzfile) with disable_activity_stream(): for filename in data: @@ -201,7 +197,13 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if not files: continue for fpath in files: - tgzfile = package(dest.parent, {filename: (fpath, func.__awx_analytics_version__)}, until) + payload = {filename: (fpath, func.__awx_analytics_version__)} + if collection_type != 'dry-run': + payload['config.json'] = data.get('config.json') + if payload['config.json'] is None: + logger.error("'config' collector data is missing, and is required to ship.") + return None + tgzfile = package(dest.parent, payload, until) if tgzfile is not None: tarfiles.append(tgzfile) From 77f7e88e687990346e8765b2d427c1c50c18a005 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 12 Mar 2021 11:11:50 -0500 Subject: [PATCH 03/15] Address the problems with trying to use a JSONField --- awx/main/analytics/collectors.py | 11 +++++++---- awx/main/analytics/core.py | 8 +++++--- awx/main/conf.py | 7 ++++--- awx/settings/defaults.py | 2 +- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 2244d65ac5..2ca217c8d7 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -1,11 +1,12 @@ import io +import json import os import os.path import platform import distro from django.db import connection -from django.db.models import Count, Max +from django.db.models import Count, Max, Min from django.conf import settings from django.utils.timezone import now, timedelta from django.utils.translation import ugettext_lazy as _ @@ -295,11 +296,13 @@ def _copy_table(table, query, path): def events_slicing(key, since, until): from awx.conf.models import Setting + pk_values = models.JobEvent.objects.filter(created__gte=since, created__lte=until).aggregate(Min('pk'), Max('pk')) + step = 100000 last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = last_entries.value if last_entries is not None else {} - previous_pk = last_entries.get(key) or 0 - final_pk = models.JobEvent.objects.filter(created_lte=until).aggregate(Max('pk'))['pk__max'] + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + previous_pk = last_entries.get(key) or pk_values['pk__min'] or 0 + final_pk = pk_values['pk__max'] for start in range(previous_pk, final_pk + 1, step): yield (start, min(start + step, final_pk)) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 938c110daf..7abcd76b9f 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -141,7 +141,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti until = now() last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (until - timedelta(weeks=4)) last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = last_entries.value if last_entries is not None else {} + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) collector_module = module if module else collectors @@ -151,7 +151,9 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset) ] if collection_type != 'dry-run' and not any(c.__awx_analytics_key__ == 'config' for c in collector_list): + # In order to ship to analytics, we must include the output of the built-in 'config' collector. collector_list.append(collectors.config) + json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json'] csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv'] @@ -182,7 +184,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti with disable_activity_stream(): for filename in data: last_entries[filename.replace('.json', '')] = until - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries) for func in csv_collectors: key = func.__awx_analytics_key__ @@ -211,7 +213,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti ship(tgzfile) with disable_activity_stream(): last_entries[key] = end - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries) except Exception: logger.exception("Could not generate metric {}".format(filename)) diff --git a/awx/main/conf.py b/awx/main/conf.py index 3699d3f685..5c5df72218 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -10,7 +10,6 @@ from rest_framework.fields import FloatField # Tower from awx.conf import fields, register, register_validate -from awx.main.fields import JSONField from awx.main.models import ExecutionEnvironment @@ -781,8 +780,10 @@ register( ) register( 'AUTOMATION_ANALYTICS_LAST_ENTRIES', - field_class=JSONField, - # label=_('Last gathered entries for expensive Automation Analytics collectors.'), + field_class=fields.CharField, + label=_('Last gathered entries for expensive Automation Analytics collectors.'), + default='', + allow_blank=True, category=_('System'), category_slug='system', ) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 1a91d392c7..93ad306b1e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -621,7 +621,7 @@ INSIGHTS_TRACKING_STATE = False # Last gather date for Analytics AUTOMATION_ANALYTICS_LAST_GATHER = None # Last gathered entries for expensive Analytics -AUTOMATION_ANALYTICS_LAST_ENTRIES = None +AUTOMATION_ANALYTICS_LAST_ENTRIES = '' # Default list of modules allowed for ad hoc commands. # Note: This setting may be overridden by database settings. From 1bf37266b431f39904f41fc5e6aace253a3d31af Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 12 Mar 2021 11:50:03 -0500 Subject: [PATCH 04/15] Differentiate the log level depending on whether we are run from the task --- awx/main/analytics/core.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 7abcd76b9f..c551c3d992 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -114,23 +114,24 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti :param module: the module to search for registered analytic collector functions; defaults to awx.main.analytics.collectors """ + log_level = logging.ERROR if collection_type != 'scheduled' else logging.DEBUG if not _valid_license(): - logger.error("Invalid License provided, or No License Provided") + logger.log(log_level, "Invalid License provided, or No License Provided") return None if collection_type != 'dry-run': if not settings.INSIGHTS_TRACKING_STATE: - logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.") + logger.log(log_level, "Automation Analytics not enabled. Use --dry-run to gather locally without sending.") return None if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD): - logger.debug('Not gathering analytics, configuration is invalid') + logger.log(log_level, "Not gathering analytics, configuration is invalid") return None with advisory_lock('gather_analytics_lock', wait=False) as acquired: if not acquired: - logger.debug('Not gathering analytics, another task holds lock') + logger.log(log_level, "Not gathering analytics, another task holds lock") return None from awx.conf.models import Setting From 9cde10c93aadb3f26379ae55773473fe19fc305f Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 12 Mar 2021 13:52:42 -0500 Subject: [PATCH 05/15] Fix problems with the package() function --- awx/main/analytics/core.py | 35 ++++++++++++------- .../tests/functional/analytics/test_core.py | 2 +- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index c551c3d992..fb979ded10 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -85,20 +85,29 @@ def package(target, data, timestamp): manifest = {} with tarfile.open(target.joinpath(tarname), 'w:gz') as f: for name, (item, version) in data.items(): - manifest[name] = version - if isinstance(item, str): - info = f.gettarinfo(item, arcname=name) - f.addfile(info) - else: - info = tarfile.TarInfo(name) - fileobj = io.BytesIO(json.dumps(item).encode('utf-8')) - fileobj.size = len(fileobj.getvalue()) - f.addfile(info, fileobj=fileobj) + try: + if isinstance(item, str): + info = f.gettarinfo(item, arcname=f'./{name}') + f.addfile(info) + else: + fileobj = io.BytesIO(json.dumps(item).encode('utf-8')) + info = tarfile.TarInfo(f'./{name}') + info.size = len(fileobj.getvalue()) + info.mtime = timestamp.timestamp() + f.addfile(info, fileobj=fileobj) + manifest[name] = version + except Exception: + logger.exception(f"Could not generate metric {name}") - info = tarfile.TarInfo('manifest.json') - fileobj = io.BytesIO(json.dumps(manifest).encode('utf-8')) - fileobj.size = len(fileobj.getvalue()) - f.addfile(info, fileobj=fileobj) + try: + fileobj = io.BytesIO(json.dumps(manifest).encode('utf-8')) + info = tarfile.TarInfo('./manifest.json') + info.size = len(fileobj.getvalue()) + info.mtime = timestamp.timestamp() + f.addfile(info, fileobj=fileobj) + except Exception: + logger.exception("Could not generate manifest.json") + return None return f.name except Exception: diff --git a/awx/main/tests/functional/analytics/test_core.py b/awx/main/tests/functional/analytics/test_core.py index dbb819a87f..e37f30d26b 100644 --- a/awx/main/tests/functional/analytics/test_core.py +++ b/awx/main/tests/functional/analytics/test_core.py @@ -39,7 +39,7 @@ def mock_valid_license(): def test_gather(mock_valid_license): settings.INSIGHTS_TRACKING_STATE = True - tgzfiles = gather(module=importlib.import_module(__name__)) + tgzfiles = gather(module=importlib.import_module(__name__), collection_type='dry-run') files = {} with tarfile.open(tgzfiles[0], "r:gz") as archive: for member in archive.getmembers(): From 772da61980eb7e314bc1ea79f95d18dcfa506560 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Fri, 12 Mar 2021 15:39:20 -0500 Subject: [PATCH 06/15] If a csv collector is successful but results in no files, increment anyway --- awx/main/analytics/core.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index fb979ded10..ff878cf624 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -207,6 +207,10 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti files = func(start, full_path=gather_dir, until=end) if not files: + if collection_type != 'dry-run': + with disable_activity_stream(): + last_entries[key] = end + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries) continue for fpath in files: payload = {filename: (fpath, func.__awx_analytics_version__)} From 0b31e771b17da18b5b92d4f10b81c893bbbefb04 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Wed, 17 Mar 2021 14:01:32 -0400 Subject: [PATCH 07/15] Fix the gather_analytics management command Previously, invoking the command with neither of the --ship or --dry-run flags would result in effectively doing a dry run. With the stricter checks now in place in analytics.core.gather, let's make sure that we pass the 'dry-run' parameter in to gather() in the no-flags case. --- awx/main/analytics/core.py | 2 +- awx/main/management/commands/gather_analytics.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index ff878cf624..4077730408 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -135,7 +135,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti return None if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD): - logger.log(log_level, "Not gathering analytics, configuration is invalid") + logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.") return None with advisory_lock('gather_analytics_lock', wait=False) as acquired: diff --git a/awx/main/management/commands/gather_analytics.py b/awx/main/management/commands/gather_analytics.py index 25fffb4dec..cb5506103e 100644 --- a/awx/main/management/commands/gather_analytics.py +++ b/awx/main/management/commands/gather_analytics.py @@ -48,7 +48,8 @@ class Command(BaseCommand): if opt_ship and opt_dry_run: self.logger.error('Both --ship and --dry-run cannot be processed at the same time.') return - tgzfiles = analytics.gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until) + tgzfiles = analytics.gather(collection_type='manual' if opt_ship else 'dry-run', + since=since, until=until) if tgzfiles: for tgz in tgzfiles: self.logger.info(tgz) From 3568558571873b9f53067ec841b2a16c7fbcb117 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Wed, 17 Mar 2021 16:46:12 -0400 Subject: [PATCH 08/15] A couple of bug fixes --- awx/main/analytics/collectors.py | 7 +++++-- awx/main/analytics/core.py | 16 ++++++++-------- awx/main/management/commands/gather_analytics.py | 3 +-- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 2ca217c8d7..247613639d 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -274,7 +274,10 @@ class FileSplitter(io.StringIO): self.files = self.files[:-1] # If we only have one file, remove the suffix if len(self.files) == 1: - os.rename(self.files[0], self.files[0].replace('_split0', '')) + filename = self.files.pop() + new_filename = filename.replace('_split0', '') + os.rename(filename, new_filename) + self.files.append(new_filename) return self.files def write(self, s): @@ -302,7 +305,7 @@ def events_slicing(key, since, until): last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') previous_pk = last_entries.get(key) or pk_values['pk__min'] or 0 - final_pk = pk_values['pk__max'] + final_pk = pk_values['pk__max'] or 0 for start in range(previous_pk, final_pk + 1, step): yield (start, min(start + step, final_pk)) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 4077730408..2572321e2b 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -87,24 +87,24 @@ def package(target, data, timestamp): for name, (item, version) in data.items(): try: if isinstance(item, str): - info = f.gettarinfo(item, arcname=f'./{name}') - f.addfile(info) + f.add(item, arcname=f'./{name}') else: - fileobj = io.BytesIO(json.dumps(item).encode('utf-8')) + buf = json.dumps(item).encode('utf-8') info = tarfile.TarInfo(f'./{name}') - info.size = len(fileobj.getvalue()) + info.size = len(buf) info.mtime = timestamp.timestamp() - f.addfile(info, fileobj=fileobj) + f.addfile(info, fileobj=io.BytesIO(buf)) manifest[name] = version except Exception: logger.exception(f"Could not generate metric {name}") + return None try: - fileobj = io.BytesIO(json.dumps(manifest).encode('utf-8')) + buf = json.dumps(manifest).encode('utf-8') info = tarfile.TarInfo('./manifest.json') - info.size = len(fileobj.getvalue()) + info.size = len(buf) info.mtime = timestamp.timestamp() - f.addfile(info, fileobj=fileobj) + f.addfile(info, fileobj=io.BytesIO(buf)) except Exception: logger.exception("Could not generate manifest.json") return None diff --git a/awx/main/management/commands/gather_analytics.py b/awx/main/management/commands/gather_analytics.py index cb5506103e..60d1524fc7 100644 --- a/awx/main/management/commands/gather_analytics.py +++ b/awx/main/management/commands/gather_analytics.py @@ -48,8 +48,7 @@ class Command(BaseCommand): if opt_ship and opt_dry_run: self.logger.error('Both --ship and --dry-run cannot be processed at the same time.') return - tgzfiles = analytics.gather(collection_type='manual' if opt_ship else 'dry-run', - since=since, until=until) + tgzfiles = analytics.gather(collection_type='manual' if opt_ship else 'dry-run', since=since, until=until) if tgzfiles: for tgz in tgzfiles: self.logger.info(tgz) From 99daa4319e99b7cfc0bf58cd55b93a79a3d42536 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Thu, 18 Mar 2021 09:22:45 -0400 Subject: [PATCH 09/15] Require the config.json file to be in dry-run tarballs --- awx/main/analytics/core.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 2572321e2b..7c0ae49842 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -160,7 +160,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti for name, func in inspect.getmembers(collector_module) if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset) ] - if collection_type != 'dry-run' and not any(c.__awx_analytics_key__ == 'config' for c in collector_list): + if not any(c.__awx_analytics_key__ == 'config' for c in collector_list): # In order to ship to analytics, we must include the output of the built-in 'config' collector. collector_list.append(collectors.config) @@ -178,18 +178,21 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti key = func.__awx_analytics_key__ filename = f'{key}.json' try: - data[filename] = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__) + results = (func(last_run, collection_type=collection_type, until=until), + func.__awx_analytics_version__) + json.dumps(results) # throwaway check to see if the data is json-serializable + data[filename] = results except Exception: logger.exception("Could not generate metric {}".format(filename)) if data: + if data.get('config.json') is None: + logger.error("'config' collector data is missing.") + return None + tgzfile = package(dest.parent, data, until) if tgzfile is not None: tarfiles.append(tgzfile) - if collection_type != 'dry-run': - if data.get('config.json') is None: - logger.error("'config' collector data is missing, and is required to ship.") - return None ship(tgzfile) with disable_activity_stream(): for filename in data: @@ -214,11 +217,12 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti continue for fpath in files: payload = {filename: (fpath, func.__awx_analytics_version__)} - if collection_type != 'dry-run': - payload['config.json'] = data.get('config.json') - if payload['config.json'] is None: - logger.error("'config' collector data is missing, and is required to ship.") - return None + + payload['config.json'] = data.get('config.json') + if payload['config.json'] is None: + logger.error("'config' collector data is missing, and is required to ship.") + return None + tgzfile = package(dest.parent, payload, until) if tgzfile is not None: tarfiles.append(tgzfile) From 39886da4b62cc8f9b5e2bc06046a63f9a787c477 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Mon, 22 Mar 2021 14:56:12 -0400 Subject: [PATCH 10/15] Deal with datetimes in AUTOMATION_ANALYTICS_LAST_ENTRIES --- awx/main/analytics/core.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 7c0ae49842..348bcad443 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -10,6 +10,7 @@ import tarfile import tempfile from django.conf import settings +from django.core.serializers.json import DjangoJSONEncoder from django.utils.timezone import now, timedelta from rest_framework.exceptions import PermissionDenied import requests @@ -178,8 +179,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti key = func.__awx_analytics_key__ filename = f'{key}.json' try: - results = (func(last_run, collection_type=collection_type, until=until), - func.__awx_analytics_version__) + results = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__) json.dumps(results) # throwaway check to see if the data is json-serializable data[filename] = results except Exception: @@ -197,7 +197,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti with disable_activity_stream(): for filename in data: last_entries[filename.replace('.json', '')] = until - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries) + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) for func in csv_collectors: key = func.__awx_analytics_key__ @@ -213,7 +213,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if collection_type != 'dry-run': with disable_activity_stream(): last_entries[key] = end - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries) + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) continue for fpath in files: payload = {filename: (fpath, func.__awx_analytics_version__)} @@ -231,7 +231,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti ship(tgzfile) with disable_activity_stream(): last_entries[key] = end - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries) + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) except Exception: logger.exception("Could not generate metric {}".format(filename)) From 1dacd7e8cf030c767a337dc06f6b299501da1528 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Tue, 23 Mar 2021 10:36:59 -0400 Subject: [PATCH 11/15] Only clean up tarballs if we had all successfully ship --- awx/main/analytics/core.py | 84 ++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 348bcad443..b80db5094c 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -172,6 +172,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti gather_dir = dest.joinpath('stage') gather_dir.mkdir(mode=0o700) tarfiles = [] + succeeded = True # These json collectors are pretty compact, so collect all of them before shipping to analytics. data = {} @@ -193,7 +194,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if tgzfile is not None: tarfiles.append(tgzfile) if collection_type != 'dry-run': - ship(tgzfile) + if not ship(tgzfile): + succeeded = False with disable_activity_stream(): for filename in data: last_entries[filename.replace('.json', '')] = until @@ -228,15 +230,21 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti tarfiles.append(tgzfile) if collection_type != 'dry-run': - ship(tgzfile) + if not ship(tgzfile): + succeeded = False with disable_activity_stream(): last_entries[key] = end settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) except Exception: logger.exception("Could not generate metric {}".format(filename)) - with disable_activity_stream(): - settings.AUTOMATION_ANALYTICS_LAST_GATHER = until + if collection_type != 'dry-run': + if succeeded: + for fpath in tarfiles: + if os.path.exists(fpath): + os.remove(fpath) + with disable_activity_stream(): + settings.AUTOMATION_ANALYTICS_LAST_GATHER = until shutil.rmtree(dest, ignore_errors=True) # clean up individual artifact files if not tarfiles: @@ -253,42 +261,38 @@ def ship(path): """ if not path: logger.error('Automation Analytics TAR not found') - return + return False if not os.path.exists(path): logger.error('Automation Analytics TAR {} not found'.format(path)) - return + return False if "Error:" in str(path): - return - try: - logger.debug('shipping analytics file: {}'.format(path)) - url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None) - if not url: - logger.error('AUTOMATION_ANALYTICS_URL is not set') - return - rh_user = getattr(settings, 'REDHAT_USERNAME', None) - rh_password = getattr(settings, 'REDHAT_PASSWORD', None) - if not rh_user: - return logger.error('REDHAT_USERNAME is not set') - if not rh_password: - return logger.error('REDHAT_PASSWORD is not set') - with open(path, 'rb') as f: - files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)} - s = requests.Session() - s.headers = get_awx_http_client_headers() - s.headers.pop('Content-Type') - with set_environ(**settings.AWX_TASK_ENV): - response = s.post( - url, - files=files, - verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", - auth=(rh_user, rh_password), - headers=s.headers, - timeout=(31, 31), - ) - # Accept 2XX status_codes - if response.status_code >= 300: - return logger.exception('Upload failed with status {}, {}'.format(response.status_code, response.text)) - finally: - # cleanup tar.gz - if os.path.exists(path): - os.remove(path) + return False + + logger.debug('shipping analytics file: {}'.format(path)) + url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None) + if not url: + logger.error('AUTOMATION_ANALYTICS_URL is not set') + return False + rh_user = getattr(settings, 'REDHAT_USERNAME', None) + rh_password = getattr(settings, 'REDHAT_PASSWORD', None) + if not rh_user: + logger.error('REDHAT_USERNAME is not set') + return False + if not rh_password: + logger.error('REDHAT_PASSWORD is not set') + return False + with open(path, 'rb') as f: + files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)} + s = requests.Session() + s.headers = get_awx_http_client_headers() + s.headers.pop('Content-Type') + with set_environ(**settings.AWX_TASK_ENV): + response = s.post( + url, files=files, verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31) + ) + # Accept 2XX status_codes + if response.status_code >= 300: + logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text)) + return False + + return True From 8ce3a14da5ee2377a08247a255cf9daf5b5a13bf Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Tue, 23 Mar 2021 13:46:21 -0400 Subject: [PATCH 12/15] A couple more fixes: - stop trying to ship csv slices when one breaks - only update LAST_ENTRIES if all of the files in a time/pk slice succeed - don't allow an explicit --until parameter to set the GATHER/ENTRIES values backwards --- awx/main/analytics/core.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index b80db5094c..2671f19601 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -194,12 +194,14 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if tgzfile is not None: tarfiles.append(tgzfile) if collection_type != 'dry-run': - if not ship(tgzfile): + if ship(tgzfile): + with disable_activity_stream(): + for filename in data: + key = filename.replace('.json', '') + last_entries[key] = max(last_entries[key], until) if last_entries.get(key) else until + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) + else: succeeded = False - with disable_activity_stream(): - for filename in data: - last_entries[filename.replace('.json', '')] = until - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) for func in csv_collectors: key = func.__awx_analytics_key__ @@ -214,9 +216,11 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if not files: if collection_type != 'dry-run': with disable_activity_stream(): - last_entries[key] = end + last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) continue + + slice_succeeded = True for fpath in files: payload = {filename: (fpath, func.__awx_analytics_version__)} @@ -228,14 +232,16 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti tgzfile = package(dest.parent, payload, until) if tgzfile is not None: tarfiles.append(tgzfile) + if not ship(tgzfile): + slice_succeeded, succeeded = False, False + break - if collection_type != 'dry-run': - if not ship(tgzfile): - succeeded = False - with disable_activity_stream(): - last_entries[key] = end - settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) + if slice_succeeded and collection_type != 'dry-run': + with disable_activity_stream(): + last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) except Exception: + succeeded = False logger.exception("Could not generate metric {}".format(filename)) if collection_type != 'dry-run': @@ -244,7 +250,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if os.path.exists(fpath): os.remove(fpath) with disable_activity_stream(): - settings.AUTOMATION_ANALYTICS_LAST_GATHER = until + if not settings.AUTOMATION_ANALYTICS_LAST_GATHER or until > settings.AUTOMATION_ANALYTICS_LAST_GATHER: + settings.AUTOMATION_ANALYTICS_LAST_GATHER = until shutil.rmtree(dest, ignore_errors=True) # clean up individual artifact files if not tarfiles: From 6030c5cf4ca8dd5bae892b0e51d29660de5de11d Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Wed, 24 Mar 2021 16:09:33 -0400 Subject: [PATCH 13/15] Rationalize the interval calculations for analytics gathering - `since` should not be after `until` - neither `since` nor `until` should be in the future - `since`, `AUTOMATION_ANALYTICS_LAST_GATHER`, and `AUTOMATION_ANALYTICS_LAST_ENTRIES[key]` should be truncated to 4 weeks prior to `until` - an explicit `since` parameter should always take precedence over the settings values --- awx/main/analytics/collectors.py | 63 ++++++++++++++++++++++++-------- awx/main/analytics/core.py | 32 ++++++++++++---- 2 files changed, 71 insertions(+), 24 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 247613639d..1e30494845 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -34,14 +34,60 @@ data _since_ the last report date - i.e., new data in the last 24 hours) ''' +def trivial_slicing(key, since, until): + if since is not None: + return [(since, until)] + + from awx.conf.models import Setting + + horizon = until - timedelta(weeks=4) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + return [(last_entry, until)] + + def four_hour_slicing(key, since, until): - start, end = since, None + if since is not None: + last_entry = since + else: + from awx.conf.models import Setting + + horizon = until - timedelta(weeks=4) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + + start, end = last_entry, None while start < until: end = min(start + timedelta(hours=4), until) yield (start, end) start = end +def events_slicing(key, since, until): + from awx.conf.models import Setting + + last_gather = settings.AUTOMATION_ANALYTICS_LAST_GATHER + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + horizon = until - timedelta(weeks=4) + + lower = since or last_gather or horizon + if last_entries.get(key): + lower = horizon + pk_values = models.JobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk')) + + previous_pk = pk_values['pk__min'] or 0 + if last_entries.get(key): + previous_pk = max(last_entries[key] + 1, previous_pk) + final_pk = pk_values['pk__max'] or 0 + + step = 100000 + for start in range(previous_pk, final_pk + 1, step): + yield (start, min(start + step, final_pk)) + + @register('config', '1.3', description=_('General platform configuration.')) def config(since, **kwargs): license_info = get_license() @@ -296,21 +342,6 @@ def _copy_table(table, query, path): return file.file_list() -def events_slicing(key, since, until): - from awx.conf.models import Setting - - pk_values = models.JobEvent.objects.filter(created__gte=since, created__lte=until).aggregate(Min('pk'), Max('pk')) - - step = 100000 - last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') - previous_pk = last_entries.get(key) or pk_values['pk__min'] or 0 - final_pk = pk_values['pk__max'] or 0 - - for start in range(previous_pk, final_pk + 1, step): - yield (start, min(start + step, final_pk)) - - @register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing) def events_table(since, full_path, until, **kwargs): events_query = '''COPY (SELECT main_jobevent.id, diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 2671f19601..5411b856cf 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -148,12 +148,22 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti from awx.main.analytics import collectors from awx.main.signals import disable_activity_stream - if until is None: - until = now() - last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (until - timedelta(weeks=4)) + _now = now() + until = _now if until is None else min(until, _now) # Make sure the end isn't in the future. + horizon = until - timedelta(weeks=4) + if since is not None: + # Make sure the start isn't in the future or more than 4 weeks prior to `until`. + since = max(min(since, _now), horizon) + if since and since >= until: + logger.warning("Start of the collection interval is later than the end, ignoring request.") + return None + + logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) + # LAST_GATHER time should always get truncated to less than 4 weeks back. + last_gather = max(settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') - logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) collector_module = module if module else collectors collector_list = [ @@ -180,7 +190,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti key = func.__awx_analytics_key__ filename = f'{key}.json' try: - results = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__) + last_entry = max(last_entries.get(key) or last_gather, horizon) + results = (func(since or last_entry, collection_type=collection_type, until=until), func.__awx_analytics_version__) json.dumps(results) # throwaway check to see if the data is json-serializable data[filename] = results except Exception: @@ -207,9 +218,14 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti key = func.__awx_analytics_key__ filename = f'{key}.csv' try: - slices = [(last_run, until)] + # These slicer functions may return a generator. The `since` parameter is + # allowed to be None, and will fall back to LAST_ENTRIES[key] or to + # LAST_GATHER (truncated appropriately to match the 4-week limit). if func.__awx_expensive__: - slices = func.__awx_expensive__(key, last_run, until) # it's ok if this returns a generator + slices = func.__awx_expensive__(key, since, until) + else: + slices = collectors.trivial_slicing(key, since, until) + for start, end in slices: files = func(start, full_path=gather_dir, until=end) @@ -256,7 +272,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti shutil.rmtree(dest, ignore_errors=True) # clean up individual artifact files if not tarfiles: # No data was collected - logger.warning("No data from {} to {}".format(last_run, until)) + logger.warning("No data from {} to {}".format(since or last_gather, until)) return None return tarfiles From f85e8a44ded240bf828eb6d370aa3bacf3fb49dc Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Thu, 25 Mar 2021 12:52:50 -0400 Subject: [PATCH 14/15] Properly parse datetimes from AUTOMATION_ANALYTICS_LAST_ENTRIES --- awx/main/analytics/collectors.py | 8 ++++---- awx/main/analytics/core.py | 4 ++-- awx/main/utils/common.py | 13 +++++++++++++ 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 1e30494845..d57f0969e6 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -12,7 +12,7 @@ from django.utils.timezone import now, timedelta from django.utils.translation import ugettext_lazy as _ from awx.conf.license import get_license -from awx.main.utils import get_awx_version, get_custom_venv_choices, camelcase_to_underscore +from awx.main.utils import get_awx_version, get_custom_venv_choices, camelcase_to_underscore, datetime_hook from awx.main import models from django.contrib.sessions.models import Session from awx.main.analytics import register @@ -42,7 +42,7 @@ def trivial_slicing(key, since, until): horizon = until - timedelta(weeks=4) last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) return [(last_entry, until)] @@ -55,7 +55,7 @@ def four_hour_slicing(key, since, until): horizon = until - timedelta(weeks=4) last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) start, end = last_entry, None @@ -70,7 +70,7 @@ def events_slicing(key, since, until): last_gather = settings.AUTOMATION_ANALYTICS_LAST_GATHER last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) horizon = until - timedelta(weeks=4) lower = since or last_gather or horizon diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 5411b856cf..71008fc2a5 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -18,7 +18,7 @@ import requests from awx.conf.license import get_license from awx.main.models import Job from awx.main.access import access_registry -from awx.main.utils import get_awx_http_client_headers, set_environ +from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook from awx.main.utils.pglock import advisory_lock __all__ = ['register', 'gather', 'ship'] @@ -163,7 +163,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti last_gather = max(settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) collector_module = module if module else collectors collector_list = [ diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 8ad4a9f485..38927bd308 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -18,6 +18,8 @@ from functools import reduce, wraps from decimal import Decimal +from dateutil import parser + # Django from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist from django.utils.translation import ugettext_lazy as _ @@ -54,6 +56,7 @@ __all__ = [ 'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean', + 'datetime_hook', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', '_inventory_updates', @@ -115,6 +118,16 @@ def to_python_boolean(value, allow_none=False): raise ValueError(_(u'Unable to convert "%s" to boolean') % value) +def datetime_hook(d): + new_d = {} + for key, value in d.items(): + try: + new_d[key] = parser.parse(value) + except Exception: + new_d[key] = value + return new_d + + def camelcase_to_underscore(s): """ Convert CamelCase names to lowercase_with_underscore. From f8b91f9b0ef778c57d906446be2cbf112ea5c200 Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Thu, 25 Mar 2021 15:44:08 -0400 Subject: [PATCH 15/15] Fixes - use parse_datetime from Django for the datetime_hook - deal with a fencepost error in the events slicer --- awx/main/analytics/collectors.py | 14 +++++++------- awx/main/utils/common.py | 7 +++---- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index d57f0969e6..48262c8e0a 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -8,16 +8,16 @@ import distro from django.db import connection from django.db.models import Count, Max, Min from django.conf import settings +from django.contrib.sessions.models import Session from django.utils.timezone import now, timedelta from django.utils.translation import ugettext_lazy as _ from awx.conf.license import get_license from awx.main.utils import get_awx_version, get_custom_venv_choices, camelcase_to_underscore, datetime_hook from awx.main import models -from django.contrib.sessions.models import Session from awx.main.analytics import register -''' +""" This module is used to define metrics collected by awx.main.analytics.gather() Each function is decorated with a key name, and should return a data structure that can be serialized to JSON @@ -31,7 +31,7 @@ All functions - when called - will be passed a datetime.datetime object, `since`, which represents the last time analytics were gathered (some metrics functions - like those that return metadata about playbook runs, may return data _since_ the last report date - i.e., new data in the last 24 hours) -''' +""" def trivial_slicing(key, since, until): @@ -74,13 +74,13 @@ def events_slicing(key, since, until): horizon = until - timedelta(weeks=4) lower = since or last_gather or horizon - if last_entries.get(key): + if not since and last_entries.get(key): lower = horizon pk_values = models.JobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk')) - previous_pk = pk_values['pk__min'] or 0 - if last_entries.get(key): - previous_pk = max(last_entries[key] + 1, previous_pk) + previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0 + if not since and last_entries.get(key): + previous_pk = max(last_entries[key], previous_pk) final_pk = pk_values['pk__max'] or 0 step = 100000 diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 38927bd308..912bdb5364 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -18,10 +18,9 @@ from functools import reduce, wraps from decimal import Decimal -from dateutil import parser - # Django from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist +from django.utils.dateparse import parse_datetime from django.utils.translation import ugettext_lazy as _ from django.utils.functional import cached_property from django.db.models.fields.related import ForeignObjectRel, ManyToManyField @@ -122,8 +121,8 @@ def datetime_hook(d): new_d = {} for key, value in d.items(): try: - new_d[key] = parser.parse(value) - except Exception: + new_d[key] = parse_datetime(value) + except TypeError: new_d[key] = value return new_d