diff --git a/awx/main/analytics/__init__.py b/awx/main/analytics/__init__.py
index 6aee1cef91..4302dc9cf2 100644
--- a/awx/main/analytics/__init__.py
+++ b/awx/main/analytics/__init__.py
@@ -1 +1 @@
-from .core import all_collectors, expensive_collectors, register, gather, ship # noqa
+from .core import all_collectors, register, gather, ship # noqa
diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index abdeb88b6c..2244d65ac5 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -5,9 +5,9 @@ import platform
 import distro
 
 from django.db import connection
-from django.db.models import Count
+from django.db.models import Count, Max
 from django.conf import settings
-from django.utils.timezone import now
+from django.utils.timezone import now, timedelta
 from django.utils.translation import ugettext_lazy as _
 
 from awx.conf.license import get_license
@@ -33,6 +33,14 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
 '''
 
 
+def four_hour_slicing(key, since, until):
+    start, end = since, None
+    while start < until:
+        end = min(start + timedelta(hours=4), until)
+        yield (start, end)
+        start = end
+
+
 @register('config', '1.3', description=_('General platform configuration.'))
 def config(since, **kwargs):
     license_info = get_license()
@@ -270,7 +278,7 @@ class FileSplitter(io.StringIO):
 
     def write(self, s):
         if not self.header:
-            self.header = s[0 : s.index('\n')]
+            self.header = s[: s.index('\n')]
         self.counter += self.currentfile.write(s)
         if self.counter >= MAX_TABLE_SIZE:
             self.cycle_file()
@@ -284,7 +292,20 @@ def _copy_table(table, query, path):
     return file.file_list()
 
 
-@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=True)
+def events_slicing(key, since, until):
+    from awx.conf.models import Setting
+
+    step = 100000
+    last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
+    last_entries = last_entries.value if last_entries is not None else {}
+    previous_pk = last_entries.get(key) or 0
+    final_pk = models.JobEvent.objects.filter(created__lte=until).aggregate(Max('pk'))['pk__max']
+
+    for start in range(previous_pk, final_pk + 1, step):
+        yield (start, min(start + step, final_pk))
+
+
+@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing)
 def events_table(since, full_path, until, **kwargs):
     events_query = '''COPY (SELECT main_jobevent.id,
                               main_jobevent.created,
@@ -302,22 +323,22 @@ def events_table(since, full_path, until, **kwargs):
                               main_jobevent.role,
                               main_jobevent.job_id,
                               main_jobevent.host_id,
-                              main_jobevent.host_name
-                              , CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
+                              main_jobevent.host_name,
+                              CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
                               CAST(main_jobevent.event_data::json->>'end' AS TIMESTAMP WITH TIME ZONE) AS end,
                               main_jobevent.event_data::json->'duration' AS duration,
                               main_jobevent.event_data::json->'res'->'warnings' AS warnings,
                               main_jobevent.event_data::json->'res'->'deprecations' AS deprecations
                               FROM main_jobevent
-                              WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}')
+                              WHERE (main_jobevent.id > {} AND main_jobevent.id <= {})
                               ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER
                               '''.format(
-        since.isoformat(), until.isoformat()
+        since, until
     )
     return _copy_table(table='events', query=events_query, path=full_path)
 
 
-@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=True)
+@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)
 def unified_jobs_table(since, full_path, until, **kwargs):
     unified_job_query = '''COPY (SELECT main_unifiedjob.id,
                                     main_unifiedjob.polymorphic_ctype_id,
@@ -381,7 +402,7 @@ def unified_job_template_table(since, full_path, **kwargs):
     return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
 
 
-@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True)
+@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=four_hour_slicing)
 def workflow_job_node_table(since, full_path, until, **kwargs):
     workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
                                           main_workflowjobnode.created,
diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py
index 69b992a1c2..cb3deee215 100644
--- a/awx/main/analytics/core.py
+++ b/awx/main/analytics/core.py
@@ -1,20 +1,24 @@
 import inspect
+import io
 import json
 import logging
 import os
 import os.path
-import tempfile
+import pathlib
 import shutil
-import requests
+import tarfile
+import tempfile
 
 from django.conf import settings
 from django.utils.timezone import now, timedelta
 from rest_framework.exceptions import PermissionDenied
+import requests
 
 from awx.conf.license import get_license
 from awx.main.models import Job
 from awx.main.access import access_registry
 from awx.main.utils import get_awx_http_client_headers, set_environ
+from awx.main.utils.pglock import advisory_lock
 
 
 __all__ = ['register', 'gather', 'ship']
@@ -36,29 +40,18 @@ def _valid_license():
 
 def all_collectors():
     from awx.main.analytics import collectors
 
-    collector_dict = {}
-    module = collectors
-    for name, func in inspect.getmembers(module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
-            key = func.__awx_analytics_key__
-            desc = func.__awx_analytics_description__ or ''
-            version = func.__awx_analytics_version__
-            collector_dict[key] = {'name': key, 'version': version, 'description': desc}
-    return collector_dict
+    return {
+        func.__awx_analytics_key__: {
+            'name': func.__awx_analytics_key__,
+            'version': func.__awx_analytics_version__,
+            'description': func.__awx_analytics_description__ or '',
+        }
+        for name, func in inspect.getmembers(collectors)
+        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__')
+    }
 
 
-def expensive_collectors():
-    from awx.main.analytics import collectors
-
-    ret = []
-    module = collectors
-    for name, func in inspect.getmembers(module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__:
-            ret.append(func.__awx_analytics_key__)
-    return ret
-
-
-def register(key, version, description=None, format='json', expensive=False):
+def register(key, version, description=None, format='json', expensive=None):
     """
     A decorator used to register a function as a metric collector.
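+
+    The ``expensive`` argument, when provided, is a callable ``(key, since, until)``
+    that yields ``(start, end)`` slices, so large collections can be gathered and
+    shipped incrementally (see ``four_hour_slicing`` and ``events_slicing``).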
@@ -82,136 +75,154 @@ def register(key, version, description=None, format='json', expensive=False):
     return decorate
 
 
-def gather(dest=None, module=None, subset=None, since=None, until=now(), collection_type='scheduled'):
+def package(target, data, timestamp):
+    try:
+        tarname_base = f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}'
+        path = pathlib.Path(target)
+        index = len(list(path.glob(f'{tarname_base}-*.*')))
+        tarname = f'{tarname_base}-{index}.tar.gz'
+
+        manifest = {}
+        with tarfile.open(target.joinpath(tarname), 'w:gz') as f:
+            for name, (item, version) in data.items():
+                manifest[name] = version
+                if isinstance(item, str):
+                    f.add(item, arcname=name)
+                else:
+                    info = tarfile.TarInfo(name)
+                    fileobj = io.BytesIO(json.dumps(item).encode('utf-8'))
+                    info.size = len(fileobj.getvalue())
+                    f.addfile(info, fileobj=fileobj)
+
+            info = tarfile.TarInfo('manifest.json')
+            fileobj = io.BytesIO(json.dumps(manifest).encode('utf-8'))
+            info.size = len(fileobj.getvalue())
+            f.addfile(info, fileobj=fileobj)
+
+        return f.name
+    except Exception:
+        logger.exception("Failed to write analytics archive file")
+        return None
+
+
+def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'):
     """
     Gather all defined metrics and write them as JSON files in a .tgz
 
-    :param dest:   the (optional) absolute path to write a compressed tarball
+    :param dest: the (optional) absolute path to write a compressed tarball
     :param module: the module to search for registered analytic collector
-                   functions; defaults to awx.main.analytics.collectors
+                  functions; defaults to awx.main.analytics.collectors
     """
 
-    def _write_manifest(destdir, manifest):
-        path = os.path.join(destdir, 'manifest.json')
-        with open(path, 'w', encoding='utf-8') as f:
-            try:
-                json.dump(manifest, f)
-            except Exception:
-                f.close()
-                os.remove(f.name)
-                logger.exception("Could not generate manifest.json")
-
-    last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4))
-    logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
-
-    if _valid_license() is False:
-        logger.exception("Invalid License provided, or No License Provided")
+    if not _valid_license():
+        logger.error("Invalid License provided, or No License Provided")
         return None
 
-    if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
-        logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
-        return None
+    if collection_type != 'dry-run':
+        if not settings.INSIGHTS_TRACKING_STATE:
+            logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
+            return None
 
-    collector_list = []
-    if module:
-        collector_module = module
-    else:
-        from awx.main.analytics import collectors
+        if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
+            logger.debug('Not gathering analytics, configuration is invalid')
+            return None
 
-        collector_module = collectors
-    for name, func in inspect.getmembers(collector_module):
-        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset):
-            collector_list.append((name, func))
+    with advisory_lock('gather_analytics_lock', wait=False) as acquired:
+        if not acquired:
+            logger.debug('Not gathering analytics, another task holds lock')
+            return None
 
-    manifest = dict()
-    dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
-    gather_dir = os.path.join(dest, 'stage')
-    os.mkdir(gather_dir, 0o700)
-    num_splits = 1
-    for name, func in collector_list:
-        if func.__awx_analytics_type__ == 'json':
+        from awx.conf.models import Setting
+        from awx.main.signals import disable_activity_stream
+
+        if until is None:
+            until = now()
+        last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (until - timedelta(weeks=4))
+        last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
+        last_entries = last_entries.value if last_entries is not None else {}
+        logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
+
+        if module:
+            collector_module = module
+        else:
+            from awx.main.analytics import collectors
+
+            collector_module = collectors
+        if subset:  # ensure that the config collector is added to any explicit subset of our builtins
+            subset = set(subset) | {
+                'config',
+            }
+
+        collector_list = [
+            func
+            for name, func in inspect.getmembers(collector_module)
+            if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset)
+        ]
+        json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json']
+        csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv']
+
+        dest = pathlib.Path(dest or tempfile.mkdtemp(prefix='awx_analytics'))
+        gather_dir = dest.joinpath('stage')
+        gather_dir.mkdir(mode=0o700)
+        tarfiles = []
+
+        # These json collectors are pretty compact, so collect all of them before shipping to analytics.
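+        # They are bundled into a single tarball and shipped in one request; the csv
+        # collectors below are packaged and shipped per slice as each slice is produced.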
+        data = {}
+        for func in json_collectors:
             key = func.__awx_analytics_key__
-            path = '{}.json'.format(os.path.join(gather_dir, key))
-            with open(path, 'w', encoding='utf-8') as f:
-                try:
-                    json.dump(func(last_run, collection_type=collection_type, until=until), f)
-                    manifest['{}.json'.format(key)] = func.__awx_analytics_version__
-                except Exception:
-                    logger.exception("Could not generate metric {}.json".format(key))
-                    f.close()
-                    os.remove(f.name)
-        elif func.__awx_analytics_type__ == 'csv':
+            filename = f'{key}.json'
+            try:
+                data[filename] = (func(last_run, collection_type=collection_type, until=until), func.__awx_analytics_version__)
+            except Exception:
+                logger.exception("Could not generate metric {}".format(filename))
+        if data:
+            tgzfile = package(dest.parent, data, until)
+            if tgzfile is not None:
+                tarfiles.append(tgzfile)
+
+                if collection_type != 'dry-run':
+                    ship(tgzfile)
+                    with disable_activity_stream():
+                        for filename in data:
+                            last_entries[filename.replace('.json', '')] = until
+                        settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries
+
+        for func in csv_collectors:
             key = func.__awx_analytics_key__
+            filename = f'{key}.csv'
             try:
-                files = func(last_run, full_path=gather_dir, until=until)
-                if files:
-                    manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
-                    if len(files) > num_splits:
-                        num_splits = len(files)
+                slices = [(last_run, until)]
+                if func.__awx_expensive__:
+                    slices = func.__awx_expensive__(key, last_run, until)  # it's ok if this returns a generator
+                for start, end in slices:
+                    files = func(start, full_path=gather_dir, until=end)
+
+                    if not files:
+                        continue
+                    for fpath in files:
+                        tgzfile = package(dest.parent, {filename: (fpath, func.__awx_analytics_version__)}, until)
+                        if tgzfile is not None:
+                            tarfiles.append(tgzfile)
+
+                            if collection_type != 'dry-run':
+                                ship(tgzfile)
+                                with disable_activity_stream():
+                                    last_entries[key] = end
+                                    settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = last_entries
             except Exception:
-                logger.exception("Could not generate metric {}.csv".format(key))
+                logger.exception("Could not generate metric {}".format(filename))
 
-    if not manifest:
-        # No data was collected
-        logger.warning("No data from {} to {}".format(last_run, until))
-        shutil.rmtree(dest)
-        return None
+        with disable_activity_stream():
+            settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
 
-    # Always include config.json if we're using our collectors
-    if 'config.json' not in manifest.keys() and not module:
-        from awx.main.analytics import collectors
+        shutil.rmtree(dest, ignore_errors=True)  # clean up individual artifact files
+        if not tarfiles:
+            # No data was collected
+            logger.warning("No data from {} to {}".format(last_run, until))
+            return None
 
-        config = collectors.config
-        path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__))
-        with open(path, 'w', encoding='utf-8') as f:
-            try:
-                json.dump(collectors.config(last_run), f)
-                manifest['config.json'] = config.__awx_analytics_version__
-            except Exception:
-                logger.exception("Could not generate metric {}.json".format(key))
-                f.close()
-                os.remove(f.name)
-                shutil.rmtree(dest)
-                return None
-
-    stage_dirs = [gather_dir]
-    if num_splits > 1:
-        for i in range(0, num_splits):
-            split_path = os.path.join(dest, 'split{}'.format(i))
-            os.mkdir(split_path, 0o700)
-            filtered_manifest = {}
-            shutil.copy(os.path.join(gather_dir, 'config.json'), split_path)
-            filtered_manifest['config.json'] = manifest['config.json']
-            suffix = '_split{}'.format(i)
-            for file in os.listdir(gather_dir):
-                if file.endswith(suffix):
-                    old_file = os.path.join(gather_dir, file)
-                    new_filename = file.replace(suffix, '')
-                    new_file = os.path.join(split_path, new_filename)
-                    shutil.move(old_file, new_file)
-                    filtered_manifest[new_filename] = manifest[new_filename]
-            _write_manifest(split_path, filtered_manifest)
-            stage_dirs.append(split_path)
-
-    for item in list(manifest.keys()):
-        if not os.path.exists(os.path.join(gather_dir, item)):
-            manifest.pop(item)
-    _write_manifest(gather_dir, manifest)
-
-    tarfiles = []
-    try:
-        for i in range(0, len(stage_dirs)):
-            stage_dir = stage_dirs[i]
-            # can't use isoformat() since it has colons, which GNU tar doesn't like
-            tarname = '_'.join([settings.SYSTEM_UUID, until.strftime('%Y-%m-%d-%H%M%S%z'), str(i)])
-            tgz = shutil.make_archive(os.path.join(os.path.dirname(dest), tarname), 'gztar', stage_dir)
-            tarfiles.append(tgz)
-    except Exception:
-        shutil.rmtree(stage_dir, ignore_errors=True)
-        logger.exception("Failed to write analytics archive file")
-    finally:
-        shutil.rmtree(dest, ignore_errors=True)
-    return tarfiles
+        return tarfiles
 
 
 def ship(path):
diff --git a/awx/main/conf.py b/awx/main/conf.py
index 5cfd2977f7..3699d3f685 100644
--- a/awx/main/conf.py
+++ b/awx/main/conf.py
@@ -10,6 +10,7 @@ from rest_framework.fields import FloatField
 
 # Tower
 from awx.conf import fields, register, register_validate
+from awx.main.fields import JSONField
 from awx.main.models import ExecutionEnvironment
 
 
@@ -778,6 +779,13 @@ register(
     category=_('System'),
     category_slug='system',
 )
+register(
+    'AUTOMATION_ANALYTICS_LAST_ENTRIES',
+    field_class=JSONField,
+    label=_('Last gathered entries for expensive Automation Analytics collectors.'),
+    category=_('System'),
+    category_slug='system',
+)
 
 
 register(
diff --git a/awx/main/management/commands/gather_analytics.py b/awx/main/management/commands/gather_analytics.py
index 5099d4d0d1..25fffb4dec 100644
--- a/awx/main/management/commands/gather_analytics.py
+++ b/awx/main/management/commands/gather_analytics.py
@@ -1,6 +1,6 @@
 import logging
 
-from awx.main.analytics import gather, ship
+from awx.main import analytics
 from dateutil import parser
 from django.core.management.base import BaseCommand
 from django.utils.timezone import now
@@ -48,13 +48,9 @@ class Command(BaseCommand):
         if opt_ship and opt_dry_run:
             self.logger.error('Both --ship and --dry-run cannot be processed at the same time.')
             return
-        tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
+        tgzfiles = analytics.gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
         if tgzfiles:
             for tgz in tgzfiles:
                 self.logger.info(tgz)
         else:
             self.logger.error('No analytics collected')
-        if opt_ship:
-            if tgzfiles:
-                for tgz in tgzfiles:
-                    ship(tgz)
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 992620fbd8..dfd63569ec 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -57,7 +57,6 @@ from receptorctl.socket_interface import ReceptorControl
 from awx import __version__ as awx_application_version
 from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
 from awx.main.access import access_registry
-from awx.main.analytics import all_collectors, expensive_collectors
 from awx.main.redact import UriCleaner
 from awx.main.models import (
     Schedule,
@@ -377,70 +376,15 @@ def send_notifications(notification_list, job_id=None):
 
 @task(queue=get_local_queuename)
 def gather_analytics():
-    def _gather_and_ship(subset, since, until):
-        tgzfiles = []
-        try:
-            tgzfiles = analytics.gather(subset=subset, since=since, until=until)
-            # empty analytics without raising an exception is not an error
-            if not tgzfiles:
-                return True
-            logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
-            for tgz in tgzfiles:
-                analytics.ship(tgz)
-        except Exception:
-            logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
-            return False
-        finally:
-            if tgzfiles:
-                for tgz in tgzfiles:
-                    if os.path.exists(tgz):
-                        os.remove(tgz)
-        return True
-
     from awx.conf.models import Setting
     from rest_framework.fields import DateTimeField
-    from awx.main.signals import disable_activity_stream
 
-    if not settings.INSIGHTS_TRACKING_STATE:
-        return
-    if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
-        logger.debug('Not gathering analytics, configuration is invalid')
-        return
     last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first()
-    if last_gather:
-        last_time = DateTimeField().to_internal_value(last_gather.value)
-    else:
-        last_time = None
+    last_time = DateTimeField().to_internal_value(last_gather.value) if last_gather else None
     gather_time = now()
+
     if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
-        with advisory_lock('gather_analytics_lock', wait=False) as acquired:
-            if acquired is False:
-                logger.debug('Not gathering analytics, another task holds lock')
-                return
-            subset = list(all_collectors().keys())
-            incremental_collectors = []
-            for collector in expensive_collectors():
-                if collector in subset:
-                    subset.remove(collector)
-                    incremental_collectors.append(collector)
-
-            # Cap gathering at 4 weeks of data if there has been no data gathering
-            since = last_time or (gather_time - timedelta(weeks=4))
-
-            if incremental_collectors:
-                start = since
-                until = None
-                while start < gather_time:
-                    until = start + timedelta(hours=4)
-                    if until > gather_time:
-                        until = gather_time
-                    if not _gather_and_ship(incremental_collectors, since=start, until=until):
-                        break
-                    start = until
-                    with disable_activity_stream():
-                        settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
-            if subset:
-                _gather_and_ship(subset, since=since, until=gather_time)
+        analytics.gather()
 
 
 @task(queue=get_local_queuename)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index b6a3966647..1a91d392c7 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -620,6 +620,8 @@ INSIGHTS_TRACKING_STATE = False
 
 # Last gather date for Analytics
 AUTOMATION_ANALYTICS_LAST_GATHER = None
+# Last gathered entries for expensive Analytics
+AUTOMATION_ANALYTICS_LAST_ENTRIES = None
 
 # Default list of modules allowed for ad hoc commands.
 # Note: This setting may be overridden by database settings.