From 6030c5cf4ca8dd5bae892b0e51d29660de5de11d Mon Sep 17 00:00:00 2001 From: Jeff Bradberry Date: Wed, 24 Mar 2021 16:09:33 -0400 Subject: [PATCH] Rationalize the interval calculations for analytics gathering - `since` should not be after `until` - neither `since` nor `until` should be in the future - `since`, `AUTOMATION_ANALYTICS_LAST_GATHER`, and `AUTOMATION_ANALYTICS_LAST_ENTRIES[key]` should be truncated to 4 weeks prior to `until` - an explicit `since` parameter should always take precedence over the settings values --- awx/main/analytics/collectors.py | 63 ++++++++++++++++++++++++-------- awx/main/analytics/core.py | 32 ++++++++++++---- 2 files changed, 71 insertions(+), 24 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 247613639d..1e30494845 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -34,14 +34,60 @@ data _since_ the last report date - i.e., new data in the last 24 hours) ''' +def trivial_slicing(key, since, until): + if since is not None: + return [(since, until)] + + from awx.conf.models import Setting + + horizon = until - timedelta(weeks=4) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + return [(last_entry, until)] + + def four_hour_slicing(key, since, until): - start, end = since, None + if since is not None: + last_entry = since + else: + from awx.conf.models import Setting + + horizon = until - timedelta(weeks=4) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + + start, end = last_entry, None while start < until: end = min(start + timedelta(hours=4), until) yield (start, end) start = end +def events_slicing(key, since, until): + from awx.conf.models import Setting + + last_gather = settings.AUTOMATION_ANALYTICS_LAST_GATHER + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') + horizon = until - timedelta(weeks=4) + + lower = since or last_gather or horizon + if last_entries.get(key): + lower = horizon + pk_values = models.JobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk')) + + previous_pk = pk_values['pk__min'] or 0 + if last_entries.get(key): + previous_pk = max(last_entries[key] + 1, previous_pk) + final_pk = pk_values['pk__max'] or 0 + + step = 100000 + for start in range(previous_pk, final_pk + 1, step): + yield (start, min(start + step, final_pk)) + + @register('config', '1.3', description=_('General platform configuration.')) def config(since, **kwargs): license_info = get_license() @@ -296,21 +342,6 @@ def _copy_table(table, query, path): return file.file_list() -def events_slicing(key, since, until): - from awx.conf.models import Setting - - pk_values = models.JobEvent.objects.filter(created__gte=since, created__lte=until).aggregate(Min('pk'), Max('pk')) - - step = 100000 - last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() - last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') - previous_pk = last_entries.get(key) or pk_values['pk__min'] or 0 - final_pk = pk_values['pk__max'] or 0 - - for start in range(previous_pk, final_pk + 1, step): - yield (start, min(start + step, final_pk)) - - @register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing) def events_table(since, full_path, until, **kwargs): events_query = '''COPY (SELECT main_jobevent.id, diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 2671f19601..5411b856cf 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -148,12 +148,22 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti from awx.main.analytics import collectors from awx.main.signals import disable_activity_stream - if until is None: - until = now() - last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (until - timedelta(weeks=4)) + _now = now() + until = _now if until is None else min(until, _now) # Make sure the end isn't in the future. + horizon = until - timedelta(weeks=4) + if since is not None: + # Make sure the start isn't in the future or more than 4 weeks prior to `until`. + since = max(min(since, _now), horizon) + if since and since >= until: + logger.warning("Start of the collection interval is later than the end, ignoring request.") + return None + + logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) + # LAST_GATHER time should always get truncated to less than 4 weeks back. + last_gather = max(settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}') - logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) collector_module = module if module else collectors collector_list = [ @@ -180,7 +190,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti key = func.__awx_analytics_key__ filename = f'{key}.json' try: - results = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__) + last_entry = max(last_entries.get(key) or last_gather, horizon) + results = (func(since or last_entry, collection_type=collection_type, until=until), func.__awx_analytics_version__) json.dumps(results) # throwaway check to see if the data is json-serializable data[filename] = results except Exception: @@ -207,9 +218,14 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti key = func.__awx_analytics_key__ filename = f'{key}.csv' try: - slices = [(last_run, until)] + # These slicer functions may return a generator. The `since` parameter is + # allowed to be None, and will fall back to LAST_ENTRIES[key] or to + # LAST_GATHER (truncated appropriately to match the 4-week limit). if func.__awx_expensive__: - slices = func.__awx_expensive__(key, last_run, until) # it's ok if this returns a generator + slices = func.__awx_expensive__(key, since, until) + else: + slices = collectors.trivial_slicing(key, since, until) + for start, end in slices: files = func(start, full_path=gather_dir, until=end) @@ -256,7 +272,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti shutil.rmtree(dest, ignore_errors=True) # clean up individual artifact files if not tarfiles: # No data was collected - logger.warning("No data from {} to {}".format(last_run, until)) + logger.warning("No data from {} to {}".format(since or last_gather, until)) return None return tarfiles