From 40309e6f704051ea8f4648284f4088865ed58a5b Mon Sep 17 00:00:00 2001
From: Bill Nottingham
Date: Wed, 9 Sep 2020 17:10:51 -0400
Subject: [PATCH] Ensure we do not send large or empty bundles

Gather the expensive collectors separately, in a loop that produces
smaller intermediate dumps. Don't return a table dump if there are no
records, and don't list that CSV in the manifest.

Fix up unit tests.
---
 awx/main/analytics/__init__.py                |  2 +-
 awx/main/analytics/collectors.py              | 62 +++++++++++--------
 awx/main/analytics/core.py                    | 55 ++++++++++++----
 awx/main/tasks.py                             | 49 ++++++++++++---
 .../functional/analytics/test_collectors.py   |  5 +-
 .../tests/functional/analytics/test_core.py   |  6 +-
 6 files changed, 126 insertions(+), 53 deletions(-)

diff --git a/awx/main/analytics/__init__.py b/awx/main/analytics/__init__.py
index 4302dc9cf2..6aee1cef91 100644
--- a/awx/main/analytics/__init__.py
+++ b/awx/main/analytics/__init__.py
@@ -1 +1 @@
-from .core import all_collectors, register, gather, ship # noqa
+from .core import all_collectors, expensive_collectors, register, gather, ship # noqa
diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index 01f7f37cdb..45afdc7152 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -32,8 +32,8 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
 '''


-@register('config', '1.1', description=_('General platform configuration'))
-def config(since):
+@register('config', '1.1', description=_('General platform configuration.'))
+def config(since, **kwargs):
     license_info = get_license(show_key=False)
     install_type = 'traditional'
     if os.environ.get('container') == 'oci':
@@ -65,7 +65,7 @@


 @register('counts', '1.0', description=_('Counts of objects such as organizations, inventories, and projects'))
-def counts(since):
+def counts(since, **kwargs):
     counts = {}
     for cls in (models.Organization, models.Team, models.User,
                 models.Inventory, models.Credential, models.Project,
@@ -100,7 +100,7 @@


 @register('org_counts', '1.0', description=_('Counts of users and teams by organization'))
-def org_counts(since):
+def org_counts(since, **kwargs):
     counts = {}
     for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True),
                                                     num_teams=Count('teams', distinct=True)).values('name', 'id', 'num_users', 'num_teams'):
@@ -112,7 +112,7 @@


 @register('cred_type_counts', '1.0', description=_('Counts of credentials by credential type'))
-def cred_type_counts(since):
+def cred_type_counts(since, **kwargs):
     counts = {}
     for cred_type in models.CredentialType.objects.annotate(num_credentials=Count(
                      'credentials', distinct=True)).values('name', 'id', 'managed_by_tower', 'num_credentials'):
@@ -124,7 +124,7 @@


 @register('inventory_counts', '1.2', description=_('Inventories, their inventory sources, and host counts'))
-def inventory_counts(since):
+def inventory_counts(since, **kwargs):
     counts = {}
     for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True),
                                                                  num_hosts=Count('hosts', distinct=True)).only('id', 'name', 'kind'):
@@ -149,7 +149,7 @@


 @register('projects_by_scm_type', '1.0', description=_('Counts of projects by source control type'))
-def projects_by_scm_type(since):
+def projects_by_scm_type(since, **kwargs):
     counts = dict(
         (t[0] or 'manual', 0)
         for t in models.Project.SCM_TYPE_CHOICES
@@ -168,7 +168,7 @@ def _get_isolated_datetime(last_check):


 @register('instance_info', '1.0', description=_('Cluster topology and capacity'))
-def instance_info(since, include_hostnames=False):
+def instance_info(since, include_hostnames=False, **kwargs):
     info = {}
     instances = models.Instance.objects.values_list('hostname').values(
         'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'last_isolated_check', 'enabled')
@@ -194,7 +194,7 @@ def instance_info(since, include_hostnames=False):


 @register('job_counts', '1.0', description=_('Counts of jobs by status'))
-def job_counts(since):
+def job_counts(since, **kwargs):
     counts = {}
     counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count()
     counts['status'] = dict(models.UnifiedJob.objects.exclude(launch_type='sync').values_list('status').annotate(Count('status')).order_by())
@@ -204,7 +204,7 @@ def job_counts(since):


 @register('job_instance_counts', '1.0', description=_('Counts of jobs by execution node'))
-def job_instance_counts(since):
+def job_instance_counts(since, **kwargs):
     counts = {}
     job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list(
         'execution_node', 'launch_type').annotate(job_launch_type=Count('launch_type')).order_by()
@@ -219,7 +219,7 @@ def job_instance_counts(since):


 @register('query_info', '1.0', description=_('Metadata about the analytics collected'))
-def query_info(since, collection_type):
+def query_info(since, collection_type, **kwargs):
     query_info = {}
     query_info['last_run'] = str(since)
     query_info['current_time'] = str(now())
@@ -233,11 +233,19 @@ def _copy_table(table, query, path):
     with connection.cursor() as cursor:
         cursor.copy_expert(query, file)
         file.close()
+    # Ensure we actually dumped data, and not just headers
+    with open(file_path, 'r', encoding='utf-8') as dump:
+        dump.readline()           # skip the CSV header row
+        data = dump.readline()    # empty string when no records followed
+    # A header-only dump is deleted so it never reaches the manifest
+    if not data:
+        os.remove(file_path)
+        return None
     return file_path


-@register('events_table', '1.1', format='csv', description=_('Automation task records'))
-def events_table(since, full_path):
+@register('events_table', '1.1', format='csv', description=_('Automation task records'), expensive=True)
+def events_table(since, full_path, until, **kwargs):
     events_query = '''COPY (SELECT main_jobevent.id,
                               main_jobevent.created,
                               main_jobevent.uuid,
@@ -259,13 +267,14 @@ def events_table(since, full_path):
                               main_jobevent.event_data::json->'res'->'warnings' AS warnings,
                               main_jobevent.event_data::json->'res'->'deprecations' AS deprecations
                               FROM main_jobevent
-                              WHERE main_jobevent.created > {}
-                              ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
+                              WHERE (main_jobevent.created > {} AND main_jobevent.created <= {})
+                              ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER
+                              '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"), until.strftime("'%Y-%m-%d %H:%M:%S'"))
     return _copy_table(table='events', query=events_query, path=full_path)


-@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run'))
-def unified_jobs_table(since, full_path):
+@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run'), expensive=True)
+def unified_jobs_table(since, full_path, until, **kwargs):
     unified_job_query = '''COPY (SELECT main_unifiedjob.id,
                                 main_unifiedjob.polymorphic_ctype_id,
                                 django_content_type.model,
@@ -293,14 +302,16 @@ def unified_jobs_table(since, full_path):
                                 LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id
                                 LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id
                                 LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id
-                                WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0})
+                                WHERE ((main_unifiedjob.created > {0} AND main_unifiedjob.created <= {1})
+                                       OR (main_unifiedjob.finished > {0} AND main_unifiedjob.finished <= {1}))
                                 AND main_unifiedjob.launch_type != 'sync'
-                                ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
+                                ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER
+                                '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"), until.strftime("'%Y-%m-%d %H:%M:%S'"))
     return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path)


 @register('unified_job_template_table', '1.0', format='csv', description=_('Data on job templates'))
-def unified_job_template_table(since, full_path):
+def unified_job_template_table(since, full_path, **kwargs):
     unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id,
                                      main_unifiedjobtemplate.polymorphic_ctype_id,
                                      django_content_type.model,
@@ -322,8 +333,8 @@ def unified_job_template_table(since, full_path):
     return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)


-@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'))
-def workflow_job_node_table(since, full_path):
+@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True)
+def workflow_job_node_table(since, full_path, until, **kwargs):
     workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
                                     main_workflowjobnode.created,
                                     main_workflowjobnode.modified,
@@ -352,13 +363,14 @@ def workflow_job_node_table(since, full_path):
                                     FROM main_workflowjobnode_always_nodes
                                     GROUP BY from_workflowjobnode_id
                                 ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id
-                                WHERE main_workflowjobnode.modified > {}
-                                ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
+                                WHERE (main_workflowjobnode.modified > {} AND main_workflowjobnode.modified <= {})
+                                ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER
+                                '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"), until.strftime("'%Y-%m-%d %H:%M:%S'"))
     return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path)


 @register('workflow_job_template_node_table', '1.0', format='csv', description=_('Data on workflows'))
-def workflow_job_template_node_table(since, full_path):
+def workflow_job_template_node_table(since, full_path, **kwargs):
     workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id,
                                     main_workflowjobtemplatenode.created,
                                     main_workflowjobtemplatenode.modified,
diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py
index 310f77dc16..5be5d57abf 100644
--- a/awx/main/analytics/core.py
+++ b/awx/main/analytics/core.py
@@ -17,14 +17,11 @@ from awx.main.access import access_registry
 from awx.main.models.ha import TowerAnalyticsState
 from awx.main.utils import get_awx_http_client_headers, set_environ

-
 __all__ = ['register', 'gather', 'ship']


 logger = logging.getLogger('awx.main.analytics')


-manifest = dict()
-

 def _valid_license():
     try:
@@ -51,7 +48,18 @@ def all_collectors():
     return collector_dict


-def register(key, version, description=None, format='json'):
+def expensive_collectors():
+    from awx.main.analytics import collectors
+
+    ret = []
+    module = collectors
+    for name, func in inspect.getmembers(module):
+        if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__:
+            ret.append(func.__awx_analytics_key__)
+    return ret
+
+
+def register(key, version, description=None, format='json', expensive=False):
     """
     A decorator used to register a function as a metric collector.
@@ -69,12 +77,13 @@ def register(key, version, description=None, format='json'):
         f.__awx_analytics_version__ = version
         f.__awx_analytics_description__ = description
         f.__awx_analytics_type__ = format
+        f.__awx_expensive__ = expensive
         return f

     return decorate


-def gather(dest=None, module=None, subset = None, collection_type='scheduled'):
+def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'):
     """
     Gather all defined metrics and write them as JSON files in a .tgz
@@ -91,6 +100,10 @@ def gather(dest=None, module=None, subset = None, collection_type='scheduled'):
     max_interval = now() - timedelta(weeks=4)
     if last_run < max_interval or not last_run:
         last_run = max_interval
+    until = until or now()  # resolve per call; a now() default would be frozen at import
+    if since:
+        last_run = since
+        logger.debug("Gathering overridden to start at: {}".format(since))

     if _valid_license() is False:
         logger.exception("Invalid License provided, or No License Provided")
@@ -112,30 +125,50 @@ def gather(dest=None, module=None, subset = None, collection_type='scheduled'):
             ):
                 collector_list.append((name, func))

+    manifest = dict()
     dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
     for name, func in collector_list:
         if func.__awx_analytics_type__ == 'json':
             key = func.__awx_analytics_key__
-            manifest['{}.json'.format(key)] = func.__awx_analytics_version__
             path = '{}.json'.format(os.path.join(dest, key))
             with open(path, 'w', encoding='utf-8') as f:
                 try:
-                    if func.__name__ == 'query_info':
-                        json.dump(func(last_run, collection_type=collection_type), f)
-                    else:
-                        json.dump(func(last_run), f)
+                    json.dump(func(last_run, collection_type=collection_type, until=until), f)
+                    manifest['{}.json'.format(key)] = func.__awx_analytics_version__
                 except Exception:
                     logger.exception("Could not generate metric {}.json".format(key))
                     f.close()
                     os.remove(f.name)
         elif func.__awx_analytics_type__ == 'csv':
             key = func.__awx_analytics_key__
-            manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
             try:
-                func(last_run, full_path=dest)
+                if func(last_run, full_path=dest, until=until):
+                    manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
             except Exception:
                 logger.exception("Could not generate metric {}.csv".format(key))

+    if not manifest:
+        # No data was collected
+        logger.warning("No data from {} to {}".format(last_run, until))
+        shutil.rmtree(dest)
+        return None
+
+    # Always include config.json if we're using our collectors
+    if 'config.json' not in manifest.keys() and not module:
+        from awx.main.analytics import collectors
+        key = 'config'
+        path = '{}.json'.format(os.path.join(dest, key))
+        with open(path, 'w', encoding='utf-8') as f:
+            try:
+                json.dump(collectors.config(last_run), f)
+                manifest['config.json'] = collectors.config.__awx_analytics_version__
+            except Exception:
+                logger.exception("Could not generate metric {}.json".format(key))
+                f.close()
+                os.remove(f.name)
+                shutil.rmtree(dest)
+                return None
+
     path = os.path.join(dest, 'manifest.json')
     with open(path, 'w', encoding='utf-8') as f:
         try:
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 3e1acc2996..17f6dd27c4 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -53,6 +53,7 @@ import ansible_runner
 from awx import __version__ as awx_application_version
 from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
 from awx.main.access import access_registry
+from awx.main.analytics import all_collectors, expensive_collectors
 from awx.main.redact import UriCleaner
 from awx.main.models import (
     Schedule, TowerScheduleState, Instance, InstanceGroup,
@@ -355,6 +356,23 @@ def send_notifications(notification_list, job_id=None):

 @task(queue=get_local_queuename)
 def gather_analytics():
+    def _gather_and_ship(subset, since, until):
+        tgz = None  # initialize so the finally block is safe if gather() raises
+        try:
+            tgz = analytics.gather(subset=subset, since=since, until=until)
+            # empty analytics without raising an exception is not an error
+            if not tgz:
+                return True
+            logger.info('gathered analytics: {}'.format(tgz))
+            analytics.ship(tgz)
+        except Exception:
+            logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
+            return False
+        finally:
+            if tgz and os.path.exists(tgz):
+                os.remove(tgz)
+        return True
+
     from awx.conf.models import Setting
     from rest_framework.fields import DateTimeField
     if not settings.INSIGHTS_TRACKING_STATE:
@@ -374,16 +392,28 @@ def gather_analytics():
         logger.debug('Not gathering analytics, another task holds lock')
         return
     subset = list(all_collectors().keys())
-    try:
-        tgz = analytics.gather(subset=subset)
-        if not tgz:
-            return
-        logger.info('gathered analytics: {}'.format(tgz))
-        analytics.ship(tgz)
-        settings.AUTOMATION_ANALYTICS_LAST_GATHER = gather_time
-    finally:
-        if os.path.exists(tgz):
-            os.remove(tgz)
+    incremental_collectors = []
+    for collector in expensive_collectors():
+        if collector in subset:
+            subset.remove(collector)
+            incremental_collectors.append(collector)
+
+    # Cap gathering at 4 weeks of data if there has been no data gathering
+    since = last_time or (gather_time - timedelta(weeks=4))
+
+    if incremental_collectors:
+        start = since
+        until = None
+        while start < gather_time:
+            until = start + timedelta(minutes=20)
+            if until > gather_time:
+                until = gather_time
+            if not _gather_and_ship(incremental_collectors, since=start, until=until):
+                break
+            start = until
+            settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
+    if subset:
+        _gather_and_ship(subset, since=since, until=gather_time)


 @task(queue=get_local_queuename)
diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py
index fad64c7b86..7f8e7d0a81 100644
--- a/awx/main/tests/functional/analytics/test_collectors.py
+++ b/awx/main/tests/functional/analytics/test_collectors.py
@@ -28,6 +28,7 @@ def sqlite_copy_expert(request):
     def write_stdout(self, sql, fd):
         # Would be cool if we instead properly dissected the SQL query and verified
         # it that way. But instead, we just take the naive approach here.
+        sql = sql.strip()
         assert sql.startswith("COPY (")
         assert sql.endswith(") TO STDOUT WITH CSV HEADER")
@@ -86,7 +87,7 @@ def test_copy_tables_unified_job_query(
     job_name = job_template.create_unified_job().name

     with tempfile.TemporaryDirectory() as tmpdir:
-        collectors.unified_jobs_table(time_start, tmpdir, subset="unified_jobs")
+        collectors.unified_jobs_table(time_start, tmpdir, until=now() + timedelta(seconds=1))

         with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f:
             lines = "".join([line for line in f])
@@ -134,7 +135,7 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
     time_start = now() - timedelta(hours=9)

     with tempfile.TemporaryDirectory() as tmpdir:
-        collectors.workflow_job_node_table(time_start, tmpdir, subset="workflow_job_node_query")
+        collectors.workflow_job_node_table(time_start, tmpdir, until=now() + timedelta(seconds=1))
         with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f:
             reader = csv.reader(f)
             # Pop the headers
diff --git a/awx/main/tests/functional/analytics/test_core.py b/awx/main/tests/functional/analytics/test_core.py
index 01f3858661..190eeac6b3 100644
--- a/awx/main/tests/functional/analytics/test_core.py
+++ b/awx/main/tests/functional/analytics/test_core.py
@@ -10,17 +10,17 @@ from awx.main.analytics import gather, register


 @register('example', '1.0')
-def example(since):
+def example(since, **kwargs):
     return {'awx': 123}


 @register('bad_json', '1.0')
-def bad_json(since):
+def bad_json(since, **kwargs):
     return set()


 @register('throws_error', '1.0')
-def throws_error(since):
+def throws_error(since, **kwargs):
     raise ValueError()
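
Note on the gathering loop: the heart of this change is that expensive
collectors no longer cover the whole reporting window in one bundle.
Below is a minimal, self-contained sketch of the windowing logic added
to gather_analytics(); gather_in_slices and the injected gather/ship
callables are hypothetical stand-ins for analytics.gather() and
analytics.ship(), not part of the patch.

    from datetime import timedelta

    def gather_in_slices(gather, ship, since, gather_time,
                         width=timedelta(minutes=20)):
        # Walk (since, gather_time] in fixed-width windows so each
        # shipped bundle stays small. 'gather' returns a tarball path,
        # or None when the window contained no data.
        last_gather = since
        start = since
        while start < gather_time:
            until = min(start + width, gather_time)
            tgz = gather(since=start, until=until)
            if tgz:
                ship(tgz)
            # Checkpoint the high-water mark after every slice, so an
            # interrupted run resumes here rather than re-sending up to
            # four weeks of events.
            last_gather = until
            start = until
        return last_gather

Checkpointing AUTOMATION_ANALYTICS_LAST_GATHER after each slice is what
makes a failed run restartable from where it stopped.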
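Note on collector registration: the expensive=True plumbing is plain
attribute tagging. register() stamps metadata onto each collector
function, and expensive_collectors() rediscovers it with inspect. A
stripped-down, runnable model of that mechanism, with toy collectors
standing in for the real ones:

    import inspect
    import sys

    def register(key, expensive=False):
        def decorate(f):
            f.__awx_analytics_key__ = key
            f.__awx_expensive__ = expensive
            return f
        return decorate

    @register('counts')
    def counts(since, **kwargs):
        return {'jobs': 0}

    @register('events_table', expensive=True)
    def events_table(since, until, **kwargs):
        return 'events_table.csv'

    def expensive_collectors(module):
        # getattr with a default skips functions register() never touched
        return [f.__awx_analytics_key__
                for _, f in inspect.getmembers(module, inspect.isfunction)
                if getattr(f, '__awx_expensive__', False)]

    print(expensive_collectors(sys.modules[__name__]))  # ['events_table']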
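Note on the empty-dump check: _copy_table() now treats a header-only
CSV as "no data" and deletes it, which is why gather() only adds a CSV
to the manifest when the collector returns a path. The same check in
isolation, under a hypothetical helper name (the patch inlines this in
_copy_table rather than using a separate function):

    import os

    def drop_if_empty(csv_path):
        with open(csv_path, 'r', encoding='utf-8') as f:
            f.readline()             # skip the CSV header row
            has_rows = bool(f.readline())
        if not has_rows:
            os.remove(csv_path)      # header-only dump: discard it
            return None
        return csv_path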