From 4d7edbbad0afc84de3f0867cae698ac3a1a56fcc Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 20 Apr 2021 13:52:49 -0400 Subject: [PATCH] analytics support for db partitions * Keep old primary key based analytics gathering for unpartitioned tables. * Use created time on new partitioned tables. --- awx/main/analytics/collectors.py | 80 ++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 24 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 8893e1431c..5f2c0d7ff5 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -67,7 +67,7 @@ def four_hour_slicing(key, since, until, last_gather): start = end -def events_slicing(key, since, until, last_gather): +def _identify_lower(key, since, until, last_gather): from awx.conf.models import Setting last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() @@ -77,7 +77,14 @@ def events_slicing(key, since, until, last_gather): lower = since or last_gather if not since and last_entries.get(key): lower = horizon - pk_values = models.JobEvent.objects.filter(modified__gte=lower, modified__lte=until).aggregate(Min('pk'), Max('pk')) + + return lower, last_entries + + +def _events_slicing(key, since, until, last_gather, query_func): + lower, last_entries = _identify_lower(key, since, until, last_gather) + + pk_values = query_func(lower, until) previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0 if not since and last_entries.get(key): @@ -89,6 +96,20 @@ def events_slicing(key, since, until, last_gather): yield (start, min(start + step, final_pk)) +def events_slicing_partitioned_modified(key, since, until, last_gather): + def query_func(lower, until): + return models.JobEvent.objects.filter(modified__gte=lower, modified__lte=until).aggregate(Min('pk'), Max('pk')) + + return _events_slicing(key, since, until, last_gather, query_func) + + +def events_slicing_unpartitioned(key, since, until, last_gather): + def query_func(lower, until): + return models.UnpartitionedJobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk')) + + return _events_slicing(key, since, until, last_gather, query_func) + + @register('config', '1.3', description=_('General platform configuration.')) def config(since, **kwargs): license_info = get_license() @@ -335,39 +356,50 @@ def _copy_table(table, query, path): return file.file_list() -@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing) -def events_table(since, full_path, until, **kwargs): +def _events_table(since, full_path, until, tbl, **kwargs): def query(event_data): - return f'''COPY (SELECT main_jobevent.id, - main_jobevent.created, - main_jobevent.modified, - main_jobevent.uuid, - main_jobevent.parent_uuid, - main_jobevent.event, + # TODO: conditional job_created based on if the column exists or not in the table + # {tbl}.job_created, + return f'''COPY (SELECT {tbl}.id, + {tbl}.created, + {tbl}.modified, + {tbl}.uuid, + {tbl}.parent_uuid, + {tbl}.event, {event_data}->'task_action' AS task_action, (CASE WHEN event = 'playbook_on_stats' THEN event_data END) as playbook_on_stats, - main_jobevent.failed, - main_jobevent.changed, - main_jobevent.playbook, - main_jobevent.play, - main_jobevent.task, - main_jobevent.role, - main_jobevent.job_id, - main_jobevent.host_id, - main_jobevent.host_name, + {tbl}.failed, + {tbl}.changed, + {tbl}.playbook, + {tbl}.play, + {tbl}.task, + {tbl}.role, + {tbl}.job_id, + {tbl}.host_id, + {tbl}.host_name, CAST({event_data}->>'start' AS TIMESTAMP WITH TIME ZONE) AS start, CAST({event_data}->>'end' AS TIMESTAMP WITH TIME ZONE) AS end, {event_data}->'duration' AS duration, {event_data}->'res'->'warnings' AS warnings, {event_data}->'res'->'deprecations' AS deprecations - FROM main_jobevent - WHERE (main_jobevent.id > {since} AND main_jobevent.id <= {until}) - ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER''' + FROM {tbl} + WHERE ({tbl}.id > {since} AND {tbl}.id <= {until}) + ORDER BY {tbl}.id ASC) TO STDOUT WITH CSV HEADER''' try: - return _copy_table(table='events', query=query("main_jobevent.event_data::json"), path=full_path) + return _copy_table(table='events', query=query(f"{tbl}.event_data::json"), path=full_path) except UntranslatableCharacter: - return _copy_table(table='events', query=query("replace(main_jobevent.event_data::text, '\\u0000', '')::json"), path=full_path) + return _copy_table(table='events', query=query(f"replace({tbl}.event_data::text, '\\u0000', '')::json"), path=full_path) + + +@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing_unpartitioned) +def events_table_unpartitioned(since, full_path, until, **kwargs): + return _events_table(since, full_path, until, '_unpartitioned_main_jobevent', **kwargs) + + +@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing_partitioned_modified) +def events_table_partitioned_modified(since, full_path, until, **kwargs): + return _events_table(since, full_path, until, 'main_jobevent', **kwargs) @register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)