diff --git a/awx/main/analytics/__init__.py b/awx/main/analytics/__init__.py index 6aee1cef91..4302dc9cf2 100644 --- a/awx/main/analytics/__init__.py +++ b/awx/main/analytics/__init__.py @@ -1 +1 @@ -from .core import all_collectors, expensive_collectors, register, gather, ship # noqa +from .core import all_collectors, register, gather, ship # noqa diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index abdeb88b6c..48262c8e0a 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -1,22 +1,23 @@ import io +import json import os import os.path import platform import distro from django.db import connection -from django.db.models import Count +from django.db.models import Count, Max, Min from django.conf import settings -from django.utils.timezone import now +from django.contrib.sessions.models import Session +from django.utils.timezone import now, timedelta from django.utils.translation import ugettext_lazy as _ from awx.conf.license import get_license -from awx.main.utils import get_awx_version, get_custom_venv_choices, camelcase_to_underscore +from awx.main.utils import get_awx_version, get_custom_venv_choices, camelcase_to_underscore, datetime_hook from awx.main import models -from django.contrib.sessions.models import Session from awx.main.analytics import register -''' +""" This module is used to define metrics collected by awx.main.analytics.gather() Each function is decorated with a key name, and should return a data structure that can be serialized to JSON @@ -30,7 +31,61 @@ All functions - when called - will be passed a datetime.datetime object, `since`, which represents the last time analytics were gathered (some metrics functions - like those that return metadata about playbook runs, may return data _since_ the last report date - i.e., new data in the last 24 hours) -''' +""" + + +def trivial_slicing(key, since, until): + if since is not None: + return [(since, until)] + + from awx.conf.models import Setting + + horizon = until - timedelta(weeks=4) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) + last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + return [(last_entry, until)] + + +def four_hour_slicing(key, since, until): + if since is not None: + last_entry = since + else: + from awx.conf.models import Setting + + horizon = until - timedelta(weeks=4) + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) + last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + + start, end = last_entry, None + while start < until: + end = min(start + timedelta(hours=4), until) + yield (start, end) + start = end + + +def events_slicing(key, since, until): + from awx.conf.models import Setting + + last_gather = settings.AUTOMATION_ANALYTICS_LAST_GATHER + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) + horizon = until - timedelta(weeks=4) + + lower = since or last_gather or horizon + if not since and last_entries.get(key): + lower = horizon + pk_values = models.JobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk')) + + previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0 + if not since and last_entries.get(key): + previous_pk = max(last_entries[key], previous_pk) + final_pk = pk_values['pk__max'] or 0 + + step = 100000 + for start in range(previous_pk, final_pk + 1, step): + yield (start, min(start + step, final_pk)) @register('config', '1.3', description=_('General platform configuration.')) @@ -265,12 +320,15 @@ class FileSplitter(io.StringIO): self.files = self.files[:-1] # If we only have one file, remove the suffix if len(self.files) == 1: - os.rename(self.files[0], self.files[0].replace('_split0', '')) + filename = self.files.pop() + new_filename = filename.replace('_split0', '') + os.rename(filename, new_filename) + self.files.append(new_filename) return self.files def write(self, s): if not self.header: - self.header = s[0 : s.index('\n')] + self.header = s[: s.index('\n')] self.counter += self.currentfile.write(s) if self.counter >= MAX_TABLE_SIZE: self.cycle_file() @@ -284,7 +342,7 @@ def _copy_table(table, query, path): return file.file_list() -@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=True) +@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing) def events_table(since, full_path, until, **kwargs): events_query = '''COPY (SELECT main_jobevent.id, main_jobevent.created, @@ -302,22 +360,22 @@ def events_table(since, full_path, until, **kwargs): main_jobevent.role, main_jobevent.job_id, main_jobevent.host_id, - main_jobevent.host_name - , CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start, + main_jobevent.host_name, + CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start, CAST(main_jobevent.event_data::json->>'end' AS TIMESTAMP WITH TIME ZONE) AS end, main_jobevent.event_data::json->'duration' AS duration, main_jobevent.event_data::json->'res'->'warnings' AS warnings, main_jobevent.event_data::json->'res'->'deprecations' AS deprecations FROM main_jobevent - WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}') + WHERE (main_jobevent.id > {} AND main_jobevent.id <= {}) ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER '''.format( - since.isoformat(), until.isoformat() + since, until ) return _copy_table(table='events', query=events_query, path=full_path) -@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=True) +@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing) def unified_jobs_table(since, full_path, until, **kwargs): unified_job_query = '''COPY (SELECT main_unifiedjob.id, main_unifiedjob.polymorphic_ctype_id, @@ -381,7 +439,7 @@ def unified_job_template_table(since, full_path, **kwargs): return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) -@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True) +@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=four_hour_slicing) def workflow_job_node_table(since, full_path, until, **kwargs): workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, main_workflowjobnode.created, diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 69b992a1c2..71008fc2a5 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -1,20 +1,25 @@ import inspect +import io import json import logging import os import os.path -import tempfile +import pathlib import shutil -import requests +import tarfile +import tempfile from django.conf import settings +from django.core.serializers.json import DjangoJSONEncoder from django.utils.timezone import now, timedelta from rest_framework.exceptions import PermissionDenied +import requests from awx.conf.license import get_license from awx.main.models import Job from awx.main.access import access_registry -from awx.main.utils import get_awx_http_client_headers, set_environ +from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook +from awx.main.utils.pglock import advisory_lock __all__ = ['register', 'gather', 'ship'] @@ -36,29 +41,18 @@ def _valid_license(): def all_collectors(): from awx.main.analytics import collectors - collector_dict = {} - module = collectors - for name, func in inspect.getmembers(module): - if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'): - key = func.__awx_analytics_key__ - desc = func.__awx_analytics_description__ or '' - version = func.__awx_analytics_version__ - collector_dict[key] = {'name': key, 'version': version, 'description': desc} - return collector_dict + return { + func.__awx_analytics_key__: { + 'name': func.__awx_analytics_key__, + 'version': func.__awx_analytics_version__, + 'description': func.__awx_analytics_description__ or '', + } + for name, func in inspect.getmembers(collectors) + if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') + } -def expensive_collectors(): - from awx.main.analytics import collectors - - ret = [] - module = collectors - for name, func in inspect.getmembers(module): - if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__: - ret.append(func.__awx_analytics_key__) - return ret - - -def register(key, version, description=None, format='json', expensive=False): +def register(key, version, description=None, format='json', expensive=None): """ A decorator used to register a function as a metric collector. @@ -82,136 +76,206 @@ def register(key, version, description=None, format='json', expensive=False): return decorate -def gather(dest=None, module=None, subset=None, since=None, until=now(), collection_type='scheduled'): +def package(target, data, timestamp): + try: + tarname_base = f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}' + path = pathlib.Path(target) + index = len(list(path.glob(f'{tarname_base}-*.*'))) + tarname = f'{tarname_base}-{index}.tar.gz' + + manifest = {} + with tarfile.open(target.joinpath(tarname), 'w:gz') as f: + for name, (item, version) in data.items(): + try: + if isinstance(item, str): + f.add(item, arcname=f'./{name}') + else: + buf = json.dumps(item).encode('utf-8') + info = tarfile.TarInfo(f'./{name}') + info.size = len(buf) + info.mtime = timestamp.timestamp() + f.addfile(info, fileobj=io.BytesIO(buf)) + manifest[name] = version + except Exception: + logger.exception(f"Could not generate metric {name}") + return None + + try: + buf = json.dumps(manifest).encode('utf-8') + info = tarfile.TarInfo('./manifest.json') + info.size = len(buf) + info.mtime = timestamp.timestamp() + f.addfile(info, fileobj=io.BytesIO(buf)) + except Exception: + logger.exception("Could not generate manifest.json") + return None + + return f.name + except Exception: + logger.exception("Failed to write analytics archive file") + return None + + +def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'): """ Gather all defined metrics and write them as JSON files in a .tgz - :param dest: the (optional) absolute path to write a compressed tarball + :param dest: the (optional) absolute path to write a compressed tarball :param module: the module to search for registered analytic collector - functions; defaults to awx.main.analytics.collectors + functions; defaults to awx.main.analytics.collectors """ + log_level = logging.ERROR if collection_type != 'scheduled' else logging.DEBUG - def _write_manifest(destdir, manifest): - path = os.path.join(destdir, 'manifest.json') - with open(path, 'w', encoding='utf-8') as f: - try: - json.dump(manifest, f) - except Exception: - f.close() - os.remove(f.name) - logger.exception("Could not generate manifest.json") - - last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4)) - logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) - - if _valid_license() is False: - logger.exception("Invalid License provided, or No License Provided") + if not _valid_license(): + logger.log(log_level, "Invalid License provided, or No License Provided") return None - if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE: - logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.") - return None + if collection_type != 'dry-run': + if not settings.INSIGHTS_TRACKING_STATE: + logger.log(log_level, "Automation Analytics not enabled. Use --dry-run to gather locally without sending.") + return None - collector_list = [] - if module: - collector_module = module - else: + if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD): + logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.") + return None + + with advisory_lock('gather_analytics_lock', wait=False) as acquired: + if not acquired: + logger.log(log_level, "Not gathering analytics, another task holds lock") + return None + + from awx.conf.models import Setting from awx.main.analytics import collectors + from awx.main.signals import disable_activity_stream - collector_module = collectors - for name, func in inspect.getmembers(collector_module): - if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset): - collector_list.append((name, func)) + _now = now() + until = _now if until is None else min(until, _now) # Make sure the end isn't in the future. + horizon = until - timedelta(weeks=4) + if since is not None: + # Make sure the start isn't in the future or more than 4 weeks prior to `until`. + since = max(min(since, _now), horizon) + if since and since >= until: + logger.warning("Start of the collection interval is later than the end, ignoring request.") + return None - manifest = dict() - dest = dest or tempfile.mkdtemp(prefix='awx_analytics') - gather_dir = os.path.join(dest, 'stage') - os.mkdir(gather_dir, 0o700) - num_splits = 1 - for name, func in collector_list: - if func.__awx_analytics_type__ == 'json': - key = func.__awx_analytics_key__ - path = '{}.json'.format(os.path.join(gather_dir, key)) - with open(path, 'w', encoding='utf-8') as f: - try: - json.dump(func(last_run, collection_type=collection_type, until=until), f) - manifest['{}.json'.format(key)] = func.__awx_analytics_version__ - except Exception: - logger.exception("Could not generate metric {}.json".format(key)) - f.close() - os.remove(f.name) - elif func.__awx_analytics_type__ == 'csv': + logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) + # LAST_GATHER time should always get truncated to less than 4 weeks back. + last_gather = max(settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon) + + last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first() + last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook) + + collector_module = module if module else collectors + collector_list = [ + func + for name, func in inspect.getmembers(collector_module) + if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset) + ] + if not any(c.__awx_analytics_key__ == 'config' for c in collector_list): + # In order to ship to analytics, we must include the output of the built-in 'config' collector. + collector_list.append(collectors.config) + + json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json'] + csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv'] + + dest = pathlib.Path(dest or tempfile.mkdtemp(prefix='awx_analytics')) + gather_dir = dest.joinpath('stage') + gather_dir.mkdir(mode=0o700) + tarfiles = [] + succeeded = True + + # These json collectors are pretty compact, so collect all of them before shipping to analytics. + data = {} + for func in json_collectors: key = func.__awx_analytics_key__ + filename = f'{key}.json' try: - files = func(last_run, full_path=gather_dir, until=until) - if files: - manifest['{}.csv'.format(key)] = func.__awx_analytics_version__ - if len(files) > num_splits: - num_splits = len(files) + last_entry = max(last_entries.get(key) or last_gather, horizon) + results = (func(since or last_entry, collection_type=collection_type, until=until), func.__awx_analytics_version__) + json.dumps(results) # throwaway check to see if the data is json-serializable + data[filename] = results except Exception: - logger.exception("Could not generate metric {}.csv".format(key)) - - if not manifest: - # No data was collected - logger.warning("No data from {} to {}".format(last_run, until)) - shutil.rmtree(dest) - return None - - # Always include config.json if we're using our collectors - if 'config.json' not in manifest.keys() and not module: - from awx.main.analytics import collectors - - config = collectors.config - path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__)) - with open(path, 'w', encoding='utf-8') as f: - try: - json.dump(collectors.config(last_run), f) - manifest['config.json'] = config.__awx_analytics_version__ - except Exception: - logger.exception("Could not generate metric {}.json".format(key)) - f.close() - os.remove(f.name) - shutil.rmtree(dest) + logger.exception("Could not generate metric {}".format(filename)) + if data: + if data.get('config.json') is None: + logger.error("'config' collector data is missing.") return None - stage_dirs = [gather_dir] - if num_splits > 1: - for i in range(0, num_splits): - split_path = os.path.join(dest, 'split{}'.format(i)) - os.mkdir(split_path, 0o700) - filtered_manifest = {} - shutil.copy(os.path.join(gather_dir, 'config.json'), split_path) - filtered_manifest['config.json'] = manifest['config.json'] - suffix = '_split{}'.format(i) - for file in os.listdir(gather_dir): - if file.endswith(suffix): - old_file = os.path.join(gather_dir, file) - new_filename = file.replace(suffix, '') - new_file = os.path.join(split_path, new_filename) - shutil.move(old_file, new_file) - filtered_manifest[new_filename] = manifest[new_filename] - _write_manifest(split_path, filtered_manifest) - stage_dirs.append(split_path) + tgzfile = package(dest.parent, data, until) + if tgzfile is not None: + tarfiles.append(tgzfile) + if collection_type != 'dry-run': + if ship(tgzfile): + with disable_activity_stream(): + for filename in data: + key = filename.replace('.json', '') + last_entries[key] = max(last_entries[key], until) if last_entries.get(key) else until + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) + else: + succeeded = False - for item in list(manifest.keys()): - if not os.path.exists(os.path.join(gather_dir, item)): - manifest.pop(item) - _write_manifest(gather_dir, manifest) + for func in csv_collectors: + key = func.__awx_analytics_key__ + filename = f'{key}.csv' + try: + # These slicer functions may return a generator. The `since` parameter is + # allowed to be None, and will fall back to LAST_ENTRIES[key] or to + # LAST_GATHER (truncated appropriately to match the 4-week limit). + if func.__awx_expensive__: + slices = func.__awx_expensive__(key, since, until) + else: + slices = collectors.trivial_slicing(key, since, until) - tarfiles = [] - try: - for i in range(0, len(stage_dirs)): - stage_dir = stage_dirs[i] - # can't use isoformat() since it has colons, which GNU tar doesn't like - tarname = '_'.join([settings.SYSTEM_UUID, until.strftime('%Y-%m-%d-%H%M%S%z'), str(i)]) - tgz = shutil.make_archive(os.path.join(os.path.dirname(dest), tarname), 'gztar', stage_dir) - tarfiles.append(tgz) - except Exception: - shutil.rmtree(stage_dir, ignore_errors=True) - logger.exception("Failed to write analytics archive file") - finally: - shutil.rmtree(dest, ignore_errors=True) - return tarfiles + for start, end in slices: + files = func(start, full_path=gather_dir, until=end) + + if not files: + if collection_type != 'dry-run': + with disable_activity_stream(): + last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) + continue + + slice_succeeded = True + for fpath in files: + payload = {filename: (fpath, func.__awx_analytics_version__)} + + payload['config.json'] = data.get('config.json') + if payload['config.json'] is None: + logger.error("'config' collector data is missing, and is required to ship.") + return None + + tgzfile = package(dest.parent, payload, until) + if tgzfile is not None: + tarfiles.append(tgzfile) + if not ship(tgzfile): + slice_succeeded, succeeded = False, False + break + + if slice_succeeded and collection_type != 'dry-run': + with disable_activity_stream(): + last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end + settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder) + except Exception: + succeeded = False + logger.exception("Could not generate metric {}".format(filename)) + + if collection_type != 'dry-run': + if succeeded: + for fpath in tarfiles: + if os.path.exists(fpath): + os.remove(fpath) + with disable_activity_stream(): + if not settings.AUTOMATION_ANALYTICS_LAST_GATHER or until > settings.AUTOMATION_ANALYTICS_LAST_GATHER: + settings.AUTOMATION_ANALYTICS_LAST_GATHER = until + + shutil.rmtree(dest, ignore_errors=True) # clean up individual artifact files + if not tarfiles: + # No data was collected + logger.warning("No data from {} to {}".format(since or last_gather, until)) + return None + + return tarfiles def ship(path): @@ -220,42 +284,38 @@ def ship(path): """ if not path: logger.error('Automation Analytics TAR not found') - return + return False if not os.path.exists(path): logger.error('Automation Analytics TAR {} not found'.format(path)) - return + return False if "Error:" in str(path): - return - try: - logger.debug('shipping analytics file: {}'.format(path)) - url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None) - if not url: - logger.error('AUTOMATION_ANALYTICS_URL is not set') - return - rh_user = getattr(settings, 'REDHAT_USERNAME', None) - rh_password = getattr(settings, 'REDHAT_PASSWORD', None) - if not rh_user: - return logger.error('REDHAT_USERNAME is not set') - if not rh_password: - return logger.error('REDHAT_PASSWORD is not set') - with open(path, 'rb') as f: - files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)} - s = requests.Session() - s.headers = get_awx_http_client_headers() - s.headers.pop('Content-Type') - with set_environ(**settings.AWX_TASK_ENV): - response = s.post( - url, - files=files, - verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", - auth=(rh_user, rh_password), - headers=s.headers, - timeout=(31, 31), - ) - # Accept 2XX status_codes - if response.status_code >= 300: - return logger.exception('Upload failed with status {}, {}'.format(response.status_code, response.text)) - finally: - # cleanup tar.gz - if os.path.exists(path): - os.remove(path) + return False + + logger.debug('shipping analytics file: {}'.format(path)) + url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None) + if not url: + logger.error('AUTOMATION_ANALYTICS_URL is not set') + return False + rh_user = getattr(settings, 'REDHAT_USERNAME', None) + rh_password = getattr(settings, 'REDHAT_PASSWORD', None) + if not rh_user: + logger.error('REDHAT_USERNAME is not set') + return False + if not rh_password: + logger.error('REDHAT_PASSWORD is not set') + return False + with open(path, 'rb') as f: + files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)} + s = requests.Session() + s.headers = get_awx_http_client_headers() + s.headers.pop('Content-Type') + with set_environ(**settings.AWX_TASK_ENV): + response = s.post( + url, files=files, verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31) + ) + # Accept 2XX status_codes + if response.status_code >= 300: + logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text)) + return False + + return True diff --git a/awx/main/conf.py b/awx/main/conf.py index 5cfd2977f7..5c5df72218 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -778,6 +778,15 @@ register( category=_('System'), category_slug='system', ) +register( + 'AUTOMATION_ANALYTICS_LAST_ENTRIES', + field_class=fields.CharField, + label=_('Last gathered entries for expensive Automation Analytics collectors.'), + default='', + allow_blank=True, + category=_('System'), + category_slug='system', +) register( diff --git a/awx/main/management/commands/gather_analytics.py b/awx/main/management/commands/gather_analytics.py index 5099d4d0d1..60d1524fc7 100644 --- a/awx/main/management/commands/gather_analytics.py +++ b/awx/main/management/commands/gather_analytics.py @@ -1,6 +1,6 @@ import logging -from awx.main.analytics import gather, ship +from awx.main import analytics from dateutil import parser from django.core.management.base import BaseCommand from django.utils.timezone import now @@ -48,13 +48,9 @@ class Command(BaseCommand): if opt_ship and opt_dry_run: self.logger.error('Both --ship and --dry-run cannot be processed at the same time.') return - tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until) + tgzfiles = analytics.gather(collection_type='manual' if opt_ship else 'dry-run', since=since, until=until) if tgzfiles: for tgz in tgzfiles: self.logger.info(tgz) else: self.logger.error('No analytics collected') - if opt_ship: - if tgzfiles: - for tgz in tgzfiles: - ship(tgz) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 992620fbd8..dfd63569ec 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -57,7 +57,6 @@ from receptorctl.socket_interface import ReceptorControl from awx import __version__ as awx_application_version from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV from awx.main.access import access_registry -from awx.main.analytics import all_collectors, expensive_collectors from awx.main.redact import UriCleaner from awx.main.models import ( Schedule, @@ -377,70 +376,15 @@ def send_notifications(notification_list, job_id=None): @task(queue=get_local_queuename) def gather_analytics(): - def _gather_and_ship(subset, since, until): - tgzfiles = [] - try: - tgzfiles = analytics.gather(subset=subset, since=since, until=until) - # empty analytics without raising an exception is not an error - if not tgzfiles: - return True - logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles)) - for tgz in tgzfiles: - analytics.ship(tgz) - except Exception: - logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until)) - return False - finally: - if tgzfiles: - for tgz in tgzfiles: - if os.path.exists(tgz): - os.remove(tgz) - return True - from awx.conf.models import Setting from rest_framework.fields import DateTimeField - from awx.main.signals import disable_activity_stream - if not settings.INSIGHTS_TRACKING_STATE: - return - if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD): - logger.debug('Not gathering analytics, configuration is invalid') - return last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first() - if last_gather: - last_time = DateTimeField().to_internal_value(last_gather.value) - else: - last_time = None + last_time = DateTimeField().to_internal_value(last_gather.value) if last_gather else None gather_time = now() + if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL): - with advisory_lock('gather_analytics_lock', wait=False) as acquired: - if acquired is False: - logger.debug('Not gathering analytics, another task holds lock') - return - subset = list(all_collectors().keys()) - incremental_collectors = [] - for collector in expensive_collectors(): - if collector in subset: - subset.remove(collector) - incremental_collectors.append(collector) - - # Cap gathering at 4 weeks of data if there has been no data gathering - since = last_time or (gather_time - timedelta(weeks=4)) - - if incremental_collectors: - start = since - until = None - while start < gather_time: - until = start + timedelta(hours=4) - if until > gather_time: - until = gather_time - if not _gather_and_ship(incremental_collectors, since=start, until=until): - break - start = until - with disable_activity_stream(): - settings.AUTOMATION_ANALYTICS_LAST_GATHER = until - if subset: - _gather_and_ship(subset, since=since, until=gather_time) + analytics.gather() @task(queue=get_local_queuename) diff --git a/awx/main/tests/functional/analytics/test_core.py b/awx/main/tests/functional/analytics/test_core.py index dbb819a87f..e37f30d26b 100644 --- a/awx/main/tests/functional/analytics/test_core.py +++ b/awx/main/tests/functional/analytics/test_core.py @@ -39,7 +39,7 @@ def mock_valid_license(): def test_gather(mock_valid_license): settings.INSIGHTS_TRACKING_STATE = True - tgzfiles = gather(module=importlib.import_module(__name__)) + tgzfiles = gather(module=importlib.import_module(__name__), collection_type='dry-run') files = {} with tarfile.open(tgzfiles[0], "r:gz") as archive: for member in archive.getmembers(): diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 8ad4a9f485..912bdb5364 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -20,6 +20,7 @@ from decimal import Decimal # Django from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist +from django.utils.dateparse import parse_datetime from django.utils.translation import ugettext_lazy as _ from django.utils.functional import cached_property from django.db.models.fields.related import ForeignObjectRel, ManyToManyField @@ -54,6 +55,7 @@ __all__ = [ 'copy_m2m_relationships', 'prefetch_page_capabilities', 'to_python_boolean', + 'datetime_hook', 'ignore_inventory_computed_fields', 'ignore_inventory_group_removal', '_inventory_updates', @@ -115,6 +117,16 @@ def to_python_boolean(value, allow_none=False): raise ValueError(_(u'Unable to convert "%s" to boolean') % value) +def datetime_hook(d): + new_d = {} + for key, value in d.items(): + try: + new_d[key] = parse_datetime(value) + except TypeError: + new_d[key] = value + return new_d + + def camelcase_to_underscore(s): """ Convert CamelCase names to lowercase_with_underscore. diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index b6a3966647..93ad306b1e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -620,6 +620,8 @@ INSIGHTS_TRACKING_STATE = False # Last gather date for Analytics AUTOMATION_ANALYTICS_LAST_GATHER = None +# Last gathered entries for expensive Analytics +AUTOMATION_ANALYTICS_LAST_ENTRIES = '' # Default list of modules allowed for ad hoc commands. # Note: This setting may be overridden by database settings.