Refactor analytics.gather

With the change to pk-based interval slicing for the job events table,
analytics.gather needs to be the code that manages all of the
"expensive" collector slicing. While we are at it, ship each chunked
tarball file as we produce it.
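
To make the new slicing contract concrete, here is a minimal, hypothetical
sketch (pk_slices and its arguments are illustrative names, not code from
this commit): an expensive collector's slicing hook yields bounded
(start, end] primary-key intervals, and analytics.gather dumps, packages,
and ships one tarball per interval, so the checkpoint can advance after
every shipped chunk instead of only at the end of the whole run.

    # Hypothetical sketch of the pk-interval generator shape; not the shipped code.
    def pk_slices(previous_pk, final_pk, step=100000):
        """Yield (start, end] primary-key intervals covering previous_pk..final_pk."""
        for start in range(previous_pk, final_pk + 1, step):
            yield (start, min(start + step, final_pk))

    # e.g. 350,000 new job events since the last checkpoint -> four bounded
    # chunks, each of which becomes its own CSV dump and tarball:
    for start, end in pk_slices(0, 350000):
        print(start, end)  # 0 100000 / 100000 200000 / 200000 300000 / 300000 350000

Shipping per chunk bounds disk usage in the staging directory and means a
failure mid-run loses at most one chunk of work.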
Jeff Bradberry
2021-03-05 15:18:49 -05:00
parent 775c0b02ee
commit a448cb17d9
7 changed files with 193 additions and 211 deletions

View File

@@ -1 +1 @@
-from .core import all_collectors, expensive_collectors, register, gather, ship  # noqa
+from .core import all_collectors, register, gather, ship  # noqa

View File

@@ -5,9 +5,9 @@ import platform
 import distro
 
 from django.db import connection
-from django.db.models import Count
+from django.db.models import Count, Max
 from django.conf import settings
-from django.utils.timezone import now
+from django.utils.timezone import now, timedelta
 from django.utils.translation import ugettext_lazy as _
 
 from awx.conf.license import get_license
@@ -33,6 +33,14 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
 '''
 
 
+def four_hour_slicing(key, since, until):
+    start, end = since, None
+    while start < until:
+        end = min(start + timedelta(hours=4), until)
+        yield (start, end)
+        start = end
+
+
 @register('config', '1.3', description=_('General platform configuration.'))
 def config(since, **kwargs):
     license_info = get_license()
@@ -270,7 +278,7 @@ class FileSplitter(io.StringIO):
     def write(self, s):
         if not self.header:
-            self.header = s[0 : s.index('\n')]
+            self.header = s[: s.index('\n')]
         self.counter += self.currentfile.write(s)
         if self.counter >= MAX_TABLE_SIZE:
             self.cycle_file()
@@ -284,7 +292,20 @@ def _copy_table(table, query, path):
     return file.file_list()
 
 
-@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=True)
+def events_slicing(key, since, until):
+    from awx.conf.models import Setting
+
+    step = 100000
+    last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
+    last_entries = last_entries.value if last_entries is not None else {}
+    previous_pk = last_entries.get(key) or 0
+    final_pk = models.JobEvent.objects.filter(created__lte=until).aggregate(Max('pk'))['pk__max']
+    for start in range(previous_pk, final_pk + 1, step):
+        yield (start, min(start + step, final_pk))
+
+
+@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing)
 def events_table(since, full_path, until, **kwargs):
     events_query = '''COPY (SELECT main_jobevent.id,
                                    main_jobevent.created,
@@ -302,22 +323,22 @@ def events_table(since, full_path, until, **kwargs):
                                    main_jobevent.role,
                                    main_jobevent.job_id,
                                    main_jobevent.host_id,
-                                   main_jobevent.host_name
-                                   , CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
+                                   main_jobevent.host_name,
+                                   CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
                                    CAST(main_jobevent.event_data::json->>'end' AS TIMESTAMP WITH TIME ZONE) AS end,
                                    main_jobevent.event_data::json->'duration' AS duration,
                                    main_jobevent.event_data::json->'res'->'warnings' AS warnings,
                                    main_jobevent.event_data::json->'res'->'deprecations' AS deprecations
                                    FROM main_jobevent
-                                   WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}')
+                                   WHERE (main_jobevent.id > {} AND main_jobevent.id <= {})
                                    ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER
                                    '''.format(
-        since.isoformat(), until.isoformat()
+        since, until
     )
 
     return _copy_table(table='events', query=events_query, path=full_path)
 
 
-@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=True)
+@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)
 def unified_jobs_table(since, full_path, until, **kwargs):
     unified_job_query = '''COPY (SELECT main_unifiedjob.id,
                                         main_unifiedjob.polymorphic_ctype_id,
@@ -381,7 +402,7 @@ def unified_job_template_table(since, full_path, **kwargs):
     return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
 
 
-@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True)
+@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=four_hour_slicing)
 def workflow_job_node_table(since, full_path, until, **kwargs):
     workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
                                               main_workflowjobnode.created,

View File

@@ -1,20 +1,24 @@
 import inspect
+import io
 import json
 import logging
 import os
 import os.path
-import tempfile
+import pathlib
 import shutil
-import requests
+import tarfile
+import tempfile
 
 from django.conf import settings
 from django.utils.timezone import now, timedelta
 from rest_framework.exceptions import PermissionDenied
+import requests
 
 from awx.conf.license import get_license
 from awx.main.models import Job
 from awx.main.access import access_registry
 from awx.main.utils import get_awx_http_client_headers, set_environ
+from awx.main.utils.pglock import advisory_lock
 
 __all__ = ['register', 'gather', 'ship']
@@ -36,29 +40,18 @@ def _valid_license():
 def all_collectors():
     from awx.main.analytics import collectors
 
-    collector_dict = {}
-    module = collectors
-    for name, func in inspect.getmembers(module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
-            key = func.__awx_analytics_key__
-            desc = func.__awx_analytics_description__ or ''
-            version = func.__awx_analytics_version__
-            collector_dict[key] = {'name': key, 'version': version, 'description': desc}
-    return collector_dict
-
-
-def expensive_collectors():
-    from awx.main.analytics import collectors
-
-    ret = []
-    module = collectors
-    for name, func in inspect.getmembers(module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__:
-            ret.append(func.__awx_analytics_key__)
-    return ret
-
-
-def register(key, version, description=None, format='json', expensive=False):
+    return {
+        func.__awx_analytics_key__: {
+            'name': func.__awx_analytics_key__,
+            'version': func.__awx_analytics_version__,
+            'description': func.__awx_analytics_description__ or '',
+        }
+        for name, func in inspect.getmembers(collectors)
+        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__')
+    }
+
+
+def register(key, version, description=None, format='json', expensive=None):
     """
     A decorator used to register a function as a metric collector.
@@ -82,136 +75,154 @@ def register(key, version, description=None, format='json', expensive=False):
     return decorate
 
 
-def gather(dest=None, module=None, subset=None, since=None, until=now(), collection_type='scheduled'):
+def package(target, data, timestamp):
+    try:
+        tarname_base = f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}'
+        path = pathlib.Path(target)
+        index = len(list(path.glob(f'{tarname_base}-*.*')))
+        tarname = f'{tarname_base}-{index}.tar.gz'
+
+        manifest = {}
+        with tarfile.open(path.joinpath(tarname), 'w:gz') as f:
+            for name, (item, version) in data.items():
+                manifest[name] = version
+                if isinstance(item, str):
+                    info = f.gettarinfo(item, arcname=name)
+                    with open(item, 'rb') as fileobj:
+                        f.addfile(info, fileobj=fileobj)
+                else:
+                    info = tarfile.TarInfo(name)
+                    fileobj = io.BytesIO(json.dumps(item).encode('utf-8'))
+                    info.size = len(fileobj.getvalue())
+                    f.addfile(info, fileobj=fileobj)
+
+            info = tarfile.TarInfo('manifest.json')
+            fileobj = io.BytesIO(json.dumps(manifest).encode('utf-8'))
+            info.size = len(fileobj.getvalue())
+            f.addfile(info, fileobj=fileobj)
+
+        return f.name
+    except Exception:
+        logger.exception("Failed to write analytics archive file")
+        return None
+
+
+def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'):
     """
     Gather all defined metrics and write them as JSON files in a .tgz
 
     :param dest: the (optional) absolute path to write a compressed tarball
     :param module: the module to search for registered analytic collector
                    functions; defaults to awx.main.analytics.collectors
     """
-
-    def _write_manifest(destdir, manifest):
-        path = os.path.join(destdir, 'manifest.json')
-        with open(path, 'w', encoding='utf-8') as f:
-            try:
-                json.dump(manifest, f)
-            except Exception:
-                f.close()
-                os.remove(f.name)
-                logger.exception("Could not generate manifest.json")
-
-    last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4))
-    logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
-
-    if _valid_license() is False:
-        logger.exception("Invalid License provided, or No License Provided")
+    if not _valid_license():
+        logger.error("Invalid License provided, or No License Provided")
         return None
 
-    if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
-        logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
-        return None
+    if collection_type != 'dry-run':
+        if not settings.INSIGHTS_TRACKING_STATE:
+            logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
+            return None
+        if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
+            logger.debug('Not gathering analytics, configuration is invalid')
+            return None
 
-    collector_list = []
-    if module:
-        collector_module = module
-    else:
-        from awx.main.analytics import collectors
-
-        collector_module = collectors
-    for name, func in inspect.getmembers(collector_module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset):
-            collector_list.append((name, func))
-
-    manifest = dict()
-    dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
-    gather_dir = os.path.join(dest, 'stage')
-    os.mkdir(gather_dir, 0o700)
-    num_splits = 1
-    for name, func in collector_list:
-        if func.__awx_analytics_type__ == 'json':
-            key = func.__awx_analytics_key__
-            path = '{}.json'.format(os.path.join(gather_dir, key))
-            with open(path, 'w', encoding='utf-8') as f:
-                try:
-                    json.dump(func(last_run, collection_type=collection_type, until=until), f)
-                    manifest['{}.json'.format(key)] = func.__awx_analytics_version__
-                except Exception:
-                    logger.exception("Could not generate metric {}.json".format(key))
-                    f.close()
-                    os.remove(f.name)
-        elif func.__awx_analytics_type__ == 'csv':
-            key = func.__awx_analytics_key__
-            try:
-                files = func(last_run, full_path=gather_dir, until=until)
-                if files:
-                    manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
-                    if len(files) > num_splits:
-                        num_splits = len(files)
-            except Exception:
-                logger.exception("Could not generate metric {}.csv".format(key))
-
-    if not manifest:
-        # No data was collected
-        logger.warning("No data from {} to {}".format(last_run, until))
-        shutil.rmtree(dest)
-        return None
-
-    # Always include config.json if we're using our collectors
-    if 'config.json' not in manifest.keys() and not module:
-        from awx.main.analytics import collectors
-
-        config = collectors.config
-        path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__))
-        with open(path, 'w', encoding='utf-8') as f:
-            try:
-                json.dump(collectors.config(last_run), f)
-                manifest['config.json'] = config.__awx_analytics_version__
-            except Exception:
-                logger.exception("Could not generate metric {}.json".format(key))
-                f.close()
-                os.remove(f.name)
-                shutil.rmtree(dest)
-                return None
-
-    stage_dirs = [gather_dir]
-    if num_splits > 1:
-        for i in range(0, num_splits):
-            split_path = os.path.join(dest, 'split{}'.format(i))
-            os.mkdir(split_path, 0o700)
-            filtered_manifest = {}
-            shutil.copy(os.path.join(gather_dir, 'config.json'), split_path)
-            filtered_manifest['config.json'] = manifest['config.json']
-            suffix = '_split{}'.format(i)
-            for file in os.listdir(gather_dir):
-                if file.endswith(suffix):
-                    old_file = os.path.join(gather_dir, file)
-                    new_filename = file.replace(suffix, '')
-                    new_file = os.path.join(split_path, new_filename)
-                    shutil.move(old_file, new_file)
-                    filtered_manifest[new_filename] = manifest[new_filename]
-            _write_manifest(split_path, filtered_manifest)
-            stage_dirs.append(split_path)
-
-    for item in list(manifest.keys()):
-        if not os.path.exists(os.path.join(gather_dir, item)):
-            manifest.pop(item)
-    _write_manifest(gather_dir, manifest)
-
-    tarfiles = []
-    try:
-        for i in range(0, len(stage_dirs)):
-            stage_dir = stage_dirs[i]
-            # can't use isoformat() since it has colons, which GNU tar doesn't like
-            tarname = '_'.join([settings.SYSTEM_UUID, until.strftime('%Y-%m-%d-%H%M%S%z'), str(i)])
-            tgz = shutil.make_archive(os.path.join(os.path.dirname(dest), tarname), 'gztar', stage_dir)
-            tarfiles.append(tgz)
-    except Exception:
-        shutil.rmtree(stage_dir, ignore_errors=True)
-        logger.exception("Failed to write analytics archive file")
-    finally:
-        shutil.rmtree(dest, ignore_errors=True)
-    return tarfiles
+    with advisory_lock('gather_analytics_lock', wait=False) as acquired:
+        if not acquired:
+            logger.debug('Not gathering analytics, another task holds lock')
+            return None
+
+        from awx.conf.models import Setting
+        from awx.main.signals import disable_activity_stream
+
+        if until is None:
+            until = now()
+        last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (until - timedelta(weeks=4))
+        last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
+        last_entries = last_entries.value if last_entries is not None else {}
+        logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
+
+        if module:
+            collector_module = module
+        else:
+            from awx.main.analytics import collectors
+
+            collector_module = collectors
+        if subset:  # ensure that the config collector is added to any explicit subset of our builtins
+            subset = set(subset) | {
+                'config',
+            }
+        collector_list = [
+            func
+            for name, func in inspect.getmembers(collector_module)
+            if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset)
+        ]
+        json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json']
+        csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv']
+
+        dest = pathlib.Path(dest or tempfile.mkdtemp(prefix='awx_analytics'))
+        gather_dir = dest.joinpath('stage')
+        gather_dir.mkdir(mode=0o700)
+        tarfiles = []
+
+        # These json collectors are pretty compact, so collect all of them before shipping to analytics.
+        data = {}
+        for func in json_collectors:
+            key = func.__awx_analytics_key__
+            filename = f'{key}.json'
+            try:
+                data[filename] = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__)
+            except Exception:
+                logger.exception("Could not generate metric {}".format(filename))
+        if data:
+            tgzfile = package(dest.parent, data, until)
+            if tgzfile is not None:
+                tarfiles.append(tgzfile)
+                if collection_type != 'dry-run':
+                    ship(tgzfile)
+                    with disable_activity_stream():
+                        for filename in data:
+                            last_entries[filename.replace('.json', '')] = until
+                        settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries
+
+        for func in csv_collectors:
+            key = func.__awx_analytics_key__
+            filename = f'{key}.csv'
+            try:
+                slices = [(last_run, until)]
+                if func.__awx_expensive__:
+                    slices = func.__awx_expensive__(key, last_run, until)  # it's ok if this returns a generator
+                for start, end in slices:
+                    files = func(start, full_path=gather_dir, until=end)
+                    if not files:
+                        continue
+                    for fpath in files:
+                        tgzfile = package(dest.parent, {filename: (fpath, func.__awx_analytics_version__)}, until)
+                        if tgzfile is not None:
+                            tarfiles.append(tgzfile)
+                            if collection_type != 'dry-run':
+                                ship(tgzfile)
+                                with disable_activity_stream():
+                                    last_entries[key] = end
+                                    settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries
+            except Exception:
+                logger.exception("Could not generate metric {}".format(filename))
+
+        with disable_activity_stream():
+            settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
+
+        shutil.rmtree(dest, ignore_errors=True)  # clean up individual artifact files
+
+        if not tarfiles:
+            # No data was collected
+            logger.warning("No data from {} to {}".format(last_run, until))
+            return None
+
+        return tarfiles
 
 
 def ship(path):

View File

@@ -10,6 +10,7 @@ from rest_framework.fields import FloatField
 # Tower
 from awx.conf import fields, register, register_validate
+from awx.main.fields import JSONField
 from awx.main.models import ExecutionEnvironment
@@ -778,6 +779,13 @@ register(
     category=_('System'),
     category_slug='system',
 )
 
+register(
+    'AUTOMATION_ANALYTICS_LAST_ENTRIES',
+    field_class=JSONField,
+    # label=_('Last gathered entries for expensive Automation Analytics collectors.'),
+    category=_('System'),
+    category_slug='system',
+)
+
 register(

View File

@@ -1,6 +1,6 @@
 import logging
 
-from awx.main.analytics import gather, ship
+from awx.main import analytics
 from dateutil import parser
 from django.core.management.base import BaseCommand
 from django.utils.timezone import now
@@ -48,13 +48,9 @@ class Command(BaseCommand):
         if opt_ship and opt_dry_run:
             self.logger.error('Both --ship and --dry-run cannot be processed at the same time.')
             return
-        tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
+        tgzfiles = analytics.gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
         if tgzfiles:
             for tgz in tgzfiles:
                 self.logger.info(tgz)
         else:
             self.logger.error('No analytics collected')
-        if opt_ship:
-            if tgzfiles:
-                for tgz in tgzfiles:
-                    ship(tgz)

View File

@@ -57,7 +57,6 @@ from receptorctl.socket_interface import ReceptorControl
 from awx import __version__ as awx_application_version
 from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
 from awx.main.access import access_registry
-from awx.main.analytics import all_collectors, expensive_collectors
 from awx.main.redact import UriCleaner
 from awx.main.models import (
     Schedule,
@@ -377,70 +376,15 @@ def send_notifications(notification_list, job_id=None):
 @task(queue=get_local_queuename)
 def gather_analytics():
-    def _gather_and_ship(subset, since, until):
-        tgzfiles = []
-        try:
-            tgzfiles = analytics.gather(subset=subset, since=since, until=until)
-            # empty analytics without raising an exception is not an error
-            if not tgzfiles:
-                return True
-            logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
-            for tgz in tgzfiles:
-                analytics.ship(tgz)
-        except Exception:
-            logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
-            return False
-        finally:
-            if tgzfiles:
-                for tgz in tgzfiles:
-                    if os.path.exists(tgz):
-                        os.remove(tgz)
-        return True
-
     from awx.conf.models import Setting
     from rest_framework.fields import DateTimeField
-    from awx.main.signals import disable_activity_stream
-
-    if not settings.INSIGHTS_TRACKING_STATE:
-        return
-    if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
-        logger.debug('Not gathering analytics, configuration is invalid')
-        return
 
     last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first()
-    if last_gather:
-        last_time = DateTimeField().to_internal_value(last_gather.value)
-    else:
-        last_time = None
+    last_time = DateTimeField().to_internal_value(last_gather.value) if last_gather else None
     gather_time = now()
+
     if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
-        with advisory_lock('gather_analytics_lock', wait=False) as acquired:
-            if acquired is False:
-                logger.debug('Not gathering analytics, another task holds lock')
-                return
-            subset = list(all_collectors().keys())
-            incremental_collectors = []
-            for collector in expensive_collectors():
-                if collector in subset:
-                    subset.remove(collector)
-                    incremental_collectors.append(collector)
-            # Cap gathering at 4 weeks of data if there has been no data gathering
-            since = last_time or (gather_time - timedelta(weeks=4))
-            if incremental_collectors:
-                start = since
-                until = None
-                while start < gather_time:
-                    until = start + timedelta(hours=4)
-                    if until > gather_time:
-                        until = gather_time
-                    if not _gather_and_ship(incremental_collectors, since=start, until=until):
-                        break
-                    start = until
-                    with disable_activity_stream():
-                        settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
-            if subset:
-                _gather_and_ship(subset, since=since, until=gather_time)
+        analytics.gather()
 
 
 @task(queue=get_local_queuename)

View File

@@ -620,6 +620,8 @@ INSIGHTS_TRACKING_STATE = False
 # Last gather date for Analytics
 AUTOMATION_ANALYTICS_LAST_GATHER = None
+# Last gathered entries for expensive Analytics
+AUTOMATION_ANALYTICS_LAST_ENTRIES = None
 
 # Default list of modules allowed for ad hoc commands.
 # Note: This setting may be overridden by database settings.