From 1c97b9a0461e6cd5939455854e027620c73d7577 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 6 May 2021 14:50:49 -0400 Subject: [PATCH] no longer get the size of the gather set * Before, we would get the min and max pk of the set we are to gather. This changeset removes that. * Before, we would, basically, know the size of the set we are to gather and would query 100,000 of those job event records at a time. That logic is now gone. * Now, for unpartitioned job events we gather 4 hours at a time by created time. * Now, for partitioned job events we gather 4 hours at a time by modified time. --- awx/main/analytics/collectors.py | 49 +++++++------------------------- awx/main/analytics/core.py | 6 ++-- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index b1496e2540..e97307bb48 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -58,7 +58,10 @@ def four_hour_slicing(key, since, until, last_gather): horizon = until - timedelta(weeks=4) last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) - last_entry = max(last_entries.get(key) or last_gather, horizon) + try: + last_entry = max(last_entries.get(key) or last_gather, horizon) + except TypeError: # last_entries has a stale non-datetime entry for this collector + last_entry = max(last_gather, horizon) start, end = last_entry, None while start < until: @@ -81,35 +84,6 @@ def _identify_lower(key, since, until, last_gather): return lower, last_entries -def _events_slicing(key, since, until, last_gather, query_func): - lower, last_entries = _identify_lower(key, since, until, last_gather) - - pk_values = query_func(lower, until) - - previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0 - if not since and last_entries.get(key): - previous_pk = max(last_entries[key], previous_pk) - final_pk = pk_values['pk__max'] or 0 - - step = 100000 - for start in range(previous_pk, final_pk + 1, step): - yield (start, min(start + step, final_pk)) - - -def events_slicing_partitioned_modified(key, since, until, last_gather): - def query_func(lower, until): - return models.JobEvent.objects.filter(modified__gte=lower, modified__lte=until).aggregate(Min('pk'), Max('pk')) - - return _events_slicing(key, since, until, last_gather, query_func) - - -def events_slicing_unpartitioned(key, since, until, last_gather): - def query_func(lower, until): - return models.UnpartitionedJobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk')) - - return _events_slicing(key, since, until, last_gather, query_func) - - @register('config', '1.3', description=_('General platform configuration.')) def config(since, **kwargs): license_info = get_license() @@ -356,7 +330,7 @@ def _copy_table(table, query, path): return file.file_list() -def _events_table(since, full_path, until, tbl, project_job_created=False, order_by=True, **kwargs): +def _events_table(since, full_path, until, tbl, where_column, project_job_created=False, **kwargs): def query(event_data): query = f'''COPY (SELECT {tbl}.id, {tbl}.created, @@ -382,10 +356,7 @@ def _events_table(since, full_path, until, tbl, project_job_created=False, order x.res->'warnings' AS warnings, x.res->'deprecations' AS deprecations FROM {tbl}, json_to_record({event_data}) AS x("res" json, "duration" text, "task_action" text, "start" text, "end" text) - WHERE ({tbl}.id > {since} AND {tbl}.id <= {until})''' - if order_by: - query += f' ORDER BY {tbl}.id ASC' - query += ') TO STDOUT WITH CSV HEADER' + WHERE ({tbl}.{where_column} > '{since.isoformat()}' AND {tbl}.{where_column} <= '{until.isoformat()}')) TO STDOUT WITH CSV HEADER''' return query try: @@ -394,14 +365,14 @@ def _events_table(since, full_path, until, tbl, project_job_created=False, order return _copy_table(table='events', query=query(f"replace({tbl}.event_data::text, '\\u0000', '')::json"), path=full_path) -@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=events_slicing_unpartitioned) +@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=four_hour_slicing) def events_table_unpartitioned(since, full_path, until, **kwargs): - return _events_table(since, full_path, until, '_unpartitioned_main_jobevent', **kwargs) + return _events_table(since, full_path, until, '_unpartitioned_main_jobevent', 'created', **kwargs) -@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=events_slicing_partitioned_modified) +@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=four_hour_slicing) def events_table_partitioned_modified(since, full_path, until, **kwargs): - return _events_table(since, full_path, until, 'main_jobevent', project_job_created=True, order_by=False, **kwargs) + return _events_table(since, full_path, until, 'main_jobevent', 'modified', project_job_created=True, **kwargs) @register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 36c6b97b4b..d63afdfbf3 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -270,7 +270,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if not files: if collection_type != 'dry-run': with disable_activity_stream(): - last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end + entry = last_entries.get(key) + last_entries[key] = max(entry, end) if entry and type(entry) == type(end) else end settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) continue @@ -293,7 +294,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti if slice_succeeded and collection_type != 'dry-run': with disable_activity_stream(): - last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end + entry = last_entries.get(key) + last_entries[key] = max(entry, end) if entry and type(entry) == type(end) else end settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) except Exception: succeeded = False