Refactor analytics collectors.

- Use a single registration decorator (`register`) for both JSON and CSV collectors, replacing `table_version`
- Add a description field to each collector
- Add automation collector information to /api/v2/config
This commit is contained in:
Bill Nottingham
2020-09-09 17:10:14 -04:00
parent dff7667532
commit 1c4b06fe1e
6 changed files with 87 additions and 66 deletions

View File

@@ -21,6 +21,7 @@ import requests
from awx.api.generics import APIView from awx.api.generics import APIView
from awx.conf.registry import settings_registry from awx.conf.registry import settings_registry
from awx.main.analytics import all_collectors
from awx.main.ha import is_ha_environment from awx.main.ha import is_ha_environment
from awx.main.utils import ( from awx.main.utils import (
get_awx_version, get_awx_version,
@@ -252,6 +253,7 @@ class ApiV2ConfigView(APIView):
ansible_version=get_ansible_version(), ansible_version=get_ansible_version(),
eula=render_to_string("eula.md") if license_data.get('license_type', 'UNLICENSED') != 'open' else '', eula=render_to_string("eula.md") if license_data.get('license_type', 'UNLICENSED') != 'open' else '',
analytics_status=pendo_state, analytics_status=pendo_state,
analytics_collectors=all_collectors(),
become_methods=PRIVILEGE_ESCALATION_METHODS, become_methods=PRIVILEGE_ESCALATION_METHODS,
) )

View File

@@ -1 +1 @@
from .core import register, gather, ship, table_version # noqa from .core import all_collectors, register, gather, ship # noqa

View File

@@ -6,13 +6,14 @@ from django.db import connection
from django.db.models import Count from django.db.models import Count
from django.conf import settings from django.conf import settings
from django.utils.timezone import now from django.utils.timezone import now
from django.utils.translation import ugettext_lazy as _
from awx.conf.license import get_license from awx.conf.license import get_license
from awx.main.utils import (get_awx_version, get_ansible_version, from awx.main.utils import (get_awx_version, get_ansible_version,
get_custom_venv_choices, camelcase_to_underscore) get_custom_venv_choices, camelcase_to_underscore)
from awx.main import models from awx.main import models
from django.contrib.sessions.models import Session from django.contrib.sessions.models import Session
from awx.main.analytics import register, table_version from awx.main.analytics import register
''' '''
This module is used to define metrics collected by awx.main.analytics.gather() This module is used to define metrics collected by awx.main.analytics.gather()
@@ -31,7 +32,7 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
''' '''
@register('config', '1.1') @register('config', '1.1', description=_('General platform configuration'))
def config(since): def config(since):
license_info = get_license(show_key=False) license_info = get_license(show_key=False)
install_type = 'traditional' install_type = 'traditional'
@@ -63,7 +64,7 @@ def config(since):
} }
@register('counts', '1.0') @register('counts', '1.0', description=_('Counts of objects such as organizations, inventories, and projects'))
def counts(since): def counts(since):
counts = {} counts = {}
for cls in (models.Organization, models.Team, models.User, for cls in (models.Organization, models.Team, models.User,
@@ -98,7 +99,7 @@ def counts(since):
return counts return counts
@register('org_counts', '1.0') @register('org_counts', '1.0', description=_('Counts of users and teams by organization'))
def org_counts(since): def org_counts(since):
counts = {} counts = {}
for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True), for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True),
@@ -110,7 +111,7 @@ def org_counts(since):
return counts return counts
@register('cred_type_counts', '1.0') @register('cred_type_counts', '1.0', description=_('Counts of credentials by credential type'))
def cred_type_counts(since): def cred_type_counts(since):
counts = {} counts = {}
for cred_type in models.CredentialType.objects.annotate(num_credentials=Count( for cred_type in models.CredentialType.objects.annotate(num_credentials=Count(
@@ -122,7 +123,7 @@ def cred_type_counts(since):
return counts return counts
@register('inventory_counts', '1.2') @register('inventory_counts', '1.2', description=_('Inventories, their inventory sources, and host counts'))
def inventory_counts(since): def inventory_counts(since):
counts = {} counts = {}
for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True), for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True),
@@ -147,7 +148,7 @@ def inventory_counts(since):
return counts return counts
@register('projects_by_scm_type', '1.0') @register('projects_by_scm_type', '1.0', description=_('Counts of projects by source control type'))
def projects_by_scm_type(since): def projects_by_scm_type(since):
counts = dict( counts = dict(
(t[0] or 'manual', 0) (t[0] or 'manual', 0)
@@ -166,7 +167,7 @@ def _get_isolated_datetime(last_check):
return last_check return last_check
@register('instance_info', '1.0') @register('instance_info', '1.0', description=_('Cluster topology and capacity'))
def instance_info(since, include_hostnames=False): def instance_info(since, include_hostnames=False):
info = {} info = {}
instances = models.Instance.objects.values_list('hostname').values( instances = models.Instance.objects.values_list('hostname').values(
@@ -192,7 +193,7 @@ def instance_info(since, include_hostnames=False):
return info return info
@register('job_counts', '1.0') @register('job_counts', '1.0', description=_('Counts of jobs by status'))
def job_counts(since): def job_counts(since):
counts = {} counts = {}
counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count() counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count()
@@ -202,7 +203,7 @@ def job_counts(since):
return counts return counts
@register('job_instance_counts', '1.0') @register('job_instance_counts', '1.0', description=_('Counts of jobs by execution node'))
def job_instance_counts(since): def job_instance_counts(since):
counts = {} counts = {}
job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list( job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list(
@@ -217,7 +218,7 @@ def job_instance_counts(since):
return counts return counts
@register('query_info', '1.0') @register('query_info', '1.0', description=_('Metadata about the analytics collected'))
def query_info(since, collection_type): def query_info(since, collection_type):
query_info = {} query_info = {}
query_info['last_run'] = str(since) query_info['last_run'] = str(since)
@@ -226,21 +227,17 @@ def query_info(since, collection_type):
return query_info return query_info
# Copies Job Events from db to a .csv to be shipped def _copy_table(table, query, path):
@table_version('events_table.csv', '1.1') file_path = os.path.join(path, table + '_table.csv')
@table_version('unified_jobs_table.csv', '1.1') file = open(file_path, 'w', encoding='utf-8')
@table_version('unified_job_template_table.csv', '1.0') with connection.cursor() as cursor:
@table_version('workflow_job_node_table.csv', '1.0') cursor.copy_expert(query, file)
@table_version('workflow_job_template_node_table.csv', '1.0') file.close()
def copy_tables(since, full_path, subset=None): return file_path
def _copy_table(table, query, path):
file_path = os.path.join(path, table + '_table.csv')
file = open(file_path, 'w', encoding='utf-8')
with connection.cursor() as cursor:
cursor.copy_expert(query, file)
file.close()
return file_path
@register('events_table', '1.1', format='csv', description=_('Automation task records'))
def events_table(since, full_path):
events_query = '''COPY (SELECT main_jobevent.id, events_query = '''COPY (SELECT main_jobevent.id,
main_jobevent.created, main_jobevent.created,
main_jobevent.uuid, main_jobevent.uuid,
@@ -264,9 +261,11 @@ def copy_tables(since, full_path, subset=None):
FROM main_jobevent FROM main_jobevent
WHERE main_jobevent.created > {} WHERE main_jobevent.created > {}
ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
if not subset or 'events' in subset: return _copy_table(table='events', query=events_query, path=full_path)
_copy_table(table='events', query=events_query, path=full_path)
@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run'))
def unified_jobs_table(since, full_path):
unified_job_query = '''COPY (SELECT main_unifiedjob.id, unified_job_query = '''COPY (SELECT main_unifiedjob.id,
main_unifiedjob.polymorphic_ctype_id, main_unifiedjob.polymorphic_ctype_id,
django_content_type.model, django_content_type.model,
@@ -297,9 +296,11 @@ def copy_tables(since, full_path, subset=None):
WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0}) WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0})
AND main_unifiedjob.launch_type != 'sync' AND main_unifiedjob.launch_type != 'sync'
ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
if not subset or 'unified_jobs' in subset: return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path)
_copy_table(table='unified_jobs', query=unified_job_query, path=full_path)
@register('unified_job_template_table', '1.0', format='csv', description=_('Data on job templates'))
def unified_job_template_table(since, full_path):
unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id, unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id,
main_unifiedjobtemplate.polymorphic_ctype_id, main_unifiedjobtemplate.polymorphic_ctype_id,
django_content_type.model, django_content_type.model,
@@ -318,9 +319,11 @@ def copy_tables(since, full_path, subset=None):
FROM main_unifiedjobtemplate, django_content_type FROM main_unifiedjobtemplate, django_content_type
WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id
ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER'''
if not subset or 'unified_job_template' in subset: return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
_copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'))
def workflow_job_node_table(since, full_path):
workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
main_workflowjobnode.created, main_workflowjobnode.created,
main_workflowjobnode.modified, main_workflowjobnode.modified,
@@ -351,9 +354,11 @@ def copy_tables(since, full_path, subset=None):
) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id
WHERE main_workflowjobnode.modified > {} WHERE main_workflowjobnode.modified > {}
ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"))
if not subset or 'workflow_job_node' in subset: return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path)
_copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path)
@register('workflow_job_template_node_table', '1.0', format='csv', description=_('Data on workflows'))
def workflow_job_template_node_table(since, full_path):
workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id,
main_workflowjobtemplatenode.created, main_workflowjobtemplatenode.created,
main_workflowjobtemplatenode.modified, main_workflowjobtemplatenode.modified,
@@ -381,7 +386,4 @@ def copy_tables(since, full_path, subset=None):
GROUP BY from_workflowjobtemplatenode_id GROUP BY from_workflowjobtemplatenode_id
) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id
ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER''' ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''
if not subset or 'workflow_job_template_node' in subset: return _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
_copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
return

View File

@@ -18,7 +18,7 @@ from awx.main.models.ha import TowerAnalyticsState
from awx.main.utils import get_awx_http_client_headers, set_environ from awx.main.utils import get_awx_http_client_headers, set_environ
__all__ = ['register', 'gather', 'ship', 'table_version'] __all__ = ['register', 'gather', 'ship']
logger = logging.getLogger('awx.main.analytics') logger = logging.getLogger('awx.main.analytics')
@@ -37,11 +37,27 @@ def _valid_license():
return True return True
def register(key, version): def all_collectors():
from awx.main.analytics import collectors
collector_dict = {}
module = collectors
for name, func in inspect.getmembers(module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
key = func.__awx_analytics_key__
desc = func.__awx_analytics_description__ or ''
version = func.__awx_analytics_version__
collector_dict[key] = { 'name': key, 'version': version, 'description': desc}
return collector_dict
def register(key, version, description=None, format='json'):
""" """
A decorator used to register a function as a metric collector. A decorator used to register a function as a metric collector.
Decorated functions should return JSON-serializable objects. Decorated functions should do the following based on format:
- json: return JSON-serializable objects.
- csv: write CSV data to a filename named 'key'
@register('projects_by_scm_type', 1) @register('projects_by_scm_type', 1)
def projects_by_scm_type(): def projects_by_scm_type():
@@ -51,28 +67,19 @@ def register(key, version):
def decorate(f): def decorate(f):
f.__awx_analytics_key__ = key f.__awx_analytics_key__ = key
f.__awx_analytics_version__ = version f.__awx_analytics_version__ = version
f.__awx_analytics_description__ = description
f.__awx_analytics_type__ = format
return f return f
return decorate return decorate
def table_version(file_name, version): def gather(dest=None, module=None, subset = None, collection_type='scheduled'):
global manifest
manifest[file_name] = version
def decorate(f):
return f
return decorate
def gather(dest=None, module=None, collection_type='scheduled'):
""" """
Gather all defined metrics and write them as JSON files in a .tgz Gather all defined metrics and write them as JSON files in a .tgz
:param dest: the (optional) absolute path to write a compressed tarball :param dest: the (optional) absolute path to write a compressed tarball
:pararm module: the module to search for registered analytic collector :param module: the module to search for registered analytic collector
functions; defaults to awx.main.analytics.collectors functions; defaults to awx.main.analytics.collectors
""" """
@@ -93,14 +100,21 @@ def gather(dest=None, module=None, collection_type='scheduled'):
logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.") logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
return return
collector_list = []
if module is None: if module is None:
from awx.main.analytics import collectors from awx.main.analytics import collectors
module = collectors module = collectors
for name, func in inspect.getmembers(module):
if (
inspect.isfunction(func) and
hasattr(func, '__awx_analytics_key__') and
(not subset or name in subset)
):
collector_list.append((name, func))
dest = dest or tempfile.mkdtemp(prefix='awx_analytics') dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
for name, func in inspect.getmembers(module): for name, func in collector_list:
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'): if func.__awx_analytics_type__ == 'json':
key = func.__awx_analytics_key__ key = func.__awx_analytics_key__
manifest['{}.json'.format(key)] = func.__awx_analytics_version__ manifest['{}.json'.format(key)] = func.__awx_analytics_version__
path = '{}.json'.format(os.path.join(dest, key)) path = '{}.json'.format(os.path.join(dest, key))
@@ -114,7 +128,14 @@ def gather(dest=None, module=None, collection_type='scheduled'):
logger.exception("Could not generate metric {}.json".format(key)) logger.exception("Could not generate metric {}.json".format(key))
f.close() f.close()
os.remove(f.name) os.remove(f.name)
elif func.__awx_analytics_type__ == 'csv':
key = func.__awx_analytics_key__
manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
try:
func(last_run, full_path=dest)
except Exception:
logger.exception("Could not generate metric {}.csv".format(key))
path = os.path.join(dest, 'manifest.json') path = os.path.join(dest, 'manifest.json')
with open(path, 'w', encoding='utf-8') as f: with open(path, 'w', encoding='utf-8') as f:
try: try:
@@ -124,11 +145,6 @@ def gather(dest=None, module=None, collection_type='scheduled'):
f.close() f.close()
os.remove(f.name) os.remove(f.name)
try:
collectors.copy_tables(since=last_run, full_path=dest)
except Exception:
logger.exception("Could not copy tables")
# can't use isoformat() since it has colons, which GNU tar doesn't like # can't use isoformat() since it has colons, which GNU tar doesn't like
tarname = '_'.join([ tarname = '_'.join([
settings.SYSTEM_UUID, settings.SYSTEM_UUID,

View File

@@ -373,8 +373,9 @@ def gather_analytics():
if acquired is False: if acquired is False:
logger.debug('Not gathering analytics, another task holds lock') logger.debug('Not gathering analytics, another task holds lock')
return return
subset = list(all_collectors().keys())
try: try:
tgz = analytics.gather() tgz = analytics.gather(subset=subset)
if not tgz: if not tgz:
return return
logger.info('gathered analytics: {}'.format(tgz)) logger.info('gathered analytics: {}'.format(tgz))

View File

@@ -86,7 +86,7 @@ def test_copy_tables_unified_job_query(
job_name = job_template.create_unified_job().name job_name = job_template.create_unified_job().name
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
collectors.copy_tables(time_start, tmpdir, subset="unified_jobs") collectors.unified_jobs_table(time_start, tmpdir, subset="unified_jobs")
with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f:
lines = "".join([line for line in f]) lines = "".join([line for line in f])
@@ -134,7 +134,7 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
time_start = now() - timedelta(hours=9) time_start = now() - timedelta(hours=9)
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query") collectors.workflow_job_node_table(time_start, tmpdir, subset="workflow_job_node_query")
with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f:
reader = csv.reader(f) reader = csv.reader(f)
# Pop the headers # Pop the headers