From 44db4587be0eead95355e244574818dddf35c8b7 Mon Sep 17 00:00:00 2001
From: Martin Slemr
Date: Mon, 3 Apr 2023 17:26:37 +0200
Subject: [PATCH] Analytics upload: HostMetrics hybrid sync

Add a `host_metric_table` CSV collector. It is synced incrementally, with a
full-table sync the first time it runs and again once the last full sync is
older than `full_sync_interval` days (tracked under `<key>_full` in
AUTOMATION_ANALYTICS_LAST_ENTRIES).

host_metric_slicing() is a generator function, so the slice for an explicit
`since` is yielded rather than returned: a value handed to `return` inside a
generator is dropped by the caller's `for start, end in slices` loop, which
would leave the collector with nothing to gather.
---
Review notes (not part of the commit message):
- Line breaks and indentation were restored from a whitespace-collapsed copy
  of this patch. Leading whitespace on context lines is a best-effort
  reconstruction, so apply with `git apply --ignore-whitespace` (or
  `git am --ignore-whitespace`) and confirm against the target tree.
- Hunk line counts and the diffstat were updated for the added lines; the
  post-image blob id on the `index` line predates those edits.

 awx/main/analytics/collectors.py | 81 ++++++++++++++++++++++++++++++++++++++--
 awx/main/analytics/core.py       | 22 +++++++--
 2 files changed, 97 insertions(+), 6 deletions(-)

diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index e7655997d2..1bc4c9044f 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -6,7 +6,7 @@ import platform
 import distro
 
 from django.db import connection
-from django.db.models import Count
+from django.db.models import Count, Min
 from django.conf import settings
 from django.contrib.sessions.models import Session
 from django.utils.timezone import now, timedelta
@@ -35,7 +35,7 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
 """
 
 
-def trivial_slicing(key, since, until, last_gather):
+def trivial_slicing(key, since, until, last_gather, **kwargs):
     if since is not None:
         return [(since, until)]
 
@@ -48,7 +48,7 @@ def trivial_slicing(key, since, until, last_gather):
     return [(last_entry, until)]
 
 
-def four_hour_slicing(key, since, until, last_gather):
+def four_hour_slicing(key, since, until, last_gather, **kwargs):
     if since is not None:
         last_entry = since
     else:
@@ -69,6 +69,58 @@ def four_hour_slicing(key, since, until, last_gather):
         start = end
 
 
+def host_metric_slicing(key, since, until, last_gather, **kwargs):
+    """
+    Slicing doesn't start 4 weeks ago, but sends whole table monthly or first time
+
+    This is a generator: every slice has to be yielded, because a value passed to
+    `return` inside a generator is discarded by the `for start, end in slices` loop.
+    """
+    from awx.main.models.inventory import HostMetric
+
+    if since is not None:
+        yield (since, until)
+        return
+
+    from awx.conf.models import Setting
+
+    # Check if full sync should be done
+    full_sync_enabled = kwargs.get('full_sync_enabled', False)
+    last_entry = None
+    if not full_sync_enabled:
+        #
+        # If not, try incremental sync first
+        #
+        last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
+        last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)
+        last_entry = last_entries.get(key)
+        if not last_entry:
+            #
+            # If not done before, switch to full sync
+            #
+            full_sync_enabled = True
+
+    if full_sync_enabled:
+        #
+        # Find the lowest date for full sync
+        #
+        min_dates = HostMetric.objects.aggregate(min_last_automation=Min('last_automation'), min_last_deleted=Min('last_deleted'))
+        if min_dates['min_last_automation'] and min_dates['min_last_deleted']:
+            last_entry = min(min_dates['min_last_automation'], min_dates['min_last_deleted'])
+        elif min_dates['min_last_automation'] or min_dates['min_last_deleted']:
+            last_entry = min_dates['min_last_automation'] or min_dates['min_last_deleted']
+
+    if not last_entry:
+        # empty table
+        return
+
+    start, end = last_entry, None
+    while start < until:
+        end = min(start + timedelta(days=30), until)
+        yield (start, end)
+        start = end
+
+
 def _identify_lower(key, since, until, last_gather):
     from awx.conf.models import Setting
 
@@ -537,3 +589,26 @@ def workflow_job_template_node_table(since, full_path, **kwargs):
                 ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id
                 ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''
     return _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
+
+
+@register(
+    'host_metric_table', '1.0', format='csv', description=_('Host Metric data, incremental/full sync'), expensive=host_metric_slicing, full_sync_interval=30
+)
+def host_metric_table(since, full_path, until, **kwargs):
+    """Export HostMetric rows whose last_automation or last_deleted falls within (since, until] to CSV."""
+    host_metric_query = '''COPY (SELECT main_hostmetric.id,
+                              main_hostmetric.hostname,
+                              main_hostmetric.first_automation,
+                              main_hostmetric.last_automation,
+                              main_hostmetric.last_deleted,
+                              main_hostmetric.deleted,
+                              main_hostmetric.automated_counter,
+                              main_hostmetric.deleted_counter,
+                              main_hostmetric.used_in_inventories
+                              FROM main_hostmetric
+                              WHERE (main_hostmetric.last_automation > '{}' AND main_hostmetric.last_automation <= '{}') OR
+                                    (main_hostmetric.last_deleted > '{}' AND main_hostmetric.last_deleted <= '{}')
+                              ORDER BY main_hostmetric.id ASC) TO STDOUT WITH CSV HEADER'''.format(
+        since.isoformat(), until.isoformat(), since.isoformat(), until.isoformat()
+    )
+    return _copy_table(table='host_metric', query=host_metric_query, path=full_path)
diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py
index 77f6108205..9cbc873b2b 100644
--- a/awx/main/analytics/core.py
+++ b/awx/main/analytics/core.py
@@ -52,7 +52,7 @@ def all_collectors():
     }
 
 
-def register(key, version, description=None, format='json', expensive=None):
+def register(key, version, description=None, format='json', expensive=None, full_sync_interval=None):
     """
     A decorator used to register a function as a metric collector.
 
@@ -71,6 +71,7 @@ def register(key, version, description=None, format='json', expensive=None):
         f.__awx_analytics_description__ = description
         f.__awx_analytics_type__ = format
         f.__awx_expensive__ = expensive
+        f.__awx_full_sync_interval__ = full_sync_interval
         return f
 
     return decorate
@@ -259,10 +260,19 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
                 # These slicer functions may return a generator. The `since` parameter is
                 # allowed to be None, and will fall back to LAST_ENTRIES[key] or to
                 # LAST_GATHER (truncated appropriately to match the 4-week limit).
+                #
+                # Or it can force full table sync if interval is given
+                kwargs = dict()
+                full_sync_enabled = False
+                if func.__awx_full_sync_interval__:
+                    last_full_sync = last_entries.get(f"{key}_full")
+                    full_sync_enabled = not last_full_sync or last_full_sync < now() - timedelta(days=func.__awx_full_sync_interval__)
+
+                kwargs['full_sync_enabled'] = full_sync_enabled
                 if func.__awx_expensive__:
-                    slices = func.__awx_expensive__(key, since, until, last_gather)
+                    slices = func.__awx_expensive__(key, since, until, last_gather, **kwargs)
                 else:
-                    slices = collectors.trivial_slicing(key, since, until, last_gather)
+                    slices = collectors.trivial_slicing(key, since, until, last_gather, **kwargs)
 
                 for start, end in slices:
                     files = func(start, full_path=gather_dir, until=end)
@@ -301,6 +311,12 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
                 succeeded = False
                 logger.exception("Could not generate metric {}".format(filename))
 
+            # update full sync timestamp if successfully shipped
+            if full_sync_enabled and collection_type != 'dry-run' and succeeded:
+                with disable_activity_stream():
+                    last_entries[f"{key}_full"] = now()
+                    settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
+
         if collection_type != 'dry-run':
             if succeeded:
                 for fpath in tarfiles: