Ensure we do not send large bundles, or empty bundles

Collect expensive collectors separately, and in a loop
where we make smaller intermediate dumps.

Don't return a table dump if there are no records, and
don't put that CSV in the manifest.

Fix up unit tests.
This commit is contained in:
Bill Nottingham
2020-09-09 17:10:51 -04:00
parent 1c4b06fe1e
commit 40309e6f70
6 changed files with 126 additions and 53 deletions

View File

@@ -1 +1 @@
from .core import all_collectors, register, gather, ship # noqa from .core import all_collectors, expensive_collectors, register, gather, ship # noqa

View File

@@ -32,8 +32,8 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
''' '''
@register('config', '1.1', description=_('General platform configuration')) @register('config', '1.1', description=_('General platform configuration.'))
def config(since): def config(since, **kwargs):
license_info = get_license(show_key=False) license_info = get_license(show_key=False)
install_type = 'traditional' install_type = 'traditional'
if os.environ.get('container') == 'oci': if os.environ.get('container') == 'oci':
@@ -65,7 +65,7 @@ def config(since):
@register('counts', '1.0', description=_('Counts of objects such as organizations, inventories, and projects')) @register('counts', '1.0', description=_('Counts of objects such as organizations, inventories, and projects'))
def counts(since): def counts(since, **kwargs):
counts = {} counts = {}
for cls in (models.Organization, models.Team, models.User, for cls in (models.Organization, models.Team, models.User,
models.Inventory, models.Credential, models.Project, models.Inventory, models.Credential, models.Project,
@@ -100,7 +100,7 @@ def counts(since):
@register('org_counts', '1.0', description=_('Counts of users and teams by organization')) @register('org_counts', '1.0', description=_('Counts of users and teams by organization'))
def org_counts(since): def org_counts(since, **kwargs):
counts = {} counts = {}
for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True), for org in models.Organization.objects.annotate(num_users=Count('member_role__members', distinct=True),
num_teams=Count('teams', distinct=True)).values('name', 'id', 'num_users', 'num_teams'): num_teams=Count('teams', distinct=True)).values('name', 'id', 'num_users', 'num_teams'):
@@ -112,7 +112,7 @@ def org_counts(since):
@register('cred_type_counts', '1.0', description=_('Counts of credentials by credential type')) @register('cred_type_counts', '1.0', description=_('Counts of credentials by credential type'))
def cred_type_counts(since): def cred_type_counts(since, **kwargs):
counts = {} counts = {}
for cred_type in models.CredentialType.objects.annotate(num_credentials=Count( for cred_type in models.CredentialType.objects.annotate(num_credentials=Count(
'credentials', distinct=True)).values('name', 'id', 'managed_by_tower', 'num_credentials'): 'credentials', distinct=True)).values('name', 'id', 'managed_by_tower', 'num_credentials'):
@@ -124,7 +124,7 @@ def cred_type_counts(since):
@register('inventory_counts', '1.2', description=_('Inventories, their inventory sources, and host counts')) @register('inventory_counts', '1.2', description=_('Inventories, their inventory sources, and host counts'))
def inventory_counts(since): def inventory_counts(since, **kwargs):
counts = {} counts = {}
for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True), for inv in models.Inventory.objects.filter(kind='').annotate(num_sources=Count('inventory_sources', distinct=True),
num_hosts=Count('hosts', distinct=True)).only('id', 'name', 'kind'): num_hosts=Count('hosts', distinct=True)).only('id', 'name', 'kind'):
@@ -149,7 +149,7 @@ def inventory_counts(since):
@register('projects_by_scm_type', '1.0', description=_('Counts of projects by source control type')) @register('projects_by_scm_type', '1.0', description=_('Counts of projects by source control type'))
def projects_by_scm_type(since): def projects_by_scm_type(since, **kwargs):
counts = dict( counts = dict(
(t[0] or 'manual', 0) (t[0] or 'manual', 0)
for t in models.Project.SCM_TYPE_CHOICES for t in models.Project.SCM_TYPE_CHOICES
@@ -168,7 +168,7 @@ def _get_isolated_datetime(last_check):
@register('instance_info', '1.0', description=_('Cluster topology and capacity')) @register('instance_info', '1.0', description=_('Cluster topology and capacity'))
def instance_info(since, include_hostnames=False): def instance_info(since, include_hostnames=False, **kwargs):
info = {} info = {}
instances = models.Instance.objects.values_list('hostname').values( instances = models.Instance.objects.values_list('hostname').values(
'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'last_isolated_check', 'enabled') 'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'last_isolated_check', 'enabled')
@@ -194,7 +194,7 @@ def instance_info(since, include_hostnames=False):
@register('job_counts', '1.0', description=_('Counts of jobs by status')) @register('job_counts', '1.0', description=_('Counts of jobs by status'))
def job_counts(since): def job_counts(since, **kwargs):
counts = {} counts = {}
counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count() counts['total_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').count()
counts['status'] = dict(models.UnifiedJob.objects.exclude(launch_type='sync').values_list('status').annotate(Count('status')).order_by()) counts['status'] = dict(models.UnifiedJob.objects.exclude(launch_type='sync').values_list('status').annotate(Count('status')).order_by())
@@ -204,7 +204,7 @@ def job_counts(since):
@register('job_instance_counts', '1.0', description=_('Counts of jobs by execution node')) @register('job_instance_counts', '1.0', description=_('Counts of jobs by execution node'))
def job_instance_counts(since): def job_instance_counts(since, **kwargs):
counts = {} counts = {}
job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list( job_types = models.UnifiedJob.objects.exclude(launch_type='sync').values_list(
'execution_node', 'launch_type').annotate(job_launch_type=Count('launch_type')).order_by() 'execution_node', 'launch_type').annotate(job_launch_type=Count('launch_type')).order_by()
@@ -219,7 +219,7 @@ def job_instance_counts(since):
@register('query_info', '1.0', description=_('Metadata about the analytics collected')) @register('query_info', '1.0', description=_('Metadata about the analytics collected'))
def query_info(since, collection_type): def query_info(since, collection_type, **kwargs):
query_info = {} query_info = {}
query_info['last_run'] = str(since) query_info['last_run'] = str(since)
query_info['current_time'] = str(now()) query_info['current_time'] = str(now())
@@ -233,11 +233,19 @@ def _copy_table(table, query, path):
with connection.cursor() as cursor: with connection.cursor() as cursor:
cursor.copy_expert(query, file) cursor.copy_expert(query, file)
file.close() file.close()
# Ensure we actually dumped data, and not just headers
file = open(file_path, 'r', encoding='utf-8')
file.readline()
data = file.readline()
file.close()
if not data:
os.remove(file_path)
return None
return file_path return file_path
@register('events_table', '1.1', format='csv', description=_('Automation task records')) @register('events_table', '1.1', format='csv', description=_('Automation task records'), expensive=True)
def events_table(since, full_path): def events_table(since, full_path, until, **kwargs):
events_query = '''COPY (SELECT main_jobevent.id, events_query = '''COPY (SELECT main_jobevent.id,
main_jobevent.created, main_jobevent.created,
main_jobevent.uuid, main_jobevent.uuid,
@@ -259,13 +267,14 @@ def events_table(since, full_path):
main_jobevent.event_data::json->'res'->'warnings' AS warnings, main_jobevent.event_data::json->'res'->'warnings' AS warnings,
main_jobevent.event_data::json->'res'->'deprecations' AS deprecations main_jobevent.event_data::json->'res'->'deprecations' AS deprecations
FROM main_jobevent FROM main_jobevent
WHERE main_jobevent.created > {} WHERE (main_jobevent.created > {} AND main_jobevent.created <= {})
ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER
'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'"))
return _copy_table(table='events', query=events_query, path=full_path) return _copy_table(table='events', query=events_query, path=full_path)
@register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run')) @register('unified_jobs_table', '1.1', format='csv', description=_('Data on jobs run'), expensive=True)
def unified_jobs_table(since, full_path): def unified_jobs_table(since, full_path, until, **kwargs):
unified_job_query = '''COPY (SELECT main_unifiedjob.id, unified_job_query = '''COPY (SELECT main_unifiedjob.id,
main_unifiedjob.polymorphic_ctype_id, main_unifiedjob.polymorphic_ctype_id,
django_content_type.model, django_content_type.model,
@@ -293,14 +302,16 @@ def unified_jobs_table(since, full_path):
LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id
LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id
LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id
WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0}) WHERE ((main_unifiedjob.created > {0} AND main_unifiedjob.created <= {1})
OR (main_unifiedjob.finished > {0} AND main_unifiedjob.finished <= {1}))
AND main_unifiedjob.launch_type != 'sync' AND main_unifiedjob.launch_type != 'sync'
ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER
'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'"))
return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path)
@register('unified_job_template_table', '1.0', format='csv', description=_('Data on job templates')) @register('unified_job_template_table', '1.0', format='csv', description=_('Data on job templates'))
def unified_job_template_table(since, full_path): def unified_job_template_table(since, full_path, **kwargs):
unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id, unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id,
main_unifiedjobtemplate.polymorphic_ctype_id, main_unifiedjobtemplate.polymorphic_ctype_id,
django_content_type.model, django_content_type.model,
@@ -322,8 +333,8 @@ def unified_job_template_table(since, full_path):
return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)
@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs')) @register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True)
def workflow_job_node_table(since, full_path): def workflow_job_node_table(since, full_path, until, **kwargs):
workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
main_workflowjobnode.created, main_workflowjobnode.created,
main_workflowjobnode.modified, main_workflowjobnode.modified,
@@ -352,13 +363,14 @@ def workflow_job_node_table(since, full_path):
FROM main_workflowjobnode_always_nodes FROM main_workflowjobnode_always_nodes
GROUP BY from_workflowjobnode_id GROUP BY from_workflowjobnode_id
) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id
WHERE main_workflowjobnode.modified > {} WHERE (main_workflowjobnode.modified > {} AND main_workflowjobnode.modified <= {})
ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER
'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'"),until.strftime("'%Y-%m-%d %H:%M:%S'"))
return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) return _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path)
@register('workflow_job_template_node_table', '1.0', format='csv', description=_('Data on workflows')) @register('workflow_job_template_node_table', '1.0', format='csv', description=_('Data on workflows'))
def workflow_job_template_node_table(since, full_path): def workflow_job_template_node_table(since, full_path, **kwargs):
workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id,
main_workflowjobtemplatenode.created, main_workflowjobtemplatenode.created,
main_workflowjobtemplatenode.modified, main_workflowjobtemplatenode.modified,

View File

@@ -17,14 +17,11 @@ from awx.main.access import access_registry
from awx.main.models.ha import TowerAnalyticsState from awx.main.models.ha import TowerAnalyticsState
from awx.main.utils import get_awx_http_client_headers, set_environ from awx.main.utils import get_awx_http_client_headers, set_environ
__all__ = ['register', 'gather', 'ship'] __all__ = ['register', 'gather', 'ship']
logger = logging.getLogger('awx.main.analytics') logger = logging.getLogger('awx.main.analytics')
manifest = dict()
def _valid_license(): def _valid_license():
try: try:
@@ -51,7 +48,18 @@ def all_collectors():
return collector_dict return collector_dict
def register(key, version, description=None, format='json'): def expensive_collectors():
from awx.main.analytics import collectors
ret = []
module = collectors
for name, func in inspect.getmembers(module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__:
ret.append(func.__awx_analytics_key__)
return ret
def register(key, version, description=None, format='json', expensive=False):
""" """
A decorator used to register a function as a metric collector. A decorator used to register a function as a metric collector.
@@ -69,12 +77,13 @@ def register(key, version, description=None, format='json'):
f.__awx_analytics_version__ = version f.__awx_analytics_version__ = version
f.__awx_analytics_description__ = description f.__awx_analytics_description__ = description
f.__awx_analytics_type__ = format f.__awx_analytics_type__ = format
f.__awx_expensive__ = expensive
return f return f
return decorate return decorate
def gather(dest=None, module=None, subset = None, collection_type='scheduled'): def gather(dest=None, module=None, subset = None, since = None, until = now(), collection_type='scheduled'):
""" """
Gather all defined metrics and write them as JSON files in a .tgz Gather all defined metrics and write them as JSON files in a .tgz
@@ -91,6 +100,9 @@ def gather(dest=None, module=None, subset = None, collection_type='scheduled'):
max_interval = now() - timedelta(weeks=4) max_interval = now() - timedelta(weeks=4)
if last_run < max_interval or not last_run: if last_run < max_interval or not last_run:
last_run = max_interval last_run = max_interval
if since:
last_run = since
logger.debug("Gathering overriden to start at: {}".format(since))
if _valid_license() is False: if _valid_license() is False:
logger.exception("Invalid License provided, or No License Provided") logger.exception("Invalid License provided, or No License Provided")
@@ -112,30 +124,49 @@ def gather(dest=None, module=None, subset = None, collection_type='scheduled'):
): ):
collector_list.append((name, func)) collector_list.append((name, func))
manifest = dict()
dest = dest or tempfile.mkdtemp(prefix='awx_analytics') dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
for name, func in collector_list: for name, func in collector_list:
if func.__awx_analytics_type__ == 'json': if func.__awx_analytics_type__ == 'json':
key = func.__awx_analytics_key__ key = func.__awx_analytics_key__
manifest['{}.json'.format(key)] = func.__awx_analytics_version__
path = '{}.json'.format(os.path.join(dest, key)) path = '{}.json'.format(os.path.join(dest, key))
with open(path, 'w', encoding='utf-8') as f: with open(path, 'w', encoding='utf-8') as f:
try: try:
if func.__name__ == 'query_info': json.dump(func(last_run, collection_type=collection_type, until=until), f)
json.dump(func(last_run, collection_type=collection_type), f) manifest['{}.json'.format(key)] = func.__awx_analytics_version__
else:
json.dump(func(last_run), f)
except Exception: except Exception:
logger.exception("Could not generate metric {}.json".format(key)) logger.exception("Could not generate metric {}.json".format(key))
f.close() f.close()
os.remove(f.name) os.remove(f.name)
elif func.__awx_analytics_type__ == 'csv': elif func.__awx_analytics_type__ == 'csv':
key = func.__awx_analytics_key__ key = func.__awx_analytics_key__
manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
try: try:
func(last_run, full_path=dest) if func(last_run, full_path=dest, until=until):
manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
except Exception: except Exception:
logger.exception("Could not generate metric {}.csv".format(key)) logger.exception("Could not generate metric {}.csv".format(key))
if not manifest:
# No data was collected
logger.warning("No data from {} to {}".format(last_run, until))
shutil.rmtree(dest)
return None
# Always include config.json if we're using our collectors
if 'config.json' not in manifest.keys() and not module:
from awx.main.analytics import collectors
path = '{}.json'.format(os.path.join(dest, key))
with open(path, 'w', encoding='utf-8') as f:
try:
json.dump(collectors.config(last_run), f)
manifest['config.json'] = collectors.config.__awx_analytics_version__
except Exception:
logger.exception("Could not generate metric {}.json".format(key))
f.close()
os.remove(f.name)
shutil.rmtree(dest)
return None
path = os.path.join(dest, 'manifest.json') path = os.path.join(dest, 'manifest.json')
with open(path, 'w', encoding='utf-8') as f: with open(path, 'w', encoding='utf-8') as f:
try: try:

View File

@@ -53,6 +53,7 @@ import ansible_runner
from awx import __version__ as awx_application_version from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
from awx.main.access import access_registry from awx.main.access import access_registry
from awx.main.analytics import all_collectors, expensive_collectors
from awx.main.redact import UriCleaner from awx.main.redact import UriCleaner
from awx.main.models import ( from awx.main.models import (
Schedule, TowerScheduleState, Instance, InstanceGroup, Schedule, TowerScheduleState, Instance, InstanceGroup,
@@ -355,6 +356,22 @@ def send_notifications(notification_list, job_id=None):
@task(queue=get_local_queuename) @task(queue=get_local_queuename)
def gather_analytics(): def gather_analytics():
def _gather_and_ship(subset, since, until):
    """Gather one window of analytics data and ship the resulting tarball.

    Calls ``analytics.gather`` for the given collector ``subset`` over the
    ``since``..``until`` window, ships the tarball it returns, and removes
    the tarball from disk afterwards.

    Returns True when the window was shipped or when gather produced nothing
    to ship; returns False when gathering or shipping raised an exception
    (the exception is logged, not propagated).
    """
    # Bind tgz before the try block so the finally clause can always inspect
    # it, even when analytics.gather() raises before the assignment happens.
    tgz = None
    try:
        tgz = analytics.gather(subset=subset, since=since, until=until)
        # empty analytics without raising an exception is not an error
        if not tgz:
            return True
        logger.info('gathered analytics: {}'.format(tgz))
        analytics.ship(tgz)
    except Exception:
        logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
        return False
    finally:
        # gather() may have returned nothing (or failed outright); only try
        # to clean up when there is an actual path, since os.path.exists(None)
        # raises TypeError and would mask the intended return value.
        if tgz and os.path.exists(tgz):
            os.remove(tgz)
    return True
from awx.conf.models import Setting from awx.conf.models import Setting
from rest_framework.fields import DateTimeField from rest_framework.fields import DateTimeField
if not settings.INSIGHTS_TRACKING_STATE: if not settings.INSIGHTS_TRACKING_STATE:
@@ -374,16 +391,28 @@ def gather_analytics():
logger.debug('Not gathering analytics, another task holds lock') logger.debug('Not gathering analytics, another task holds lock')
return return
subset = list(all_collectors().keys()) subset = list(all_collectors().keys())
try: incremental_collectors = []
tgz = analytics.gather(subset=subset) for collector in expensive_collectors():
if not tgz: if collector in subset:
return subset.remove(collector)
logger.info('gathered analytics: {}'.format(tgz)) incremental_collectors.append(collector)
analytics.ship(tgz)
settings.AUTOMATION_ANALYTICS_LAST_GATHER = gather_time # Cap gathering at 4 weeks of data if there has been no data gathering
finally: since = last_time or (gather_time - timedelta(weeks=4))
if os.path.exists(tgz):
os.remove(tgz) if incremental_collectors:
start = since
until = None
while start < gather_time:
until = start + timedelta(minutes=20)
if (until > gather_time):
until = gather_time
if not _gather_and_ship(incremental_collectors, since=start, until=until):
break
start = until
settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
if subset:
_gather_and_ship(subset, since=since, until=gather_time)
@task(queue=get_local_queuename) @task(queue=get_local_queuename)

View File

@@ -28,6 +28,7 @@ def sqlite_copy_expert(request):
def write_stdout(self, sql, fd): def write_stdout(self, sql, fd):
# Would be cool if we instead properly dissected the SQL query and verified # Would be cool if we instead properly dissected the SQL query and verified
# it that way. But instead, we just take the naive approach here. # it that way. But instead, we just take the naive approach here.
sql = sql.strip()
assert sql.startswith("COPY (") assert sql.startswith("COPY (")
assert sql.endswith(") TO STDOUT WITH CSV HEADER") assert sql.endswith(") TO STDOUT WITH CSV HEADER")
@@ -86,7 +87,7 @@ def test_copy_tables_unified_job_query(
job_name = job_template.create_unified_job().name job_name = job_template.create_unified_job().name
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
collectors.unified_jobs_table(time_start, tmpdir, subset="unified_jobs") collectors.unified_jobs_table(time_start, tmpdir, until = now() + timedelta(seconds=1))
with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f:
lines = "".join([line for line in f]) lines = "".join([line for line in f])
@@ -134,7 +135,7 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job):
time_start = now() - timedelta(hours=9) time_start = now() - timedelta(hours=9)
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
collectors.workflow_job_node_table(time_start, tmpdir, subset="workflow_job_node_query") collectors.workflow_job_node_table(time_start, tmpdir, until = now() + timedelta(seconds=1))
with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f:
reader = csv.reader(f) reader = csv.reader(f)
# Pop the headers # Pop the headers

View File

@@ -10,17 +10,17 @@ from awx.main.analytics import gather, register
@register('example', '1.0') @register('example', '1.0')
def example(since): def example(since, **kwargs):
return {'awx': 123} return {'awx': 123}
@register('bad_json', '1.0') @register('bad_json', '1.0')
def bad_json(since): def bad_json(since, **kwargs):
return set() return set()
@register('throws_error', '1.0') @register('throws_error', '1.0')
def throws_error(since): def throws_error(since, **kwargs):
raise ValueError() raise ValueError()