From 1c4b06fe1ed4b5c6003f0d74bd7640a0aad7fc86 Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Wed, 9 Sep 2020 17:10:14 -0400 Subject: [PATCH 01/10] Refactor analytics collectors. - Only have one registration class - Add description fields - Add automation collector information to /api/v2/config --- awx/api/views/root.py | 2 + awx/main/analytics/__init__.py | 2 +- awx/main/analytics/collectors.py | 76 ++++++++++--------- awx/main/analytics/core.py | 66 ++++++++++------ awx/main/tasks.py | 3 +- .../functional/analytics/test_collectors.py | 4 +- 6 files changed, 87 insertions(+), 66 deletions(-) diff --git a/awx/api/views/root.py b/awx/api/views/root.py index 4a15936e9b..aeda19cdeb 100644 --- a/awx/api/views/root.py +++ b/awx/api/views/root.py @@ -21,6 +21,7 @@ import requests from awx.api.generics import APIView from awx.conf.registry import settings_registry +from awx.main.analytics import all_collectors from awx.main.ha import is_ha_environment from awx.main.utils import ( get_awx_version, @@ -252,6 +253,7 @@ class ApiV2ConfigView(APIView): ansible_version=get_ansible_version(), eula=render_to_string("eula.md") if license_data.get('license_type', 'UNLICENSED') != 'open' else '', analytics_status=pendo_state, + analytics_collectors=all_collectors(), become_methods=PRIVILEGE_ESCALATION_METHODS, ) diff --git a/awx/main/analytics/__init__.py b/awx/main/analytics/__init__.py index 9ab526f29b..4302dc9cf2 100644 --- a/awx/main/analytics/__init__.py +++ b/awx/main/analytics/__init__.py @@ -1 +1 @@ -from .core import register, gather, ship, table_version # noqa +from .core import all_collectors, register, gather, ship # noqa diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index ba27905092..01f7f37cdb 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -6,13 +6,14 @@ from django.db import connection from django.db.models import Count from django.conf import settings from django.utils.timezone import 
now +from django.utils.translation import ugettext_lazy as _ from awx.conf.license import get_license from awx.main.utils import (get_awx_version, get_ansible_version, get_custom_venv_choices, camelcase_to_underscore) from awx.main import models from django.contrib.sessions.models import Session -from awx.main.analytics import register, table_version +from awx.main.analytics import register ''' This module is used to define metrics collected by awx.main.analytics.gather() @@ -31,7 +32,7 @@ data _since_ the last report date - i.e., new data in the last 24 hours) ''' -@register('config', '1.1') +@register('config', '1.1', description=_('General platform configuration')) def config(since): license_info = get_license(show_key=False) install_type = 'traditional' @@ -63,7 +64,7 @@ def config(since): } -@register('counts', '1.0') +@register('counts', '1.0', description=_('Counts of objects such as organizations, inventories, and projects')) def counts(since): counts = {} for cls in (models.Organization, models.Team, models.User, @@ -98,7 +99,7 @@ def counts(since): return counts -@register('org_counts', '1.0') +@register('org_counts', '1.0', description=_('Counts of users and teams by organization')) def org_counts(since): counts = {} for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True), @@ -110,7 +111,7 @@ def org_counts(since): return counts -@register('cred_type_counts', '1.0') +@register('cred_type_counts', '1.0', description=_('Counts of credentials by credential type')) def cred_type_counts(since): counts = {} for cred_type in models.CredentialType.objects.annotate(num_credentials=Count( @@ -122,7 +123,7 @@ def cred_type_counts(since): return counts -@register('inventory_counts', '1.2') +@register('inventory_counts', '1.2', description=_('Inventories, their inventory sources, and host counts')) def inventory_counts(since): counts = {} for inv in 
models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True), @@ -147,7 +148,7 @@ def inventory_counts(since): return counts -@register('projects_by_scm_type', '1.0') +@register('projects_by_scm_type', '1.0', description=_('Counts of projects by source control type')) def projects_by_scm_type(since): counts = dict( (t[0] or 'manual', 0) @@ -166,7 +167,7 @@ def _get_isolated_datetime(last_check): return last_check -@register('instance_info', '1.0') +@register('instance_info', '1.0', description=_('Cluster topology and capacity')) def instance_info(since, include_hostnames=False): info = {} instances = models.Instance.objects.values_list('hostname').values( @@ -192,7 +193,7 @@ def instance_info(since, include_hostnames=False): return info -@register('job_counts', '1.0') +@register('job_counts', '1.0', description=_('Counts of jobs by status')) def job_counts(since): counts = {} counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count() @@ -202,7 +203,7 @@ def job_counts(since): return counts -@register('job_instance_counts', '1.0') +@register('job_instance_counts', '1.0', description=_('Counts of jobs by execution node')) def job_instance_counts(since): counts = {} job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list( @@ -217,7 +218,7 @@ def job_instance_counts(since): return counts -@register('query_info', '1.0') +@register('query_info', '1.0', description=_('Metadata about the analytics collected')) def query_info(since, collection_type): query_info = {} query_info['last_run'] = str(since) @@ -226,21 +227,17 @@ def query_info(since, collection_type): return query_info -# Copies Job Events from db to a .csv to be shipped -@table_version('events_table.csv', '1.1') -@table_version('unified_jobs_table.csv', '1.1') -@table_version('unified_job_template_table.csv', '1.0') -@table_version('workflow_job_node_table.csv', '1.0') 
-@table_version('workflow_job_template_node_table.csv', '1.0') -def copy_tables(since, full_path, subset=None): - def _copy_table(table, query, path): - file_path = os.path.join(path, table + '_table.csv') - file = open(file_path, 'w', encoding='utf-8') - with connection.cursor() as cursor: - cursor.copy_expert(query, file) - file.close() - return file_path +def _copy_table(table, query, path): + file_path = os.path.join(path, table + '_table.csv') + file = open(file_path, 'w', encoding='utf-8') + with connection.cursor() as cursor: + cursor.copy_expert(query, file) + file.close() + return file_path + +@register('events_table', '1.1', format='csv', description=_('Automation task records')) +def events_table(since, full_path): events_query = '''COPY (SELECT main_jobevent.id, main_jobevent.created, main_jobevent.uuid, @@ -264,9 +261,11 @@ def copy_tables(since, full_path, subset=None): FROM main_jobevent WHERE main_jobevent.created > {} ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - if not subset or 'events' in subset: - _copy_table(table='events', query=events_query, path=full_path) + return _copy_table(table='events', query=events_query, path=full_path) + +@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run')) +def unified_jobs_table(since, full_path): unified_job_query = '''COPY (SELECT main_unifiedjob.id, main_unifiedjob.polymorphic_ctype_id, django_content_type.model, @@ -297,9 +296,11 @@ def copy_tables(since, full_path, subset=None): WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0}) AND main_unifiedjob.launch_type != 'sync' ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - if not subset or 'unified_jobs' in subset: - _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) + return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) + 
+@register('unified_job_template_table', '1.0', format='csv', description=_('Data on job templates')) +def unified_job_template_table(since, full_path): unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id, main_unifiedjobtemplate.polymorphic_ctype_id, django_content_type.model, @@ -318,9 +319,11 @@ def copy_tables(since, full_path, subset=None): FROM main_unifiedjobtemplate, django_content_type WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' - if not subset or 'unified_job_template' in subset: - _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) + return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) + +@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs')) +def workflow_job_node_table(since, full_path): workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, main_workflowjobnode.created, main_workflowjobnode.modified, @@ -351,9 +354,11 @@ def copy_tables(since, full_path, subset=None): ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id WHERE main_workflowjobnode.modified > {} ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - if not subset or 'workflow_job_node' in subset: - _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) + return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) + +@register('workflow_job_template_node_table', '1.0', format='csv', description=_('Data on workflows')) +def workflow_job_template_node_table(since, full_path): workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, main_workflowjobtemplatenode.created, main_workflowjobtemplatenode.modified, @@ -381,7 +386,4 @@ def copy_tables(since, 
full_path, subset=None): GROUP BY from_workflowjobtemplatenode_id ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER''' - if not subset or 'workflow_job_template_node' in subset: - _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path) - - return + return _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index bab62b4a3c..310f77dc16 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -18,7 +18,7 @@ from awx.main.models.ha import TowerAnalyticsState from awx.main.utils import get_awx_http_client_headers, set_environ -__all__ = ['register', 'gather', 'ship', 'table_version'] +__all__ = ['register', 'gather', 'ship'] logger = logging.getLogger('awx.main.analytics') @@ -37,11 +37,27 @@ def _valid_license(): return True -def register(key, version): +def all_collectors(): + from awx.main.analytics import collectors + + collector_dict = {} + module = collectors + for name, func in inspect.getmembers(module): + if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'): + key = func.__awx_analytics_key__ + desc = func.__awx_analytics_description__ or '' + version = func.__awx_analytics_version__ + collector_dict[key] = { 'name': key, 'version': version, 'description': desc} + return collector_dict + + +def register(key, version, description=None, format='json'): """ A decorator used to register a function as a metric collector. - Decorated functions should return JSON-serializable objects. + Decorated functions should do the following based on format: + - json: return JSON-serializable objects. 
+ - csv: write CSV data to a filename named 'key' @register('projects_by_scm_type', 1) def projects_by_scm_type(): @@ -51,28 +67,19 @@ def register(key, version): def decorate(f): f.__awx_analytics_key__ = key f.__awx_analytics_version__ = version + f.__awx_analytics_description__ = description + f.__awx_analytics_type__ = format return f return decorate -def table_version(file_name, version): - - global manifest - manifest[file_name] = version - - def decorate(f): - return f - - return decorate - - -def gather(dest=None, module=None, collection_type='scheduled'): +def gather(dest=None, module=None, subset = None, collection_type='scheduled'): """ Gather all defined metrics and write them as JSON files in a .tgz :param dest: the (optional) absolute path to write a compressed tarball - :pararm module: the module to search for registered analytic collector + :param module: the module to search for registered analytic collector functions; defaults to awx.main.analytics.collectors """ @@ -93,14 +100,21 @@ def gather(dest=None, module=None, collection_type='scheduled'): logger.error("Automation Analytics not enabled. 
Use --dry-run to gather locally without sending.") return + collector_list = [] if module is None: from awx.main.analytics import collectors module = collectors - + for name, func in inspect.getmembers(module): + if ( + inspect.isfunction(func) and + hasattr(func, '__awx_analytics_key__') and + (not subset or name in subset) + ): + collector_list.append((name, func)) dest = dest or tempfile.mkdtemp(prefix='awx_analytics') - for name, func in inspect.getmembers(module): - if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'): + for name, func in collector_list: + if func.__awx_analytics_type__ == 'json': key = func.__awx_analytics_key__ manifest['{}.json'.format(key)] = func.__awx_analytics_version__ path = '{}.json'.format(os.path.join(dest, key)) @@ -114,7 +128,14 @@ def gather(dest=None, module=None, collection_type='scheduled'): logger.exception("Could not generate metric {}.json".format(key)) f.close() os.remove(f.name) - + elif func.__awx_analytics_type__ == 'csv': + key = func.__awx_analytics_key__ + manifest['{}.csv'.format(key)] = func.__awx_analytics_version__ + try: + func(last_run, full_path=dest) + except Exception: + logger.exception("Could not generate metric {}.csv".format(key)) + path = os.path.join(dest, 'manifest.json') with open(path, 'w', encoding='utf-8') as f: try: @@ -124,11 +145,6 @@ def gather(dest=None, module=None, collection_type='scheduled'): f.close() os.remove(f.name) - try: - collectors.copy_tables(since=last_run, full_path=dest) - except Exception: - logger.exception("Could not copy tables") - # can't use isoformat() since it has colons, which GNU tar doesn't like tarname = '_'.join([ settings.SYSTEM_UUID, diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 59f7eb50cb..3e1acc2996 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -373,8 +373,9 @@ def gather_analytics(): if acquired is False: logger.debug('Not gathering analytics, another task holds lock') return + subset = list(all_collectors().keys()) 
try: - tgz = analytics.gather() + tgz = analytics.gather(subset=subset) if not tgz: return logger.info('gathered analytics: {}'.format(tgz)) diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index ff53ac6bb4..fad64c7b86 100644 --- a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -86,7 +86,7 @@ def test_copy_tables_unified_job_query( job_name = job_template.create_unified_job().name with tempfile.TemporaryDirectory() as tmpdir: - collectors.copy_tables(time_start, tmpdir, subset="unified_jobs") + collectors.unified_jobs_table(time_start, tmpdir, subset="unified_jobs") with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: lines = "".join([line for line in f]) @@ -134,7 +134,7 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): time_start = now() - timedelta(hours=9) with tempfile.TemporaryDirectory() as tmpdir: - collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query") + collectors.workflow_job_node_table(time_start, tmpdir, subset="workflow_job_node_query") with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: reader = csv.reader(f) # Pop the headers From 40309e6f704051ea8f4648284f4088865ed58a5b Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Wed, 9 Sep 2020 17:10:51 -0400 Subject: [PATCH 02/10] Ensure we do not send large bundles, or empty bundles Collect expensive collectors separately, and in a loop where we make smaller intermediate dumps. Don't return a table dump if there are no records, and don't put that CSV in the manifest. Fix up unit tests. 
--- awx/main/analytics/__init__.py | 2 +- awx/main/analytics/collectors.py | 62 +++++++++++-------- awx/main/analytics/core.py | 55 ++++++++++++---- awx/main/tasks.py | 49 ++++++++++++--- .../functional/analytics/test_collectors.py | 5 +- .../tests/functional/analytics/test_core.py | 6 +- 6 files changed, 126 insertions(+), 53 deletions(-) diff --git a/awx/main/analytics/__init__.py b/awx/main/analytics/__init__.py index 4302dc9cf2..6aee1cef91 100644 --- a/awx/main/analytics/__init__.py +++ b/awx/main/analytics/__init__.py @@ -1 +1 @@ -from .core import all_collectors, register, gather, ship # noqa +from .core import all_collectors, expensive_collectors, register, gather, ship # noqa diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 01f7f37cdb..45afdc7152 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -32,8 +32,8 @@ data _since_ the last report date - i.e., new data in the last 24 hours) ''' -@register('config', '1.1', description=_('General platform configuration')) -def config(since): +@register('config', '1.1', description=_('General platform configuration.')) +def config(since, **kwargs): license_info = get_license(show_key=False) install_type = 'traditional' if os.environ.get('container') == 'oci': @@ -65,7 +65,7 @@ def config(since): @register('counts', '1.0', description=_('Counts of objects such as organizations, inventories, and projects')) -def counts(since): +def counts(since, **kwargs): counts = {} for cls in (models.Organization, models.Team, models.User, models.Inventory, models.Credential, models.Project, @@ -100,7 +100,7 @@ def counts(since): @register('org_counts', '1.0', description=_('Counts of users and teams by organization')) -def org_counts(since): +def org_counts(since, **kwargs): counts = {} for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True), num_teams=Count('teams', distinct=True)).values('name', 'id', 
'num_users', 'num_teams'): @@ -112,7 +112,7 @@ def org_counts(since): @register('cred_type_counts', '1.0', description=_('Counts of credentials by credential type')) -def cred_type_counts(since): +def cred_type_counts(since, **kwargs): counts = {} for cred_type in models.CredentialType.objects.annotate(num_credentials=Count( 'credentials', distinct=True)).values('name', 'id', 'managed_by_tower', 'num_credentials'): @@ -124,7 +124,7 @@ def cred_type_counts(since): @register('inventory_counts', '1.2', description=_('Inventories, their inventory sources, and host counts')) -def inventory_counts(since): +def inventory_counts(since, **kwargs): counts = {} for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True), num_hosts=Count('hosts', distinct=True)).only('id', 'name', 'kind'): @@ -149,7 +149,7 @@ def inventory_counts(since): @register('projects_by_scm_type', '1.0', description=_('Counts of projects by source control type')) -def projects_by_scm_type(since): +def projects_by_scm_type(since, **kwargs): counts = dict( (t[0] or 'manual', 0) for t in models.Project.SCM_TYPE_CHOICES @@ -168,7 +168,7 @@ def _get_isolated_datetime(last_check): @register('instance_info', '1.0', description=_('Cluster topology and capacity')) -def instance_info(since, include_hostnames=False): +def instance_info(since, include_hostnames=False, **kwargs): info = {} instances = models.Instance.objects.values_list('hostname').values( 'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'last_isolated_check', 'enabled') @@ -194,7 +194,7 @@ def instance_info(since, include_hostnames=False): @register('job_counts', '1.0', description=_('Counts of jobs by status')) -def job_counts(since): +def job_counts(since, **kwargs): counts = {} counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count() counts['status'] = 
dict(models.UnifiedJob.objects.exclude(launch_type='sync').values_list('status').annotate(Count('status')).order_by()) @@ -204,7 +204,7 @@ def job_counts(since): @register('job_instance_counts', '1.0', description=_('Counts of jobs by execution node')) -def job_instance_counts(since): +def job_instance_counts(since, **kwargs): counts = {} job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list( 'execution_node', 'launch_type').annotate(job_launch_type=Count('launch_type')).order_by() @@ -219,7 +219,7 @@ def job_instance_counts(since): @register('query_info', '1.0', description=_('Metadata about the analytics collected')) -def query_info(since, collection_type): +def query_info(since, collection_type, **kwargs): query_info = {} query_info['last_run'] = str(since) query_info['current_time'] = str(now()) @@ -233,11 +233,19 @@ def _copy_table(table, query, path): with connection.cursor() as cursor: cursor.copy_expert(query, file) file.close() + # Ensure we actually dumped data, and not just headers + file = open(file_path, 'r', encoding='utf-8') + file.readline() + data = file.readline() + file.close() + if not data: + os.remove(file_path) + return None return file_path -@register('events_table', '1.1', format='csv', description=_('Automation task records')) -def events_table(since, full_path): +@register('events_table', '1.1', format='csv', description=_('Automation task records'), expensive=True) +def events_table(since, full_path, until, **kwargs): events_query = '''COPY (SELECT main_jobevent.id, main_jobevent.created, main_jobevent.uuid, @@ -259,13 +267,14 @@ def events_table(since, full_path): main_jobevent.event_data::json->'res'->'warnings' AS warnings, main_jobevent.event_data::json->'res'->'deprecations' AS deprecations FROM main_jobevent - WHERE main_jobevent.created > {} - ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) + WHERE (main_jobevent.created > {} AND 
main_jobevent.created <= {}) + ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER + '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'")) return _copy_table(table='events', query=events_query, path=full_path) -@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run')) -def unified_jobs_table(since, full_path): +@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run'), expensive=True) +def unified_jobs_table(since, full_path, until, **kwargs): unified_job_query = '''COPY (SELECT main_unifiedjob.id, main_unifiedjob.polymorphic_ctype_id, django_content_type.model, @@ -293,14 +302,16 @@ def unified_jobs_table(since, full_path): LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id - WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0}) + WHERE ((main_unifiedjob.created > {0} AND main_unifiedjob.created <= {1}) + OR (main_unifiedjob.finished > {0} AND main_unifiedjob.finished <= {1})) AND main_unifiedjob.launch_type != 'sync' - ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) + ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER + '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'")) return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) @register('unified_job_template_table', '1.0', format='csv', description=_('Data on job templates')) -def unified_job_template_table(since, full_path): +def unified_job_template_table(since, full_path, **kwargs): unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id, main_unifiedjobtemplate.polymorphic_ctype_id, django_content_type.model, @@ -322,8 +333,8 @@ def unified_job_template_table(since, full_path): 
return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) -@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs')) -def workflow_job_node_table(since, full_path): +@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True) +def workflow_job_node_table(since, full_path, until, **kwargs): workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, main_workflowjobnode.created, main_workflowjobnode.modified, @@ -352,13 +363,14 @@ def workflow_job_node_table(since, full_path): FROM main_workflowjobnode_always_nodes GROUP BY from_workflowjobnode_id ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id - WHERE main_workflowjobnode.modified > {} - ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) + WHERE (main_workflowjobnode.modified > {} AND main_workflowjobnode.modified <= {}) + ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER + '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'")) return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) @register('workflow_job_template_node_table', '1.0', format='csv', description=_('Data on workflows')) -def workflow_job_template_node_table(since, full_path): +def workflow_job_template_node_table(since, full_path, **kwargs): workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, main_workflowjobtemplatenode.created, main_workflowjobtemplatenode.modified, diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 310f77dc16..5be5d57abf 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -17,14 +17,11 @@ from awx.main.access import access_registry from awx.main.models.ha import TowerAnalyticsState from awx.main.utils import get_awx_http_client_headers, 
set_environ - __all__ = ['register', 'gather', 'ship'] logger = logging.getLogger('awx.main.analytics') -manifest = dict() - def _valid_license(): try: @@ -51,7 +48,18 @@ def all_collectors(): return collector_dict -def register(key, version, description=None, format='json'): +def expensive_collectors(): + from awx.main.analytics import collectors + + ret = [] + module = collectors + for name, func in inspect.getmembers(module): + if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__: + ret.append(func.__awx_analytics_key__) + return ret + + +def register(key, version, description=None, format='json', expensive=False): """ A decorator used to register a function as a metric collector. @@ -69,12 +77,13 @@ def register(key, version, description=None, format='json'): f.__awx_analytics_version__ = version f.__awx_analytics_description__ = description f.__awx_analytics_type__ = format + f.__awx_expensive__ = expensive return f return decorate -def gather(dest=None, module=None, subset = None, collection_type='scheduled'): +def gather(dest=None, module=None, subset = None, since = None, until = now(), collection_type='scheduled'): """ Gather all defined metrics and write them as JSON files in a .tgz @@ -91,6 +100,9 @@ def gather(dest=None, module=None, subset = None, collection_type='scheduled'): max_interval = now() - timedelta(weeks=4) if last_run < max_interval or not last_run: last_run = max_interval + if since: + last_run = since + logger.debug("Gathering overriden to start at: {}".format(since)) if _valid_license() is False: logger.exception("Invalid License provided, or No License Provided") @@ -112,30 +124,49 @@ def gather(dest=None, module=None, subset = None, collection_type='scheduled'): ): collector_list.append((name, func)) + manifest = dict() dest = dest or tempfile.mkdtemp(prefix='awx_analytics') for name, func in collector_list: if func.__awx_analytics_type__ == 'json': key = func.__awx_analytics_key__ - 
manifest['{}.json'.format(key)] = func.__awx_analytics_version__ path = '{}.json'.format(os.path.join(dest, key)) with open(path, 'w', encoding='utf-8') as f: try: - if func.__name__ == 'query_info': - json.dump(func(last_run, collection_type=collection_type), f) - else: - json.dump(func(last_run), f) + json.dump(func(last_run, collection_type=collection_type, until=until), f) + manifest['{}.json'.format(key)] = func.__awx_analytics_version__ except Exception: logger.exception("Could not generate metric {}.json".format(key)) f.close() os.remove(f.name) elif func.__awx_analytics_type__ == 'csv': key = func.__awx_analytics_key__ - manifest['{}.csv'.format(key)] = func.__awx_analytics_version__ try: - func(last_run, full_path=dest) + if func(last_run, full_path=dest, until=until): + manifest['{}.csv'.format(key)] = func.__awx_analytics_version__ except Exception: logger.exception("Could not generate metric {}.csv".format(key)) + if not manifest: + # No data was collected + logger.warning("No data from {} to {}".format(last_run, until)) + shutil.rmtree(dest) + return None + + # Always include config.json if we're using our collectors + if 'config.json' not in manifest.keys() and not module: + from awx.main.analytics import collectors + path = '{}.json'.format(os.path.join(dest, key)) + with open(path, 'w', encoding='utf-8') as f: + try: + json.dump(collectors.config(last_run), f) + manifest['config.json'] = collectors.config.__awx_analytics_version__ + except Exception: + logger.exception("Could not generate metric {}.json".format(key)) + f.close() + os.remove(f.name) + shutil.rmtree(dest) + return None + path = os.path.join(dest, 'manifest.json') with open(path, 'w', encoding='utf-8') as f: try: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 3e1acc2996..17f6dd27c4 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -53,6 +53,7 @@ import ansible_runner from awx import __version__ as awx_application_version from awx.main.constants import 
PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV from awx.main.access import access_registry +from awx.main.analytics import all_collectors, expensive_collectors from awx.main.redact import UriCleaner from awx.main.models import ( Schedule, TowerScheduleState, Instance, InstanceGroup, @@ -355,6 +356,22 @@ def send_notifications(notification_list, job_id=None): @task(queue=get_local_queuename) def gather_analytics(): + def _gather_and_ship(subset, since, until): + try: + tgz = analytics.gather(subset=subset, since=since, until=until) + # empty analytics without raising an exception is not an error + if not tgz: + return True + logger.info('gathered analytics: {}'.format(tgz)) + analytics.ship(tgz) + except Exception: + logger.exception('Error gathering and sending analytics for {} to {}.'.format(since,until)) + return False + finally: + if os.path.exists(tgz): + os.remove(tgz) + return True + from awx.conf.models import Setting from rest_framework.fields import DateTimeField if not settings.INSIGHTS_TRACKING_STATE: @@ -374,16 +391,28 @@ def gather_analytics(): logger.debug('Not gathering analytics, another task holds lock') return subset = list(all_collectors().keys()) - try: - tgz = analytics.gather(subset=subset) - if not tgz: - return - logger.info('gathered analytics: {}'.format(tgz)) - analytics.ship(tgz) - settings.AUTOMATION_ANALYTICS_LAST_GATHER = gather_time - finally: - if os.path.exists(tgz): - os.remove(tgz) + incremental_collectors = [] + for collector in expensive_collectors(): + if collector in subset: + subset.remove(collector) + incremental_collectors.append(collector) + + # Cap gathering at 4 weeks of data if there has been no data gathering + since = last_time or (gather_time - timedelta(weeks=4)) + + if incremental_collectors: + start = since + until = None + while start < gather_time: + until = start + timedelta(minutes=20) + if (until > gather_time): + until = gather_time + if not _gather_and_ship(incremental_collectors, since=start, 
until=until): + break + start = until + settings.AUTOMATION_ANALYTICS_LAST_GATHER = until + if subset: + _gather_and_ship(subset, since=since, until=gather_time) @task(queue=get_local_queuename) diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index fad64c7b86..7f8e7d0a81 100644 --- a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -28,6 +28,7 @@ def sqlite_copy_expert(request): def write_stdout(self, sql, fd): # Would be cool if we instead properly disected the SQL query and verified # it that way. But instead, we just take the nieve approach here. + sql = sql.strip() assert sql.startswith("COPY (") assert sql.endswith(") TO STDOUT WITH CSV HEADER") @@ -86,7 +87,7 @@ def test_copy_tables_unified_job_query( job_name = job_template.create_unified_job().name with tempfile.TemporaryDirectory() as tmpdir: - collectors.unified_jobs_table(time_start, tmpdir, subset="unified_jobs") + collectors.unified_jobs_table(time_start, tmpdir, until = now() + timedelta(seconds=1)) with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: lines = "".join([line for line in f]) @@ -134,7 +135,7 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): time_start = now() - timedelta(hours=9) with tempfile.TemporaryDirectory() as tmpdir: - collectors.workflow_job_node_table(time_start, tmpdir, subset="workflow_job_node_query") + collectors.workflow_job_node_table(time_start, tmpdir, until = now() + timedelta(seconds=1)) with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: reader = csv.reader(f) # Pop the headers diff --git a/awx/main/tests/functional/analytics/test_core.py b/awx/main/tests/functional/analytics/test_core.py index 01f3858661..190eeac6b3 100644 --- a/awx/main/tests/functional/analytics/test_core.py +++ b/awx/main/tests/functional/analytics/test_core.py @@ -10,17 +10,17 @@ from 
awx.main.analytics import gather, register @register('example', '1.0') -def example(since): +def example(since, **kwargs): return {'awx': 123} @register('bad_json', '1.0') -def bad_json(since): +def bad_json(since, **kwargs): return set() @register('throws_error', '1.0') -def throws_error(since): +def throws_error(since, **kwargs): raise ValueError() From 1a15f18be3c07e4ddb6a90273a52427bdf27edbd Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Fri, 24 Jul 2020 10:39:08 -0400 Subject: [PATCH 03/10] Stop using the TowerAnalyticsState solo model This is now tracked in the AUTOMATION_ANALYTICS_LAST_GATHER setting. --- awx/main/analytics/core.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 5be5d57abf..3e60c449e4 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -14,7 +14,6 @@ from rest_framework.exceptions import PermissionDenied from awx.conf.license import get_license from awx.main.models import Job from awx.main.access import access_registry -from awx.main.models.ha import TowerAnalyticsState from awx.main.utils import get_awx_http_client_headers, set_environ __all__ = ['register', 'gather', 'ship'] @@ -92,18 +91,9 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c functions; defaults to awx.main.analytics.collectors """ - run_now = now() - state = TowerAnalyticsState.get_solo() - last_run = state.last_run - logger.debug("Last analytics run was: {}".format(last_run)) + last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4)) + logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) - max_interval = now() - timedelta(weeks=4) - if last_run < max_interval or not last_run: - last_run = max_interval - if since: - last_run = since - logger.debug("Gathering overriden to start at: {}".format(since)) - if _valid_license() is False: 
logger.exception("Invalid License provided, or No License Provided") return "Error: Invalid License provided, or No License Provided" @@ -179,7 +169,7 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c # can't use isoformat() since it has colons, which GNU tar doesn't like tarname = '_'.join([ settings.SYSTEM_UUID, - run_now.strftime('%Y-%m-%d-%H%M%S%z') + until.strftime('%Y-%m-%d-%H%M%S%z') ]) try: tgz = shutil.make_archive( @@ -231,10 +221,6 @@ def ship(path): if response.status_code >= 300: return logger.exception('Upload failed with status {}, {}'.format(response.status_code, response.text)) - run_now = now() - state = TowerAnalyticsState.get_solo() - state.last_run = run_now - state.save() finally: # cleanup tar.gz os.remove(path) From 9f67b6742c8fa8b0b1198c45a257f5bd41e82a94 Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Fri, 24 Jul 2020 14:01:48 -0400 Subject: [PATCH 04/10] Fail more gracefully if analytics.ship() is called with a bad path, or it's deleted out from under us. --- awx/main/analytics/core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 3e60c449e4..98fa8369d8 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -191,6 +191,9 @@ def ship(path): if not path: logger.error('Automation Analytics TAR not found') return + if not os.path.exists(path): + logger.error('Automation Analytics TAR {} not found'.format(path)) + return if "Error:" in str(path): return try: @@ -223,4 +226,5 @@ def ship(path): response.text)) finally: # cleanup tar.gz - os.remove(path) + if os.path.exists(path): + os.remove(path) From c753324872abc24363df10641eec9fa93fa0e024 Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Tue, 28 Jul 2020 16:32:32 -0400 Subject: [PATCH 05/10] Move back to less frequent collections, and split large event tables This should ensure we stay under 100MB at all times. 
--- awx/main/analytics/collectors.py | 60 +++++++++--- awx/main/analytics/core.py | 95 +++++++++++++------ .../management/commands/gather_analytics.py | 31 +++++- awx/main/tasks.py | 18 ++-- .../tests/functional/analytics/test_core.py | 7 +- 5 files changed, 158 insertions(+), 53 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 45afdc7152..fe823764b8 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -1,3 +1,4 @@ +import io import os import os.path import platform @@ -227,21 +228,58 @@ def query_info(since, collection_type, **kwargs): return query_info +''' +The event table can be *very* large, and we have a 100MB upload limit. + +Split large table dumps at dump time into a series of files. +''' +MAX_TABLE_SIZE = 200 * 1048576 + + +class FileSplitter(io.StringIO): + def __init__(self, filespec=None, *args, **kwargs): + self.filespec = filespec + self.files = [] + self.currentfile = None + self.header = None + self.counter = 0 + self.cycle_file() + + def cycle_file(self): + if self.currentfile: + self.currentfile.close() + self.counter = 0 + fname = '{}_split{}'.format(self.filespec, len(self.files)) + self.currentfile = open(fname, 'w', encoding='utf-8') + self.files.append(fname) + if self.header: + self.currentfile.write('{}\n'.format(self.header)) + + def file_list(self): + self.currentfile.close() + # Check for an empty dump + if len(self.header) + 1 == self.counter: + os.remove(self.files[-1]) + self.files = self.files[:-1] + # If we only have one file, remove the suffix + if len(self.files) == 1: + os.rename(self.files[0],self.files[0].replace('_split0','')) + return self.files + + def write(self, s): + if not self.header: + self.header = s[0:s.index('\n')] + self.counter += self.currentfile.write(s) + if self.counter >= MAX_TABLE_SIZE: + self.cycle_file() + + def _copy_table(table, query, path): file_path = os.path.join(path, table + '_table.csv') - file = open(file_path, 
'w', encoding='utf-8') + file = FileSplitter(filespec=file_path) with connection.cursor() as cursor: cursor.copy_expert(query, file) - file.close() - # Ensure we actually dumped data, and not just headers - file = open(file_path, 'r', encoding='utf-8') - file.readline() - data = file.readline() - file.close() - if not data: - os.remove(file_path) - return None - return file_path + return file.file_list() @register('events_table', '1.1', format='csv', description=_('Automation task records'), expensive=True) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 98fa8369d8..7179d89969 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -90,6 +90,15 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c :param module: the module to search for registered analytic collector functions; defaults to awx.main.analytics.collectors """ + def _write_manifest(destdir, manifest): + path = os.path.join(destdir, 'manifest.json') + with open(path, 'w', encoding='utf-8') as f: + try: + json.dump(manifest, f) + except Exception: + f.close() + os.remove(f.name) + logger.exception("Could not generate manifest.json") last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4)) logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER)) @@ -103,10 +112,12 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c return collector_list = [] - if module is None: + if module: + collector_module = module + else: from awx.main.analytics import collectors - module = collectors - for name, func in inspect.getmembers(module): + collector_module = collectors + for name, func in inspect.getmembers(collector_module): if ( inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and @@ -116,10 +127,13 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c manifest = dict() dest = dest or 
tempfile.mkdtemp(prefix='awx_analytics') + gather_dir = os.path.join(dest, 'stage') + os.mkdir(gather_dir, 0o700) + num_splits = 1 for name, func in collector_list: if func.__awx_analytics_type__ == 'json': key = func.__awx_analytics_key__ - path = '{}.json'.format(os.path.join(dest, key)) + path = '{}.json'.format(os.path.join(gather_dir, key)) with open(path, 'w', encoding='utf-8') as f: try: json.dump(func(last_run, collection_type=collection_type, until=until), f) @@ -131,8 +145,11 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c elif func.__awx_analytics_type__ == 'csv': key = func.__awx_analytics_key__ try: - if func(last_run, full_path=dest, until=until): + files = func(last_run, full_path=gather_dir, until=until) + if files: manifest['{}.csv'.format(key)] = func.__awx_analytics_version__ + if len(files) > num_splits: + num_splits = len(files) except Exception: logger.exception("Could not generate metric {}.csv".format(key)) @@ -145,11 +162,12 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c # Always include config.json if we're using our collectors if 'config.json' not in manifest.keys() and not module: from awx.main.analytics import collectors - path = '{}.json'.format(os.path.join(dest, key)) + config = collectors.config + path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__)) with open(path, 'w', encoding='utf-8') as f: try: json.dump(collectors.config(last_run), f) - manifest['config.json'] = collectors.config.__awx_analytics_version__ + manifest['config.json'] = config.__awx_analytics_version__ except Exception: logger.exception("Could not generate metric {}.json".format(key)) f.close() @@ -157,31 +175,52 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c shutil.rmtree(dest) return None - path = os.path.join(dest, 'manifest.json') - with open(path, 'w', encoding='utf-8') as f: - try: - json.dump(manifest, f) - except 
Exception: - logger.exception("Could not generate manifest.json") - f.close() - os.remove(f.name) + stage_dirs = [gather_dir] + if num_splits > 1: + for i in range(0, num_splits): + split_path = os.path.join(dest, 'split{}'.format(i)) + os.mkdir(split_path, 0o700) + filtered_manifest = {} + shutil.copy(os.path.join(gather_dir, 'config.json'), split_path) + filtered_manifest['config.json'] = manifest['config.json'] + suffix = '_split{}'.format(i) + for file in os.listdir(gather_dir): + if file.endswith(suffix): + old_file = os.path.join(gather_dir, file) + new_filename = file.replace(suffix, '') + new_file = os.path.join(split_path, new_filename) + shutil.move(old_file, new_file) + filtered_manifest[new_filename] = manifest[new_filename] + _write_manifest(split_path, filtered_manifest) + stage_dirs.append(split_path) - # can't use isoformat() since it has colons, which GNU tar doesn't like - tarname = '_'.join([ - settings.SYSTEM_UUID, - until.strftime('%Y-%m-%d-%H%M%S%z') - ]) + for item in list(manifest.keys()): + if not os.path.exists(os.path.join(gather_dir, item)): + manifest.pop(item) + _write_manifest(gather_dir, manifest) + + tarfiles = [] try: - tgz = shutil.make_archive( - os.path.join(os.path.dirname(dest), tarname), - 'gztar', - dest - ) - return tgz + for i in range(0, len(stage_dirs)): + stage_dir = stage_dirs[i] + # can't use isoformat() since it has colons, which GNU tar doesn't like + tarname = '_'.join([ + settings.SYSTEM_UUID, + until.strftime('%Y-%m-%d-%H%M%S%z'), + str(i) + ]) + tgz = shutil.make_archive( + os.path.join(os.path.dirname(dest), tarname), + 'gztar', + stage_dir + ) + tarfiles.append(tgz) except Exception: + shutil.rmtree(stage_dir, ignore_errors = True) logger.exception("Failed to write analytics archive file") - finally: - shutil.rmtree(dest) + finally: + shutil.rmtree(dest, ignore_errors = True) + return tarfiles def ship(path): diff --git a/awx/main/management/commands/gather_analytics.py 
b/awx/main/management/commands/gather_analytics.py index aa096d6f28..6bfa3b1674 100644 --- a/awx/main/management/commands/gather_analytics.py +++ b/awx/main/management/commands/gather_analytics.py @@ -1,6 +1,9 @@ import logging + from awx.main.analytics import gather, ship +from dateutil import parser from django.core.management.base import BaseCommand +from django.utils.timezone import now class Command(BaseCommand): @@ -15,6 +18,10 @@ class Command(BaseCommand): help='Gather analytics without shipping. Works even if analytics are disabled in settings.') parser.add_argument('--ship', dest='ship', action='store_true', help='Enable to ship metrics to the Red Hat Cloud') + parser.add_argument('--since', dest='since', action='store', + help='Start date for collection') + parser.add_argument('--until', dest='until', action='store', + help='End date for collection') def init_logging(self): self.logger = logging.getLogger('awx.main.analytics') @@ -28,11 +35,27 @@ class Command(BaseCommand): self.init_logging() opt_ship = options.get('ship') opt_dry_run = options.get('dry-run') + opt_since = options.get('since') or None + opt_until = options.get('until') or None + + if opt_since: + since = parser.parse(opt_since) + else: + since = None + if opt_until: + until = parser.parse(opt_until) + else: + until = now() + if opt_ship and opt_dry_run: self.logger.error('Both --ship and --dry-run cannot be processed at the same time.') return - tgz = gather(collection_type='manual' if not opt_dry_run else 'dry-run') - if tgz: - self.logger.debug(tgz) + tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since = since, until = until) + if tgzfiles: + self.logger.debug(tgzfiles) + else: + self.logger.error('No analytics collected') if opt_ship: - ship(tgz) + if tgzfiles: + for tgz in tgzfiles: + ship(tgz) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 17f6dd27c4..80013000a6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -357,19 +357,23 @@ 
def send_notifications(notification_list, job_id=None): @task(queue=get_local_queuename) def gather_analytics(): def _gather_and_ship(subset, since, until): + tgzfiles = [] try: - tgz = analytics.gather(subset=subset, since=since, until=until) + tgzfiles = analytics.gather(subset=subset, since=since, until=until) # empty analytics without raising an exception is not an error - if not tgz: + if not tgzfiles: return True - logger.info('gathered analytics: {}'.format(tgz)) - analytics.ship(tgz) + logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles)) + for tgz in tgzfiles: + analytics.ship(tgz) except Exception: logger.exception('Error gathering and sending analytics for {} to {}.'.format(since,until)) return False finally: - if os.path.exists(tgz): - os.remove(tgz) + if tgzfiles: + for tgz in tgzfiles: + if os.path.exists(tgz): + os.remove(tgz) return True from awx.conf.models import Setting @@ -404,7 +408,7 @@ def gather_analytics(): start = since until = None while start < gather_time: - until = start + timedelta(minutes=20) + until = start + timedelta(hours = 4) if (until > gather_time): until = gather_time if not _gather_and_ship(incremental_collectors, since=start, until=until): diff --git a/awx/main/tests/functional/analytics/test_core.py b/awx/main/tests/functional/analytics/test_core.py index 190eeac6b3..f3cc1fcd4b 100644 --- a/awx/main/tests/functional/analytics/test_core.py +++ b/awx/main/tests/functional/analytics/test_core.py @@ -39,9 +39,9 @@ def mock_valid_license(): def test_gather(mock_valid_license): settings.INSIGHTS_TRACKING_STATE = True - tgz = gather(module=importlib.import_module(__name__)) + tgzfiles = gather(module=importlib.import_module(__name__)) files = {} - with tarfile.open(tgz, "r:gz") as archive: + with tarfile.open(tgzfiles[0], "r:gz") as archive: for member in archive.getmembers(): files[member.name] = archive.extractfile(member) @@ -53,7 +53,8 @@ def test_gather(mock_valid_license): assert 
'./bad_json.json' not in files.keys() assert './throws_error.json' not in files.keys() try: - os.remove(tgz) + for tgz in tgzfiles: + os.remove(tgz) except Exception: pass From d4ba62695ff6827ff6ebd2815943f33412bb9949 Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Fri, 31 Jul 2020 12:20:02 -0400 Subject: [PATCH 06/10] Put awx analytics logs also in the task system logger Errors/warnings when gathering analytics are about 50/50 split between the gathering code in analytics and the task code that calls it, so they should be in the same place for debugging sanity. --- awx/settings/defaults.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 5725bb7f46..1e72bb551f 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1001,7 +1001,7 @@ LOGGING = { 'level': 'INFO', # very verbose debug-level logs }, 'awx.analytics': { - 'handlers': ['external_logger'], + 'handlers': ['task_system', 'external_logger'], 'level': 'INFO', 'propagate': False }, From 09f7d7042819f689fda03179cb0ae846320eae0b Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Tue, 4 Aug 2020 23:39:44 -0400 Subject: [PATCH 07/10] Use isoformat() rather than strftime Reformat SQL in unit tests because sqlite. 
--- awx/main/analytics/collectors.py | 14 +++++++------- .../tests/functional/analytics/test_collectors.py | 7 ++++++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index fe823764b8..9b15ff0627 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -305,9 +305,9 @@ def events_table(since, full_path, until, **kwargs): main_jobevent.event_data::json->'res'->'warnings' AS warnings, main_jobevent.event_data::json->'res'->'deprecations' AS deprecations FROM main_jobevent - WHERE (main_jobevent.created > {} AND main_jobevent.created <= {}) + WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}') ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER - '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'")) + '''.format(since.isoformat(),until.isoformat()) return _copy_table(table='events', query=events_query, path=full_path) @@ -340,11 +340,11 @@ def unified_jobs_table(since, full_path, until, **kwargs): LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id - WHERE ((main_unifiedjob.created > {0} AND main_unifiedjob.created <= {1}) - OR (main_unifiedjob.finished > {0} AND main_unifiedjob.finished <= {1})) + WHERE ((main_unifiedjob.created > '{0}' AND main_unifiedjob.created <= '{1}') + OR (main_unifiedjob.finished > '{0}' AND main_unifiedjob.finished <= '{1}')) AND main_unifiedjob.launch_type != 'sync' ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER - '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'")) + '''.format(since.isoformat(),until.isoformat()) return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) @@ -401,9 +401,9 @@ def workflow_job_node_table(since, 
full_path, until, **kwargs): FROM main_workflowjobnode_always_nodes GROUP BY from_workflowjobnode_id ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id - WHERE (main_workflowjobnode.modified > {} AND main_workflowjobnode.modified <= {}) + WHERE (main_workflowjobnode.modified > '{}' AND main_workflowjobnode.modified <= '{}') ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER - '''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'")) + '''.format(since.isoformat(),until.isoformat()) return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index 7f8e7d0a81..1d643588d1 100644 --- a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -1,6 +1,7 @@ import pytest import tempfile import os +import re import shutil import csv @@ -27,7 +28,7 @@ def sqlite_copy_expert(request): def write_stdout(self, sql, fd): # Would be cool if we instead properly disected the SQL query and verified - # it that way. But instead, we just take the nieve approach here. + # it that way. But instead, we just take the naive approach here. 
sql = sql.strip()
        assert sql.startswith("COPY (")
        assert sql.endswith(") TO STDOUT WITH CSV HEADER")
@@ -36,6 +37,10 @@ def sqlite_copy_expert(request):
         sql = sql.replace(") TO STDOUT WITH CSV HEADER", "")
         # sqlite equivalent
         sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT")
+        # SQLite doesn't support isoformatted dates, because that would be useful
+        sql = sql.replace("+00:00", "")
+        i = re.compile(r'(?P<date>\d\d\d\d-\d\d-\d\d)T')
+        sql = i.sub(r'\g<date> ', sql)
         # Remove JSON style queries
         # TODO: could replace JSON style queries with sqlite kind of equivalents
From a604ecffb8b67dc8196fb4b49200a25afd19a983 Mon Sep 17 00:00:00 2001
From: Bill Nottingham
Date: Thu, 6 Aug 2020 22:52:34 -0400
Subject: [PATCH 08/10] Adjust query_info to set the collection time based on
 what's passed.

---
 awx/main/analytics/collectors.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index 9b15ff0627..e1dc468d51 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -220,10 +220,10 @@ def job_instance_counts(since, **kwargs):
 @register('query_info', '1.0', description=_('Metadata about the analytics collected'))
-def query_info(since, collection_type, **kwargs):
+def query_info(since, collection_type, until, **kwargs):
     query_info = {}
     query_info['last_run'] = str(since)
-    query_info['current_time'] = str(now())
+    query_info['current_time'] = str(until)
     query_info['collection_type'] = collection_type
     return query_info
From 05ad85e7a6d14e68549528a4ae1ca6daaf9e3a1d Mon Sep 17 00:00:00 2001
From: Bill Nottingham
Date: Thu, 6 Aug 2020 23:04:09 -0400
Subject: [PATCH 09/10] Remove the model for the now unused TowerAnalyticsState.
--- .../0121_delete_toweranalyticsstate.py | 16 ++++++++++++++++ awx/main/models/ha.py | 6 +----- 2 files changed, 17 insertions(+), 5 deletions(-) create mode 100644 awx/main/migrations/0121_delete_toweranalyticsstate.py diff --git a/awx/main/migrations/0121_delete_toweranalyticsstate.py b/awx/main/migrations/0121_delete_toweranalyticsstate.py new file mode 100644 index 0000000000..d1e1ceb37c --- /dev/null +++ b/awx/main/migrations/0121_delete_toweranalyticsstate.py @@ -0,0 +1,16 @@ +# Generated by Django 2.2.11 on 2020-07-24 17:41 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0120_galaxy_credentials'), + ] + + operations = [ + migrations.DeleteModel( + name='TowerAnalyticsState', + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 4b6d8926e9..fc4e9c022e 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -24,7 +24,7 @@ from awx.main.models.unified_jobs import UnifiedJob from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity from awx.main.models.mixins import RelatedJobsMixin -__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState', 'TowerAnalyticsState') +__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState') class HasPolicyEditsMixin(HasEditsMixin): @@ -296,10 +296,6 @@ class TowerScheduleState(SingletonModel): schedule_last_run = models.DateTimeField(auto_now_add=True) -class TowerAnalyticsState(SingletonModel): - last_run = models.DateTimeField(auto_now_add=True) - - def schedule_policy_task(): from awx.main.tasks import apply_cluster_membership_policies connection.on_commit(lambda: apply_cluster_membership_policies.apply_async()) From 13802fcf2bfa137aa642ee9b5eb2d187b420b269 Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Thu, 10 Sep 2020 21:19:07 -0400 Subject: [PATCH 10/10] Don't return error messages for license errors Just log the exception and return None. 
--- awx/main/analytics/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index 7179d89969..fe48fb30bf 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -105,11 +105,11 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c if _valid_license() is False: logger.exception("Invalid License provided, or No License Provided") - return "Error: Invalid License provided, or No License Provided" + return None if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE: logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.") - return + return None collector_list = [] if module: