Analytics upload: HostMetrics hybrid sync

This commit is contained in:
Martin Slemr
2023-04-03 17:26:37 +02:00
committed by Hao Liu
parent c0491a7b10
commit 44db4587be
2 changed files with 92 additions and 6 deletions

View File

@@ -6,7 +6,7 @@ import platform
 import distro
 from django.db import connection
-from django.db.models import Count
+from django.db.models import Count, Min
 from django.conf import settings
 from django.contrib.sessions.models import Session
 from django.utils.timezone import now, timedelta
@@ -35,7 +35,7 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
 """
-def trivial_slicing(key, since, until, last_gather):
+def trivial_slicing(key, since, until, last_gather, **kwargs):
     if since is not None:
         return [(since, until)]
@@ -48,7 +48,7 @@ def trivial_slicing(key, since, until, last_gather):
     return [(last_entry, until)]
-def four_hour_slicing(key, since, until, last_gather):
+def four_hour_slicing(key, since, until, last_gather, **kwargs):
     if since is not None:
         last_entry = since
     else:
@@ -69,6 +69,54 @@ def four_hour_slicing(key, since, until, last_gather):
         start = end
def host_metric_slicing(key, since, until, last_gather, **kwargs):
    """
    Produce (start, end) gather windows for the host_metric collector.

    Unlike the other slicers, this does not start 4 weeks back: it either
    continues incrementally from the last recorded entry for *key*, or —
    on the first gather ever, or when ``kwargs['full_sync_enabled']`` is
    truthy — walks the whole HostMetric table starting at its oldest
    timestamp, in 30-day slices.

    Yields ``(start, end)`` datetime pairs; yields nothing when there is
    no data to send.
    """
    # Explicit window requested by the caller: emit it as-is.
    # NOTE: this function is a generator (it contains `yield` below), so a
    # plain `return [(since, until)]` would be swallowed — the list becomes
    # StopIteration.value and the caller's `for start, end in slices:` loop
    # would iterate zero times. We must `yield` the window instead.
    if since is not None:
        yield (since, until)
        return

    from awx.main.models.inventory import HostMetric
    from awx.conf.models import Setting

    # Check whether a full-table sync was requested by the gather loop.
    full_sync_enabled = kwargs.get('full_sync_enabled', False)
    last_entry = None
    if not full_sync_enabled:
        # Incremental sync: resume from the timestamp recorded after the
        # last successful gather for this collector key.
        last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
        last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)
        last_entry = last_entries.get(key)
        if not last_entry:
            # Never gathered before — switch to a full sync.
            full_sync_enabled = True

    if full_sync_enabled:
        # Full sync starts at the oldest timestamp present in the table,
        # considering both date columns (either may be NULL table-wide).
        min_dates = HostMetric.objects.aggregate(min_last_automation=Min('last_automation'), min_last_deleted=Min('last_deleted'))
        candidates = [d for d in (min_dates['min_last_automation'], min_dates['min_last_deleted']) if d]
        last_entry = min(candidates) if candidates else None

    if not last_entry:
        # Empty table — nothing to slice (bare return, see NOTE above).
        return

    # Walk from last_entry up to `until` in 30-day windows.
    start = last_entry
    while start < until:
        end = min(start + timedelta(days=30), until)
        yield (start, end)
        start = end
 def _identify_lower(key, since, until, last_gather):
     from awx.conf.models import Setting
@@ -537,3 +585,25 @@ def workflow_job_template_node_table(since, full_path, **kwargs):
         ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id
         ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''
     return _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
@register(
    'host_metric_table', '1.0', format='csv', description=_('Host Metric data, incremental/full sync'), expensive=host_metric_slicing, full_sync_interval=30
)
def host_metric_table(since, full_path, until, **kwargs):
    """Dump HostMetric rows automated or deleted within (since, until] to CSV at full_path."""
    # The same window is applied to both timestamp columns, so the bound
    # pair is splatted into format() twice.
    bounds = (since.isoformat(), until.isoformat())
    host_metric_query = '''COPY (SELECT main_hostmetric.id,
                    main_hostmetric.hostname,
                    main_hostmetric.first_automation,
                    main_hostmetric.last_automation,
                    main_hostmetric.last_deleted,
                    main_hostmetric.deleted,
                    main_hostmetric.automated_counter,
                    main_hostmetric.deleted_counter,
                    main_hostmetric.used_in_inventories
                    FROM main_hostmetric
                    WHERE (main_hostmetric.last_automation > '{}' AND main_hostmetric.last_automation <= '{}') OR
                          (main_hostmetric.last_deleted > '{}' AND main_hostmetric.last_deleted <= '{}')
                    ORDER BY main_hostmetric.id ASC) TO STDOUT WITH CSV HEADER'''.format(*bounds, *bounds)
    return _copy_table(table='host_metric', query=host_metric_query, path=full_path)

View File

@@ -52,7 +52,7 @@ def all_collectors():
 }
-def register(key, version, description=None, format='json', expensive=None):
+def register(key, version, description=None, format='json', expensive=None, full_sync_interval=None):
     """
     A decorator used to register a function as a metric collector.
@@ -71,6 +71,7 @@ def register(key, version, description=None, format='json', expensive=None):
         f.__awx_analytics_description__ = description
         f.__awx_analytics_type__ = format
         f.__awx_expensive__ = expensive
+        f.__awx_full_sync_interval__ = full_sync_interval
         return f
     return decorate
@@ -259,10 +260,19 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
         # These slicer functions may return a generator. The `since` parameter is
         # allowed to be None, and will fall back to LAST_ENTRIES[key] or to
         # LAST_GATHER (truncated appropriately to match the 4-week limit).
+        #
+        # Or it can force full table sync if interval is given
+        kwargs = dict()
+        full_sync_enabled = False
+        if func.__awx_full_sync_interval__:
+            last_full_sync = last_entries.get(f"{key}_full")
+            full_sync_enabled = not last_full_sync or last_full_sync < now() - timedelta(days=func.__awx_full_sync_interval__)
+            kwargs['full_sync_enabled'] = full_sync_enabled
         if func.__awx_expensive__:
-            slices = func.__awx_expensive__(key, since, until, last_gather)
+            slices = func.__awx_expensive__(key, since, until, last_gather, **kwargs)
         else:
-            slices = collectors.trivial_slicing(key, since, until, last_gather)
+            slices = collectors.trivial_slicing(key, since, until, last_gather, **kwargs)
         for start, end in slices:
             files = func(start, full_path=gather_dir, until=end)
@@ -301,6 +311,12 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
                 succeeded = False
                 logger.exception("Could not generate metric {}".format(filename))
+        # update full sync timestamp if successfully shipped
+        if full_sync_enabled and collection_type != 'dry-run' and succeeded:
+            with disable_activity_stream():
+                last_entries[f"{key}_full"] = now()
+                settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
         if collection_type != 'dry-run':
             if succeeded:
                 for fpath in tarfiles: