Merge pull request #7709 from wenottingham/so-many-del-toros

Adjust analytics gathering

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot]
2020-09-11 21:41:05 +00:00
committed by GitHub
11 changed files with 365 additions and 159 deletions

View File

@@ -21,6 +21,7 @@ import requests
from awx.api.generics import APIView from awx.api.generics import APIView
from awx.conf.registry import settings_registry from awx.conf.registry import settings_registry
from awx.main.analytics import all_collectors
from awx.main.ha import is_ha_environment from awx.main.ha import is_ha_environment
from awx.main.utils import ( from awx.main.utils import (
get_awx_version, get_awx_version,
@@ -252,6 +253,7 @@ class ApiV2ConfigView(APIView):
ansible_version=get_ansible_version(), ansible_version=get_ansible_version(),
eula=render_to_string("eula.md") if license_data.get('license_type', 'UNLICENSED') != 'open' else '', eula=render_to_string("eula.md") if license_data.get('license_type', 'UNLICENSED') != 'open' else '',
analytics_status=pendo_state, analytics_status=pendo_state,
analytics_collectors=all_collectors(),
become_methods=PRIVILEGE_ESCALATION_METHODS, become_methods=PRIVILEGE_ESCALATION_METHODS,
) )

View File

@@ -1 +1 @@
from .core import register, gather, ship, table_version # noqa from .core import all_collectors, expensive_collectors, register, gather, ship # noqa

View File

@@ -1,3 +1,4 @@
import io
import os import os
import os.path import os.path
import platform import platform
@@ -6,13 +7,14 @@ from django.db import connection
from django.db.models import Count from django.db.models import Count
from django.conf import settings from django.conf import settings
from django.utils.timezone import now from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from awx.conf.license import get_license from awx.conf.license import get_license
from awx.main.utils import (get_awx_version, get_ansible_version, from awx.main.utils import (get_awx_version, get_ansible_version,
get_custom_venv_choices, camelcase_to_underscore) get_custom_venv_choices, camelcase_to_underscore)
from awx.main import models from awx.main import models
from django.contrib.sessions.models import Session from django.contrib.sessions.models import Session
from awx.main.analytics import register, table_version from awx.main.analytics import register
''' '''
This module is used to define metrics collected by awx.main.analytics.gather() This module is used to define metrics collected by awx.main.analytics.gather()
@@ -31,8 +33,8 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
''' '''
@register('config', '1.1') @register('config', '1.1', description=_('General platform configuration.'))
def config(since): def config(since, **kwargs):
license_info = get_license(show_key=False) license_info = get_license(show_key=False)
install_type = 'traditional' install_type = 'traditional'
if os.environ.get('container') == 'oci': if os.environ.get('container') == 'oci':
@@ -63,8 +65,8 @@ def config(since):
} }
@register('counts', '1.0') @register('counts', '1.0', description=_('Counts of objects such as organizations, inventories, and projects'))
def counts(since): def counts(since, **kwargs):
counts = {} counts = {}
for cls in (models.Organization, models.Team, models.User, for cls in (models.Organization, models.Team, models.User,
models.Inventory, models.Credential, models.Project, models.Inventory, models.Credential, models.Project,
@@ -98,8 +100,8 @@ def counts(since):
return counts return counts
@register('org_counts', '1.0') @register('org_counts', '1.0', description=_('Counts of users and teams by organization'))
def org_counts(since): def org_counts(since, **kwargs):
counts = {} counts = {}
for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True), for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True),
num_teams=Count('teams', distinct=True)).values('name', 'id', 'num_users', 'num_teams'): num_teams=Count('teams', distinct=True)).values('name', 'id', 'num_users', 'num_teams'):
@@ -110,8 +112,8 @@ def org_counts(since):
return counts return counts
@register('cred_type_counts', '1.0') @register('cred_type_counts', '1.0', description=_('Counts of credentials by credential type'))
def cred_type_counts(since): def cred_type_counts(since, **kwargs):
counts = {} counts = {}
for cred_type in models.CredentialType.objects.annotate(num_credentials=Count( for cred_type in models.CredentialType.objects.annotate(num_credentials=Count(
'credentials', distinct=True)).values('name', 'id', 'managed_by_tower', 'num_credentials'): 'credentials', distinct=True)).values('name', 'id', 'managed_by_tower', 'num_credentials'):
@@ -122,8 +124,8 @@ def cred_type_counts(since):
return counts return counts
@register('inventory_counts', '1.2') @register('inventory_counts', '1.2', description=_('Inventories, their inventory sources, and host counts'))
def inventory_counts(since): def inventory_counts(since, **kwargs):
counts = {} counts = {}
for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True), for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True),
num_hosts=Count('hosts', distinct=True)).only('id', 'name', 'kind'): num_hosts=Count('hosts', distinct=True)).only('id', 'name', 'kind'):
@@ -147,8 +149,8 @@ def inventory_counts(since):
return counts return counts
@register('projects_by_scm_type', '1.0') @register('projects_by_scm_type', '1.0', description=_('Counts of projects by source control type'))
def projects_by_scm_type(since): def projects_by_scm_type(since, **kwargs):
counts = dict( counts = dict(
(t[0] or 'manual', 0) (t[0] or 'manual', 0)
for t in models.Project.SCM_TYPE_CHOICES for t in models.Project.SCM_TYPE_CHOICES
@@ -166,8 +168,8 @@ def _get_isolated_datetime(last_check):
return last_check return last_check
@register('instance_info', '1.0') @register('instance_info', '1.0', description=_('Cluster topology and capacity'))
def instance_info(since, include_hostnames=False): def instance_info(since, include_hostnames=False, **kwargs):
info = {} info = {}
instances = models.Instance.objects.values_list('hostname').values( instances = models.Instance.objects.values_list('hostname').values(
'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'last_isolated_check', 'enabled') 'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'last_isolated_check', 'enabled')
@@ -192,8 +194,8 @@ def instance_info(since, include_hostnames=False):
return info return info
@register('job_counts', '1.0') @register('job_counts', '1.0', description=_('Counts of jobs by status'))
def job_counts(since): def job_counts(since, **kwargs):
counts = {} counts = {}
counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count() counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count()
counts['status'] = dict(models.UnifiedJob.objects.exclude(launch_type='sync').values_list('status').annotate(Count('status')).order_by()) counts['status'] = dict(models.UnifiedJob.objects.exclude(launch_type='sync').values_list('status').annotate(Count('status')).order_by())
@@ -202,8 +204,8 @@ def job_counts(since):
return counts return counts
@register('job_instance_counts', '1.0') @register('job_instance_counts', '1.0', description=_('Counts of jobs by execution node'))
def job_instance_counts(since): def job_instance_counts(since, **kwargs):
counts = {} counts = {}
job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list( job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list(
'execution_node', 'launch_type').annotate(job_launch_type=Count('launch_type')).order_by() 'execution_node', 'launch_type').annotate(job_launch_type=Count('launch_type')).order_by()
@@ -217,30 +219,71 @@ def job_instance_counts(since):
return counts return counts
@register('query_info', '1.0') @register('query_info', '1.0', description=_('Metadata about the analytics collected'))
def query_info(since, collection_type): def query_info(since, collection_type, until, **kwargs):
query_info = {} query_info = {}
query_info['last_run'] = str(since) query_info['last_run'] = str(since)
query_info['current_time'] = str(now()) query_info['current_time'] = str(until)
query_info['collection_type'] = collection_type query_info['collection_type'] = collection_type
return query_info return query_info
# Copies Job Events from db to a .csv to be shipped '''
@table_version('events_table.csv', '1.1') The event table can be *very* large, and we have a 100MB upload limit.
@table_version('unified_jobs_table.csv', '1.1')
@table_version('unified_job_template_table.csv', '1.0') Split large table dumps at dump time into a series of files.
@table_version('workflow_job_node_table.csv', '1.0') '''
@table_version('workflow_job_template_node_table.csv', '1.0') MAX_TABLE_SIZE = 200 * 1048576
def copy_tables(since, full_path, subset=None):
def _copy_table(table, query, path):
class FileSplitter(io.StringIO):
def __init__(self, filespec=None, *args, **kwargs):
self.filespec = filespec
self.files = []
self.currentfile = None
self.header = None
self.counter = 0
self.cycle_file()
def cycle_file(self):
if self.currentfile:
self.currentfile.close()
self.counter = 0
fname = '{}_split{}'.format(self.filespec, len(self.files))
self.currentfile = open(fname, 'w', encoding='utf-8')
self.files.append(fname)
if self.header:
self.currentfile.write('{}\n'.format(self.header))
def file_list(self):
self.currentfile.close()
# Check for an empty dump
if len(self.header) + 1 == self.counter:
os.remove(self.files[-1])
self.files = self.files[:-1]
# If we only have one file, remove the suffix
if len(self.files) == 1:
os.rename(self.files[0],self.files[0].replace('_split0',''))
return self.files
def write(self, s):
if not self.header:
self.header = s[0:s.index('\n')]
self.counter += self.currentfile.write(s)
if self.counter >= MAX_TABLE_SIZE:
self.cycle_file()
def _copy_table(table, query, path):
file_path = os.path.join(path, table + '_table.csv') file_path = os.path.join(path, table + '_table.csv')
file = open(file_path, 'w', encoding='utf-8') file = FileSplitter(filespec=file_path)
with connection.cursor() as cursor: with connection.cursor() as cursor:
cursor.copy_expert(query, file) cursor.copy_expert(query, file)
file.close() return file.file_list()
return file_path
@register('events_table', '1.1', format='csv', description=_('Automation task records'), expensive=True)
def events_table(since, full_path, until, **kwargs):
events_query = '''COPY (SELECT main_jobevent.id, events_query = '''COPY (SELECT main_jobevent.id,
main_jobevent.created, main_jobevent.created,
main_jobevent.uuid, main_jobevent.uuid,
@@ -262,11 +305,14 @@ def copy_tables(since, full_path, subset=None):
main_jobevent.event_data::json->'res'->'warnings' AS warnings, main_jobevent.event_data::json->'res'->'warnings' AS warnings,
main_jobevent.event_data::json->'res'->'deprecations' AS deprecations main_jobevent.event_data::json->'res'->'deprecations' AS deprecations
FROM main_jobevent FROM main_jobevent
WHERE main_jobevent.created > {} WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}')
ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER
if not subset or 'events' in subset: '''.format(since.isoformat(),until.isoformat())
_copy_table(table='events', query=events_query, path=full_path) return _copy_table(table='events', query=events_query, path=full_path)
@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run'), expensive=True)
def unified_jobs_table(since, full_path, until, **kwargs):
unified_job_query = '''COPY (SELECT main_unifiedjob.id, unified_job_query = '''COPY (SELECT main_unifiedjob.id,
main_unifiedjob.polymorphic_ctype_id, main_unifiedjob.polymorphic_ctype_id,
django_content_type.model, django_content_type.model,
@@ -294,12 +340,16 @@ def copy_tables(since, full_path, subset=None):
LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id
LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id
LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id
WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0}) WHERE ((main_unifiedjob.created > '{0}' AND main_unifiedjob.created <= '{1}')
OR (main_unifiedjob.finished > '{0}' AND main_unifiedjob.finished <= '{1}'))
AND main_unifiedjob.launch_type != 'sync' AND main_unifiedjob.launch_type != 'sync'
ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER
if not subset or 'unified_jobs' in subset: '''.format(since.isoformat(),until.isoformat())
_copy_table(table='unified_jobs', query=unified_job_query, path=full_path) return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path)
@register('unified_job_template_table', '1.0', format='csv', description=_('Data on job templates'))
def unified_job_template_table(since, full_path, **kwargs):
unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id, unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id,
main_unifiedjobtemplate.polymorphic_ctype_id, main_unifiedjobtemplate.polymorphic_ctype_id,
django_content_type.model, django_content_type.model,
@@ -318,9 +368,11 @@ def copy_tables(since, full_path, subset=None):
FROM main_unifiedjobtemplate, django_content_type FROM main_unifiedjobtemplate, django_content_type
WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id
ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER'''
if not subset or 'unified_job_template' in subset: return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
_copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True)
def workflow_job_node_table(since, full_path, until, **kwargs):
workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
main_workflowjobnode.created, main_workflowjobnode.created,
main_workflowjobnode.modified, main_workflowjobnode.modified,
@@ -349,11 +401,14 @@ def copy_tables(since, full_path, subset=None):
FROM main_workflowjobnode_always_nodes FROM main_workflowjobnode_always_nodes
GROUP BY from_workflowjobnode_id GROUP BY from_workflowjobnode_id
) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id
WHERE main_workflowjobnode.modified > {} WHERE (main_workflowjobnode.modified > '{}' AND main_workflowjobnode.modified <= '{}')
ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER
if not subset or 'workflow_job_node' in subset: '''.format(since.isoformat(),until.isoformat())
_copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path)
@register('workflow_job_template_node_table', '1.0', format='csv', description=_('Data on workflows'))
def workflow_job_template_node_table(since, full_path, **kwargs):
workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id,
main_workflowjobtemplatenode.created, main_workflowjobtemplatenode.created,
main_workflowjobtemplatenode.modified, main_workflowjobtemplatenode.modified,
@@ -381,7 +436,4 @@ def copy_tables(since, full_path, subset=None):
GROUP BY from_workflowjobtemplatenode_id GROUP BY from_workflowjobtemplatenode_id
) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id
ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER''' ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''
if not subset or 'workflow_job_template_node' in subset: return _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
_copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
return

View File

@@ -14,17 +14,13 @@ from rest_framework.exceptions import PermissionDenied
from awx.conf.license import get_license from awx.conf.license import get_license
from awx.main.models import Job from awx.main.models import Job
from awx.main.access import access_registry from awx.main.access import access_registry
from awx.main.models.ha import TowerAnalyticsState
from awx.main.utils import get_awx_http_client_headers, set_environ from awx.main.utils import get_awx_http_client_headers, set_environ
__all__ = ['register', 'gather', 'ship']
__all__ = ['register', 'gather', 'ship', 'table_version']
logger = logging.getLogger('awx.main.analytics') logger = logging.getLogger('awx.main.analytics')
manifest = dict()
def _valid_license(): def _valid_license():
try: try:
@@ -37,11 +33,38 @@ def _valid_license():
return True return True
def register(key, version): def all_collectors():
from awx.main.analytics import collectors
collector_dict = {}
module = collectors
for name, func in inspect.getmembers(module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
key = func.__awx_analytics_key__
desc = func.__awx_analytics_description__ or ''
version = func.__awx_analytics_version__
collector_dict[key] = { 'name': key, 'version': version, 'description': desc}
return collector_dict
def expensive_collectors():
from awx.main.analytics import collectors
ret = []
module = collectors
for name, func in inspect.getmembers(module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__:
ret.append(func.__awx_analytics_key__)
return ret
def register(key, version, description=None, format='json', expensive=False):
""" """
A decorator used to register a function as a metric collector. A decorator used to register a function as a metric collector.
Decorated functions should return JSON-serializable objects. Decorated functions should do the following based on format:
- json: return JSON-serializable objects.
- csv: write CSV data to a filename named 'key'
@register('projects_by_scm_type', 1) @register('projects_by_scm_type', 1)
def projects_by_scm_type(): def projects_by_scm_type():
@@ -51,100 +74,153 @@ def register(key, version):
def decorate(f): def decorate(f):
f.__awx_analytics_key__ = key f.__awx_analytics_key__ = key
f.__awx_analytics_version__ = version f.__awx_analytics_version__ = version
f.__awx_analytics_description__ = description
f.__awx_analytics_type__ = format
f.__awx_expensive__ = expensive
return f return f
return decorate return decorate
def table_version(file_name, version): def gather(dest=None, module=None, subset = None, since = None, until = now(), collection_type='scheduled'):
global manifest
manifest[file_name] = version
def decorate(f):
return f
return decorate
def gather(dest=None, module=None, collection_type='scheduled'):
""" """
Gather all defined metrics and write them as JSON files in a .tgz Gather all defined metrics and write them as JSON files in a .tgz
:param dest: the (optional) absolute path to write a compressed tarball :param dest: the (optional) absolute path to write a compressed tarball
:pararm module: the module to search for registered analytic collector :param module: the module to search for registered analytic collector
functions; defaults to awx.main.analytics.collectors functions; defaults to awx.main.analytics.collectors
""" """
def _write_manifest(destdir, manifest):
run_now = now() path = os.path.join(destdir, 'manifest.json')
state = TowerAnalyticsState.get_solo()
last_run = state.last_run
logger.debug("Last analytics run was: {}".format(last_run))
max_interval = now() - timedelta(weeks=4)
if last_run < max_interval or not last_run:
last_run = max_interval
if _valid_license() is False:
logger.exception("Invalid License provided, or No License Provided")
return "Error: Invalid License provided, or No License Provided"
if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
return
if module is None:
from awx.main.analytics import collectors
module = collectors
dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
for name, func in inspect.getmembers(module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
key = func.__awx_analytics_key__
manifest['{}.json'.format(key)] = func.__awx_analytics_version__
path = '{}.json'.format(os.path.join(dest, key))
with open(path, 'w', encoding='utf-8') as f:
try:
if func.__name__ == 'query_info':
json.dump(func(last_run, collection_type=collection_type), f)
else:
json.dump(func(last_run), f)
except Exception:
logger.exception("Could not generate metric {}.json".format(key))
f.close()
os.remove(f.name)
path = os.path.join(dest, 'manifest.json')
with open(path, 'w', encoding='utf-8') as f: with open(path, 'w', encoding='utf-8') as f:
try: try:
json.dump(manifest, f) json.dump(manifest, f)
except Exception: except Exception:
logger.exception("Could not generate manifest.json")
f.close() f.close()
os.remove(f.name) os.remove(f.name)
logger.exception("Could not generate manifest.json")
last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4))
logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
if _valid_license() is False:
logger.exception("Invalid License provided, or No License Provided")
return None
if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
return None
collector_list = []
if module:
collector_module = module
else:
from awx.main.analytics import collectors
collector_module = collectors
for name, func in inspect.getmembers(collector_module):
if (
inspect.isfunction(func) and
hasattr(func, '__awx_analytics_key__') and
(not subset or name in subset)
):
collector_list.append((name, func))
manifest = dict()
dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
gather_dir = os.path.join(dest, 'stage')
os.mkdir(gather_dir, 0o700)
num_splits = 1
for name, func in collector_list:
if func.__awx_analytics_type__ == 'json':
key = func.__awx_analytics_key__
path = '{}.json'.format(os.path.join(gather_dir, key))
with open(path, 'w', encoding='utf-8') as f:
try: try:
collectors.copy_tables(since=last_run, full_path=dest) json.dump(func(last_run, collection_type=collection_type, until=until), f)
manifest['{}.json'.format(key)] = func.__awx_analytics_version__
except Exception: except Exception:
logger.exception("Could not copy tables") logger.exception("Could not generate metric {}.json".format(key))
f.close()
os.remove(f.name)
elif func.__awx_analytics_type__ == 'csv':
key = func.__awx_analytics_key__
try:
files = func(last_run, full_path=gather_dir, until=until)
if files:
manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
if len(files) > num_splits:
num_splits = len(files)
except Exception:
logger.exception("Could not generate metric {}.csv".format(key))
if not manifest:
# No data was collected
logger.warning("No data from {} to {}".format(last_run, until))
shutil.rmtree(dest)
return None
# Always include config.json if we're using our collectors
if 'config.json' not in manifest.keys() and not module:
from awx.main.analytics import collectors
config = collectors.config
path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__))
with open(path, 'w', encoding='utf-8') as f:
try:
json.dump(collectors.config(last_run), f)
manifest['config.json'] = config.__awx_analytics_version__
except Exception:
logger.exception("Could not generate metric {}.json".format(key))
f.close()
os.remove(f.name)
shutil.rmtree(dest)
return None
stage_dirs = [gather_dir]
if num_splits > 1:
for i in range(0, num_splits):
split_path = os.path.join(dest, 'split{}'.format(i))
os.mkdir(split_path, 0o700)
filtered_manifest = {}
shutil.copy(os.path.join(gather_dir, 'config.json'), split_path)
filtered_manifest['config.json'] = manifest['config.json']
suffix = '_split{}'.format(i)
for file in os.listdir(gather_dir):
if file.endswith(suffix):
old_file = os.path.join(gather_dir, file)
new_filename = file.replace(suffix, '')
new_file = os.path.join(split_path, new_filename)
shutil.move(old_file, new_file)
filtered_manifest[new_filename] = manifest[new_filename]
_write_manifest(split_path, filtered_manifest)
stage_dirs.append(split_path)
for item in list(manifest.keys()):
if not os.path.exists(os.path.join(gather_dir, item)):
manifest.pop(item)
_write_manifest(gather_dir, manifest)
tarfiles = []
try:
for i in range(0, len(stage_dirs)):
stage_dir = stage_dirs[i]
# can't use isoformat() since it has colons, which GNU tar doesn't like # can't use isoformat() since it has colons, which GNU tar doesn't like
tarname = '_'.join([ tarname = '_'.join([
settings.SYSTEM_UUID, settings.SYSTEM_UUID,
run_now.strftime('%Y-%m-%d-%H%M%S%z') until.strftime('%Y-%m-%d-%H%M%S%z'),
str(i)
]) ])
try:
tgz = shutil.make_archive( tgz = shutil.make_archive(
os.path.join(os.path.dirname(dest), tarname), os.path.join(os.path.dirname(dest), tarname),
'gztar', 'gztar',
dest stage_dir
) )
return tgz tarfiles.append(tgz)
except Exception: except Exception:
shutil.rmtree(stage_dir, ignore_errors = True)
logger.exception("Failed to write analytics archive file") logger.exception("Failed to write analytics archive file")
finally: finally:
shutil.rmtree(dest) shutil.rmtree(dest, ignore_errors = True)
return tarfiles
def ship(path): def ship(path):
@@ -154,6 +230,9 @@ def ship(path):
if not path: if not path:
logger.error('Automation Analytics TAR not found') logger.error('Automation Analytics TAR not found')
return return
if not os.path.exists(path):
logger.error('Automation Analytics TAR {} not found'.format(path))
return
if "Error:" in str(path): if "Error:" in str(path):
return return
try: try:
@@ -184,10 +263,7 @@ def ship(path):
if response.status_code >= 300: if response.status_code >= 300:
return logger.exception('Upload failed with status {}, {}'.format(response.status_code, return logger.exception('Upload failed with status {}, {}'.format(response.status_code,
response.text)) response.text))
run_now = now()
state = TowerAnalyticsState.get_solo()
state.last_run = run_now
state.save()
finally: finally:
# cleanup tar.gz # cleanup tar.gz
if os.path.exists(path):
os.remove(path) os.remove(path)

View File

@@ -1,6 +1,9 @@
import logging import logging
from awx.main.analytics import gather, ship from awx.main.analytics import gather, ship
from dateutil import parser
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.utils.timezone import now
class Command(BaseCommand): class Command(BaseCommand):
@@ -15,6 +18,10 @@ class Command(BaseCommand):
help='Gather analytics without shipping. Works even if analytics are disabled in settings.') help='Gather analytics without shipping. Works even if analytics are disabled in settings.')
parser.add_argument('--ship', dest='ship', action='store_true', parser.add_argument('--ship', dest='ship', action='store_true',
help='Enable to ship metrics to the Red Hat Cloud') help='Enable to ship metrics to the Red Hat Cloud')
parser.add_argument('--since', dest='since', action='store',
help='Start date for collection')
parser.add_argument('--until', dest='until', action='store',
help='End date for collection')
def init_logging(self): def init_logging(self):
self.logger = logging.getLogger('awx.main.analytics') self.logger = logging.getLogger('awx.main.analytics')
@@ -28,11 +35,27 @@ class Command(BaseCommand):
self.init_logging() self.init_logging()
opt_ship = options.get('ship') opt_ship = options.get('ship')
opt_dry_run = options.get('dry-run') opt_dry_run = options.get('dry-run')
opt_since = options.get('since') or None
opt_until = options.get('until') or None
if opt_since:
since = parser.parse(opt_since)
else:
since = None
if opt_until:
until = parser.parse(opt_until)
else:
until = now()
if opt_ship and opt_dry_run: if opt_ship and opt_dry_run:
self.logger.error('Both --ship and --dry-run cannot be processed at the same time.') self.logger.error('Both --ship and --dry-run cannot be processed at the same time.')
return return
tgz = gather(collection_type='manual' if not opt_dry_run else 'dry-run') tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since = since, until = until)
if tgz: if tgzfiles:
self.logger.debug(tgz) self.logger.debug(tgzfiles)
else:
self.logger.error('No analytics collected')
if opt_ship: if opt_ship:
if tgzfiles:
for tgz in tgzfiles:
ship(tgz) ship(tgz)

View File

@@ -0,0 +1,16 @@
# Generated by Django 2.2.11 on 2020-07-24 17:41
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0120_galaxy_credentials'),
]
operations = [
migrations.DeleteModel(
name='TowerAnalyticsState',
),
]

View File

@@ -24,7 +24,7 @@ from awx.main.models.unified_jobs import UnifiedJob
from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity
from awx.main.models.mixins import RelatedJobsMixin from awx.main.models.mixins import RelatedJobsMixin
__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState', 'TowerAnalyticsState') __all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState')
class HasPolicyEditsMixin(HasEditsMixin): class HasPolicyEditsMixin(HasEditsMixin):
@@ -296,10 +296,6 @@ class TowerScheduleState(SingletonModel):
schedule_last_run = models.DateTimeField(auto_now_add=True) schedule_last_run = models.DateTimeField(auto_now_add=True)
class TowerAnalyticsState(SingletonModel):
last_run = models.DateTimeField(auto_now_add=True)
def schedule_policy_task(): def schedule_policy_task():
from awx.main.tasks import apply_cluster_membership_policies from awx.main.tasks import apply_cluster_membership_policies
connection.on_commit(lambda: apply_cluster_membership_policies.apply_async()) connection.on_commit(lambda: apply_cluster_membership_policies.apply_async())

View File

@@ -53,6 +53,7 @@ import ansible_runner
from awx import __version__ as awx_application_version from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
from awx.main.access import access_registry from awx.main.access import access_registry
from awx.main.analytics import all_collectors, expensive_collectors
from awx.main.redact import UriCleaner from awx.main.redact import UriCleaner
from awx.main.models import ( from awx.main.models import (
Schedule, TowerScheduleState, Instance, InstanceGroup, Schedule, TowerScheduleState, Instance, InstanceGroup,
@@ -355,6 +356,26 @@ def send_notifications(notification_list, job_id=None):
@task(queue=get_local_queuename) @task(queue=get_local_queuename)
def gather_analytics(): def gather_analytics():
def _gather_and_ship(subset, since, until):
tgzfiles = []
try:
tgzfiles = analytics.gather(subset=subset, since=since, until=until)
# empty analytics without raising an exception is not an error
if not tgzfiles:
return True
logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
for tgz in tgzfiles:
analytics.ship(tgz)
except Exception:
logger.exception('Error gathering and sending analytics for {} to {}.'.format(since,until))
return False
finally:
if tgzfiles:
for tgz in tgzfiles:
if os.path.exists(tgz):
os.remove(tgz)
return True
from awx.conf.models import Setting from awx.conf.models import Setting
from rest_framework.fields import DateTimeField from rest_framework.fields import DateTimeField
if not settings.INSIGHTS_TRACKING_STATE: if not settings.INSIGHTS_TRACKING_STATE:
@@ -373,16 +394,29 @@ def gather_analytics():
if acquired is False: if acquired is False:
logger.debug('Not gathering analytics, another task holds lock') logger.debug('Not gathering analytics, another task holds lock')
return return
try: subset = list(all_collectors().keys())
tgz = analytics.gather() incremental_collectors = []
if not tgz: for collector in expensive_collectors():
return if collector in subset:
logger.info('gathered analytics: {}'.format(tgz)) subset.remove(collector)
analytics.ship(tgz) incremental_collectors.append(collector)
settings.AUTOMATION_ANALYTICS_LAST_GATHER = gather_time
finally: # Cap gathering at 4 weeks of data if there has been no data gathering
if os.path.exists(tgz): since = last_time or (gather_time - timedelta(weeks=4))
os.remove(tgz)
if incremental_collectors:
start = since
until = None
while start < gather_time:
until = start + timedelta(hours = 4)
if (until > gather_time):
until = gather_time
if not _gather_and_ship(incremental_collectors, since=start, until=until):
break
start = until
settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
if subset:
_gather_and_ship(subset, since=since, until=gather_time)
@task(queue=get_local_queuename) @task(queue=get_local_queuename)

View File

@@ -1,6 +1,7 @@
import pytest import pytest
import tempfile import tempfile
import os import os
import re
import shutil import shutil
import csv import csv
@@ -27,7 +28,8 @@ def sqlite_copy_expert(request):
def write_stdout(self, sql, fd): def write_stdout(self, sql, fd):
# Would be cool if we instead properly disected the SQL query and verified # Would be cool if we instead properly disected the SQL query and verified
# it that way. But instead, we just take the nieve approach here. # it that way. But instead, we just take the naive approach here.
sql = sql.strip()
assert sql.startswith("COPY (") assert sql.startswith("COPY (")
assert sql.endswith(") TO STDOUT WITH CSV HEADER") assert sql.endswith(") TO STDOUT WITH CSV HEADER")
@@ -35,6 +37,10 @@ def sqlite_copy_expert(request):
sql = sql.replace(") TO STDOUT WITH CSV HEADER", "") sql = sql.replace(") TO STDOUT WITH CSV HEADER", "")
# sqlite equivalent # sqlite equivalent
sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT") sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT")
# SQLite doesn't support isoformatted dates, because that would be useful
sql = sql.replace("+00:00", "")
i = re.compile(r'(?P<date>\d\d\d\d-\d\d-\d\d)T')
sql = i.sub(r'\g<date> ', sql)
# Remove JSON style queries # Remove JSON style queries
# TODO: could replace JSON style queries with sqlite kind of equivalents # TODO: could replace JSON style queries with sqlite kind of equivalents
@@ -86,7 +92,7 @@ def test_copy_tables_unified_job_query(
job_name = job_template.create_unified_job().name job_name = job_template.create_unified_job().name
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
collectors.copy_tables(time_start, tmpdir, subset="unified_jobs") collectors.unified_jobs_table(time_start, tmpdir, until = now() + timedelta(seconds=1))
with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f:
lines = "".join([line for line in f]) lines = "".join([line for line in f])
@@ -134,7 +140,7 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
time_start = now() - timedelta(hours=9) time_start = now() - timedelta(hours=9)
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query") collectors.workflow_job_node_table(time_start, tmpdir, until = now() + timedelta(seconds=1))
with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f:
reader = csv.reader(f) reader = csv.reader(f)
# Pop the headers # Pop the headers

View File

@@ -10,17 +10,17 @@ from awx.main.analytics import gather, register
@register('example', '1.0') @register('example', '1.0')
def example(since): def example(since, **kwargs):
return {'awx': 123} return {'awx': 123}
@register('bad_json', '1.0') @register('bad_json', '1.0')
def bad_json(since): def bad_json(since, **kwargs):
return set() return set()
@register('throws_error', '1.0') @register('throws_error', '1.0')
def throws_error(since): def throws_error(since, **kwargs):
raise ValueError() raise ValueError()
@@ -39,9 +39,9 @@ def mock_valid_license():
def test_gather(mock_valid_license): def test_gather(mock_valid_license):
settings.INSIGHTS_TRACKING_STATE = True settings.INSIGHTS_TRACKING_STATE = True
tgz = gather(module=importlib.import_module(__name__)) tgzfiles = gather(module=importlib.import_module(__name__))
files = {} files = {}
with tarfile.open(tgz, "r:gz") as archive: with tarfile.open(tgzfiles[0], "r:gz") as archive:
for member in archive.getmembers(): for member in archive.getmembers():
files[member.name] = archive.extractfile(member) files[member.name] = archive.extractfile(member)
@@ -53,6 +53,7 @@ def test_gather(mock_valid_license):
assert './bad_json.json' not in files.keys() assert './bad_json.json' not in files.keys()
assert './throws_error.json' not in files.keys() assert './throws_error.json' not in files.keys()
try: try:
for tgz in tgzfiles:
os.remove(tgz) os.remove(tgz)
except Exception: except Exception:
pass pass

View File

@@ -1001,7 +1001,7 @@ LOGGING = {
'level': 'INFO', # very verbose debug-level logs 'level': 'INFO', # very verbose debug-level logs
}, },
'awx.analytics': { 'awx.analytics': {
'handlers': ['external_logger'], 'handlers': ['task_system', 'external_logger'],
'level': 'INFO', 'level': 'INFO',
'propagate': False 'propagate': False
}, },