no longer get the size of the gather set

* Before, we would get the min and max pk of the set of job events to be
gathered. This changeset removes that.
* Before, we effectively knew the size of the set to be gathered and
would query 100,000 of those job event records at a time. That logic is
now gone.
* Now, unpartitioned job events are gathered 4 hours at a time by
created time.
* Now, partitioned job events are gathered 4 hours at a time by
modified time (see the sketch after this list).
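
In effect, the slicing becomes a pure function of time rather than of pk
range. A minimal sketch of that windowing, assuming datetime values for
last_gather and until; the standalone form and the name four_hour_slices
are illustrative, not the exact collector code:

    from datetime import timedelta

    def four_hour_slices(last_gather, until):
        # Never reach back more than 4 weeks, even if the last gather is older.
        horizon = until - timedelta(weeks=4)
        start = max(last_gather, horizon)
        while start < until:
            # Each window covers at most 4 hours and never runs past `until`.
            end = min(start + timedelta(hours=4), until)
            yield start, end
            start = end

Each yielded (start, end) pair becomes one COPY query window instead of one
100,000-pk chunk.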
Chris Meyers authored 2021-05-06 14:50:49 -04:00; committed by Jim Ladd
parent 137111351c, commit 1c97b9a046
2 changed files with 14 additions and 41 deletions

@@ -58,7 +58,10 @@ def four_hour_slicing(key, since, until, last_gather):
     horizon = until - timedelta(weeks=4)
     last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
     last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)
-    last_entry = max(last_entries.get(key) or last_gather, horizon)
+    try:
+        last_entry = max(last_entries.get(key) or last_gather, horizon)
+    except TypeError:  # last_entries has a stale non-datetime entry for this collector
+        last_entry = max(last_gather, horizon)
     start, end = last_entry, None
     while start < until:
@@ -81,35 +84,6 @@ def _identify_lower(key, since, until, last_gather):
     return lower, last_entries
 
-def _events_slicing(key, since, until, last_gather, query_func):
-    lower, last_entries = _identify_lower(key, since, until, last_gather)
-    pk_values = query_func(lower, until)
-    previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0
-    if not since and last_entries.get(key):
-        previous_pk = max(last_entries[key], previous_pk)
-    final_pk = pk_values['pk__max'] or 0
-    step = 100000
-    for start in range(previous_pk, final_pk + 1, step):
-        yield (start, min(start + step, final_pk))
-
-def events_slicing_partitioned_modified(key, since, until, last_gather):
-    def query_func(lower, until):
-        return models.JobEvent.objects.filter(modified__gte=lower, modified__lte=until).aggregate(Min('pk'), Max('pk'))
-
-    return _events_slicing(key, since, until, last_gather, query_func)
-
-def events_slicing_unpartitioned(key, since, until, last_gather):
-    def query_func(lower, until):
-        return models.UnpartitionedJobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk'))
-
-    return _events_slicing(key, since, until, last_gather, query_func)
 
 @register('config', '1.3', description=_('General platform configuration.'))
 def config(since, **kwargs):
     license_info = get_license()
@@ -356,7 +330,7 @@ def _copy_table(table, query, path):
     return file.file_list()
 
-def _events_table(since, full_path, until, tbl, project_job_created=False, order_by=True, **kwargs):
+def _events_table(since, full_path, until, tbl, where_column, project_job_created=False, **kwargs):
     def query(event_data):
         query = f'''COPY (SELECT {tbl}.id,
                           {tbl}.created,
@@ -382,10 +356,7 @@ def _events_table(since, full_path, until, tbl, project_job_created=False, order
                           x.res->'warnings' AS warnings,
                           x.res->'deprecations' AS deprecations
                           FROM {tbl}, json_to_record({event_data}) AS x("res" json, "duration" text, "task_action" text, "start" text, "end" text)
-                          WHERE ({tbl}.id > {since} AND {tbl}.id <= {until})'''
-        if order_by:
-            query += f' ORDER BY {tbl}.id ASC'
-        query += ') TO STDOUT WITH CSV HEADER'
+                          WHERE ({tbl}.{where_column} > '{since.isoformat()}' AND {tbl}.{where_column} <= '{until.isoformat()}')) TO STDOUT WITH CSV HEADER'''
         return query
 
     try:
@@ -394,14 +365,14 @@ def _events_table(since, full_path, until, tbl, project_job_created=False, order
         return _copy_table(table='events', query=query(f"replace({tbl}.event_data::text, '\\u0000', '')::json"), path=full_path)
 
-@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=events_slicing_unpartitioned)
+@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=four_hour_slicing)
 def events_table_unpartitioned(since, full_path, until, **kwargs):
-    return _events_table(since, full_path, until, '_unpartitioned_main_jobevent', **kwargs)
+    return _events_table(since, full_path, until, '_unpartitioned_main_jobevent', 'created', **kwargs)
 
-@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=events_slicing_partitioned_modified)
+@register('events_table', '1.3', format='csv', description=_('Automation task records'), expensive=four_hour_slicing)
 def events_table_partitioned_modified(since, full_path, until, **kwargs):
-    return _events_table(since, full_path, until, 'main_jobevent', project_job_created=True, order_by=False, **kwargs)
+    return _events_table(since, full_path, until, 'main_jobevent', 'modified', project_job_created=True, **kwargs)
 
 @register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)

@@ -270,7 +270,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
             if not files:
                 if collection_type != 'dry-run':
                     with disable_activity_stream():
-                        last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end
+                        entry = last_entries.get(key)
+                        last_entries[key] = max(entry, end) if entry and type(entry) == type(end) else end
                         settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
                 continue
@@ -293,7 +294,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
                 if slice_succeeded and collection_type != 'dry-run':
                     with disable_activity_stream():
-                        last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end
+                        entry = last_entries.get(key)
+                        last_entries[key] = max(entry, end) if entry and type(entry) == type(end) else end
                         settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
             except Exception:
                 succeeded = False
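
For context, the gather loop consumes each (start, end) window and advances a
per-collector high-water mark. A hedged sketch of that pattern; gather_slices,
collect, and the surrounding plumbing are illustrative stand-ins for the real
gather() machinery, not its actual signature:

    def gather_slices(key, slices, last_entries, collect):
        for start, end in slices:
            collect(key, start, end)  # e.g. copy one 4-hour window of events to CSV
            entry = last_entries.get(key)
            # The type check guards against stale integer pk entries left over
            # from the removed pk-based slicing.
            last_entries[key] = max(entry, end) if entry and type(entry) == type(end) else end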