mirror of
https://github.com/ansible/awx.git
synced 2026-03-01 08:48:46 -03:30
analytics support for db partitions
* Keep old primary key based analytics gathering for unpartitioned tables. * Use created time on new partitioned tables.
This commit is contained in:
@@ -67,7 +67,7 @@ def four_hour_slicing(key, since, until, last_gather):
|
|||||||
start = end
|
start = end
|
||||||
|
|
||||||
|
|
||||||
def events_slicing(key, since, until, last_gather):
|
def _identify_lower(key, since, until, last_gather):
|
||||||
from awx.conf.models import Setting
|
from awx.conf.models import Setting
|
||||||
|
|
||||||
last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
|
last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
|
||||||
@@ -77,7 +77,14 @@ def events_slicing(key, since, until, last_gather):
|
|||||||
lower = since or last_gather
|
lower = since or last_gather
|
||||||
if not since and last_entries.get(key):
|
if not since and last_entries.get(key):
|
||||||
lower = horizon
|
lower = horizon
|
||||||
pk_values = models.JobEvent.objects.filter(modified__gte=lower, modified__lte=until).aggregate(Min('pk'), Max('pk'))
|
|
||||||
|
return lower, last_entries
|
||||||
|
|
||||||
|
|
||||||
|
def _events_slicing(key, since, until, last_gather, query_func):
|
||||||
|
lower, last_entries = _identify_lower(key, since, until, last_gather)
|
||||||
|
|
||||||
|
pk_values = query_func(lower, until)
|
||||||
|
|
||||||
previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0
|
previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0
|
||||||
if not since and last_entries.get(key):
|
if not since and last_entries.get(key):
|
||||||
@@ -89,6 +96,20 @@ def events_slicing(key, since, until, last_gather):
|
|||||||
yield (start, min(start + step, final_pk))
|
yield (start, min(start + step, final_pk))
|
||||||
|
|
||||||
|
|
||||||
|
def events_slicing_partitioned_modified(key, since, until, last_gather):
|
||||||
|
def query_func(lower, until):
|
||||||
|
return models.JobEvent.objects.filter(modified__gte=lower, modified__lte=until).aggregate(Min('pk'), Max('pk'))
|
||||||
|
|
||||||
|
return _events_slicing(key, since, until, last_gather, query_func)
|
||||||
|
|
||||||
|
|
||||||
|
def events_slicing_unpartitioned(key, since, until, last_gather):
|
||||||
|
def query_func(lower, until):
|
||||||
|
return models.UnpartitionedJobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk'))
|
||||||
|
|
||||||
|
return _events_slicing(key, since, until, last_gather, query_func)
|
||||||
|
|
||||||
|
|
||||||
@register('config', '1.3', description=_('General platform configuration.'))
|
@register('config', '1.3', description=_('General platform configuration.'))
|
||||||
def config(since, **kwargs):
|
def config(since, **kwargs):
|
||||||
license_info = get_license()
|
license_info = get_license()
|
||||||
@@ -335,39 +356,50 @@ def _copy_table(table, query, path):
|
|||||||
return file.file_list()
|
return file.file_list()
|
||||||
|
|
||||||
|
|
||||||
@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing)
|
def _events_table(since, full_path, until, tbl, **kwargs):
|
||||||
def events_table(since, full_path, until, **kwargs):
|
|
||||||
def query(event_data):
|
def query(event_data):
|
||||||
return f'''COPY (SELECT main_jobevent.id,
|
# TODO: conditional job_created based on if the column exists or not in the table
|
||||||
main_jobevent.created,
|
# {tbl}.job_created,
|
||||||
main_jobevent.modified,
|
return f'''COPY (SELECT {tbl}.id,
|
||||||
main_jobevent.uuid,
|
{tbl}.created,
|
||||||
main_jobevent.parent_uuid,
|
{tbl}.modified,
|
||||||
main_jobevent.event,
|
{tbl}.uuid,
|
||||||
|
{tbl}.parent_uuid,
|
||||||
|
{tbl}.event,
|
||||||
{event_data}->'task_action' AS task_action,
|
{event_data}->'task_action' AS task_action,
|
||||||
(CASE WHEN event = 'playbook_on_stats' THEN event_data END) as playbook_on_stats,
|
(CASE WHEN event = 'playbook_on_stats' THEN event_data END) as playbook_on_stats,
|
||||||
main_jobevent.failed,
|
{tbl}.failed,
|
||||||
main_jobevent.changed,
|
{tbl}.changed,
|
||||||
main_jobevent.playbook,
|
{tbl}.playbook,
|
||||||
main_jobevent.play,
|
{tbl}.play,
|
||||||
main_jobevent.task,
|
{tbl}.task,
|
||||||
main_jobevent.role,
|
{tbl}.role,
|
||||||
main_jobevent.job_id,
|
{tbl}.job_id,
|
||||||
main_jobevent.host_id,
|
{tbl}.host_id,
|
||||||
main_jobevent.host_name,
|
{tbl}.host_name,
|
||||||
CAST({event_data}->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
|
CAST({event_data}->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
|
||||||
CAST({event_data}->>'end' AS TIMESTAMP WITH TIME ZONE) AS end,
|
CAST({event_data}->>'end' AS TIMESTAMP WITH TIME ZONE) AS end,
|
||||||
{event_data}->'duration' AS duration,
|
{event_data}->'duration' AS duration,
|
||||||
{event_data}->'res'->'warnings' AS warnings,
|
{event_data}->'res'->'warnings' AS warnings,
|
||||||
{event_data}->'res'->'deprecations' AS deprecations
|
{event_data}->'res'->'deprecations' AS deprecations
|
||||||
FROM main_jobevent
|
FROM {tbl}
|
||||||
WHERE (main_jobevent.id > {since} AND main_jobevent.id <= {until})
|
WHERE ({tbl}.id > {since} AND {tbl}.id <= {until})
|
||||||
ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''
|
ORDER BY {tbl}.id ASC) TO STDOUT WITH CSV HEADER'''
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return _copy_table(table='events', query=query("main_jobevent.event_data::json"), path=full_path)
|
return _copy_table(table='events', query=query(f"{tbl}.event_data::json"), path=full_path)
|
||||||
except UntranslatableCharacter:
|
except UntranslatableCharacter:
|
||||||
return _copy_table(table='events', query=query("replace(main_jobevent.event_data::text, '\\u0000', '')::json"), path=full_path)
|
return _copy_table(table='events', query=query(f"replace({tbl}.event_data::text, '\\u0000', '')::json"), path=full_path)
|
||||||
|
|
||||||
|
|
||||||
|
@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing_unpartitioned)
|
||||||
|
def events_table_unpartitioned(since, full_path, until, **kwargs):
|
||||||
|
return _events_table(since, full_path, until, '_unpartitioned_main_jobevent', **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing_partitioned_modified)
|
||||||
|
def events_table_partitioned_modified(since, full_path, until, **kwargs):
|
||||||
|
return _events_table(since, full_path, until, 'main_jobevent', **kwargs)
|
||||||
|
|
||||||
|
|
||||||
@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)
|
@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)
|
||||||
|
|||||||
Reference in New Issue
Block a user