Mirror of https://github.com/ansible/awx.git
Merge pull request #9358 from jbradberry/job-event-analytics-v1
Gather job event analytics by last pk instead of created datetime

SUMMARY
ISSUE TYPE
Feature Pull Request
COMPONENT NAME
API
AWX VERSION
awx: 17.0.1

Reviewed-by: Bill Nottingham
Reviewed-by: Jeff Bradberry
Reviewed-by: Ryan Petrello
Reviewed-by: Ladislav Smola <lsmola@redhat.com>
Reviewed-by: Christian Adams <rooftopcellist@gmail.com>
Reviewed-by: Jim Ladd
Reviewed-by: Shane McDonald <me@shanemcd.com>
Reviewed-by: Bianca Henderson <beeankha@gmail.com>
Reviewed-by: Rebeccah Hunter <rhunter@redhat.com>
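For context: instead of windowing job events by their `created` timestamp, the collectors below compute a primary-key range once and then walk it in fixed-size id windows. A minimal standalone sketch of that idea (the function name, bounds, and step size here are illustrative; the real implementation is `events_slicing` in the diff below):

def example_pk_slicing(previous_pk, final_pk, step=100000):
    # Yield (start, end) id windows of at most `step` events, mirroring the
    # half-open "id > start AND id <= end" filter used by events_table below.
    for start in range(previous_pk, final_pk + 1, step):
        yield (start, min(start + step, final_pk))

# Example: 250,000 job events are gathered in three slices.
print(list(example_pk_slicing(0, 250000)))
# [(0, 100000), (100000, 200000), (200000, 250000)]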
@@ -1 +1 @@
from .core import all_collectors, expensive_collectors, register, gather, ship # noqa
from .core import all_collectors, register, gather, ship # noqa

@@ -1,22 +1,23 @@
import io
import json
import os
import os.path
import platform
import distro

from django.db import connection
from django.db.models import Count
from django.db.models import Count, Max, Min
from django.conf import settings
from django.utils.timezone import now
from django.contrib.sessions.models import Session
from django.utils.timezone import now, timedelta
from django.utils.translation import ugettext_lazy as _

from awx.conf.license import get_license
from awx.main.utils import get_awx_version, get_custom_venv_choices, camelcase_to_underscore
from awx.main.utils import get_awx_version, get_custom_venv_choices, camelcase_to_underscore, datetime_hook
from awx.main import models
from django.contrib.sessions.models import Session
from awx.main.analytics import register

'''
"""
This module is used to define metrics collected by awx.main.analytics.gather()
Each function is decorated with a key name, and should return a data
structure that can be serialized to JSON
@@ -30,7 +31,61 @@ All functions - when called - will be passed a datetime.datetime object,
`since`, which represents the last time analytics were gathered (some metrics
functions - like those that return metadata about playbook runs, may return
data _since_ the last report date - i.e., new data in the last 24 hours)
'''
"""
def trivial_slicing(key, since, until):
if since is not None:
return [(since, until)]

from awx.conf.models import Setting

horizon = until - timedelta(weeks=4)
last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)
last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon)
return [(last_entry, until)]


def four_hour_slicing(key, since, until):
if since is not None:
last_entry = since
else:
from awx.conf.models import Setting

horizon = until - timedelta(weeks=4)
last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)
last_entry = max(last_entries.get(key) or settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon)

start, end = last_entry, None
while start < until:
end = min(start + timedelta(hours=4), until)
yield (start, end)
start = end


def events_slicing(key, since, until):
from awx.conf.models import Setting

last_gather = settings.AUTOMATION_ANALYTICS_LAST_GATHER
last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)
horizon = until - timedelta(weeks=4)

lower = since or last_gather or horizon
if not since and last_entries.get(key):
lower = horizon
pk_values = models.JobEvent.objects.filter(created__gte=lower, created__lte=until).aggregate(Min('pk'), Max('pk'))

previous_pk = pk_values['pk__min'] - 1 if pk_values['pk__min'] is not None else 0
if not since and last_entries.get(key):
previous_pk = max(last_entries[key], previous_pk)
final_pk = pk_values['pk__max'] or 0

step = 100000
for start in range(previous_pk, final_pk + 1, step):
yield (start, min(start + step, final_pk))
@register('config', '1.3', description=_('General platform configuration.'))
@@ -265,12 +320,15 @@ class FileSplitter(io.StringIO):
self.files = self.files[:-1]
# If we only have one file, remove the suffix
if len(self.files) == 1:
os.rename(self.files[0], self.files[0].replace('_split0', ''))
filename = self.files.pop()
new_filename = filename.replace('_split0', '')
os.rename(filename, new_filename)
self.files.append(new_filename)
return self.files

def write(self, s):
if not self.header:
self.header = s[0 : s.index('\n')]
self.header = s[: s.index('\n')]
self.counter += self.currentfile.write(s)
if self.counter >= MAX_TABLE_SIZE:
self.cycle_file()
@@ -284,7 +342,7 @@ def _copy_table(table, query, path):
return file.file_list()


@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=True)
@register('events_table', '1.2', format='csv', description=_('Automation task records'), expensive=events_slicing)
def events_table(since, full_path, until, **kwargs):
events_query = '''COPY (SELECT main_jobevent.id,
main_jobevent.created,
@@ -302,22 +360,22 @@ def events_table(since, full_path, until, **kwargs):
main_jobevent.role,
main_jobevent.job_id,
main_jobevent.host_id,
main_jobevent.host_name
, CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
main_jobevent.host_name,
CAST(main_jobevent.event_data::json->>'start' AS TIMESTAMP WITH TIME ZONE) AS start,
CAST(main_jobevent.event_data::json->>'end' AS TIMESTAMP WITH TIME ZONE) AS end,
main_jobevent.event_data::json->'duration' AS duration,
main_jobevent.event_data::json->'res'->'warnings' AS warnings,
main_jobevent.event_data::json->'res'->'deprecations' AS deprecations
FROM main_jobevent
WHERE (main_jobevent.created > '{}' AND main_jobevent.created <= '{}')
WHERE (main_jobevent.id > {} AND main_jobevent.id <= {})
ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER
'''.format(
since.isoformat(), until.isoformat()
since, until
)
return _copy_table(table='events', query=events_query, path=full_path)


@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=True)
@register('unified_jobs_table', '1.2', format='csv', description=_('Data on jobs run'), expensive=four_hour_slicing)
def unified_jobs_table(since, full_path, until, **kwargs):
unified_job_query = '''COPY (SELECT main_unifiedjob.id,
main_unifiedjob.polymorphic_ctype_id,
@@ -381,7 +439,7 @@ def unified_job_template_table(since, full_path, **kwargs):
return _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path)


@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=True)
@register('workflow_job_node_table', '1.0', format='csv', description=_('Data on workflow runs'), expensive=four_hour_slicing)
def workflow_job_node_table(since, full_path, until, **kwargs):
workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id,
main_workflowjobnode.created,
@@ -1,20 +1,25 @@
import inspect
import io
import json
import logging
import os
import os.path
import tempfile
import pathlib
import shutil
import requests
import tarfile
import tempfile

from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder
from django.utils.timezone import now, timedelta
from rest_framework.exceptions import PermissionDenied
import requests

from awx.conf.license import get_license
from awx.main.models import Job
from awx.main.access import access_registry
from awx.main.utils import get_awx_http_client_headers, set_environ
from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook
from awx.main.utils.pglock import advisory_lock

__all__ = ['register', 'gather', 'ship']
@@ -36,29 +41,18 @@ def _valid_license():
def all_collectors():
from awx.main.analytics import collectors

collector_dict = {}
module = collectors
for name, func in inspect.getmembers(module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__'):
key = func.__awx_analytics_key__
desc = func.__awx_analytics_description__ or ''
version = func.__awx_analytics_version__
collector_dict[key] = {'name': key, 'version': version, 'description': desc}
return collector_dict
return {
func.__awx_analytics_key__: {
'name': func.__awx_analytics_key__,
'version': func.__awx_analytics_version__,
'description': func.__awx_analytics_description__ or '',
}
for name, func in inspect.getmembers(collectors)
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__')
}


def expensive_collectors():
from awx.main.analytics import collectors

ret = []
module = collectors
for name, func in inspect.getmembers(module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and func.__awx_expensive__:
ret.append(func.__awx_analytics_key__)
return ret


def register(key, version, description=None, format='json', expensive=False):
def register(key, version, description=None, format='json', expensive=None):
"""
A decorator used to register a function as a metric collector.

@@ -82,136 +76,206 @@ def register(key, version, description=None, format='json', expensive=False):
return decorate
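For illustration, a collector registered through this decorator might look like the sketch below. The key, version, and return value are invented; JSON collectors are invoked with a start datetime plus `collection_type` and `until` keyword arguments, as the rewritten gather() further down shows:

from django.utils.translation import ugettext_lazy as _

from awx.main.analytics import register


# Hypothetical collector; not part of the diff. format defaults to 'json'.
@register('example_counts', '1.0', description=_('Hypothetical example collector'))
def example_counts(since, collection_type=None, until=None, **kwargs):
    # Must return something JSON-serializable, per the collectors module docstring.
    return {'since': str(since), 'until': str(until)}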
def gather(dest=None, module=None, subset=None, since=None, until=now(), collection_type='scheduled'):
def package(target, data, timestamp):
try:
tarname_base = f'{settings.SYSTEM_UUID}-{timestamp.strftime("%Y-%m-%d-%H%M%S%z")}'
path = pathlib.Path(target)
index = len(list(path.glob(f'{tarname_base}-*.*')))
tarname = f'{tarname_base}-{index}.tar.gz'

manifest = {}
with tarfile.open(target.joinpath(tarname), 'w:gz') as f:
for name, (item, version) in data.items():
try:
if isinstance(item, str):
f.add(item, arcname=f'./{name}')
else:
buf = json.dumps(item).encode('utf-8')
info = tarfile.TarInfo(f'./{name}')
info.size = len(buf)
info.mtime = timestamp.timestamp()
f.addfile(info, fileobj=io.BytesIO(buf))
manifest[name] = version
except Exception:
logger.exception(f"Could not generate metric {name}")
return None

try:
buf = json.dumps(manifest).encode('utf-8')
info = tarfile.TarInfo('./manifest.json')
info.size = len(buf)
info.mtime = timestamp.timestamp()
f.addfile(info, fileobj=io.BytesIO(buf))
except Exception:
logger.exception("Could not generate manifest.json")
return None

return f.name
except Exception:
logger.exception("Failed to write analytics archive file")
return None
def gather(dest=None, module=None, subset=None, since=None, until=None, collection_type='scheduled'):
"""
Gather all defined metrics and write them as JSON files in a .tgz

:param dest: the (optional) absolute path to write a compressed tarball
:param dest: the (optional) absolute path to write a compressed tarball
:param module: the module to search for registered analytic collector
functions; defaults to awx.main.analytics.collectors
functions; defaults to awx.main.analytics.collectors
"""
log_level = logging.ERROR if collection_type != 'scheduled' else logging.DEBUG

def _write_manifest(destdir, manifest):
path = os.path.join(destdir, 'manifest.json')
with open(path, 'w', encoding='utf-8') as f:
try:
json.dump(manifest, f)
except Exception:
f.close()
os.remove(f.name)
logger.exception("Could not generate manifest.json")

last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4))
logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))

if _valid_license() is False:
logger.exception("Invalid License provided, or No License Provided")
if not _valid_license():
logger.log(log_level, "Invalid License provided, or No License Provided")
return None

if collection_type != 'dry-run' and not settings.INSIGHTS_TRACKING_STATE:
logger.error("Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
return None
if collection_type != 'dry-run':
if not settings.INSIGHTS_TRACKING_STATE:
logger.log(log_level, "Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
return None

collector_list = []
if module:
collector_module = module
else:
if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.")
return None

with advisory_lock('gather_analytics_lock', wait=False) as acquired:
if not acquired:
logger.log(log_level, "Not gathering analytics, another task holds lock")
return None

from awx.conf.models import Setting
from awx.main.analytics import collectors
from awx.main.signals import disable_activity_stream

collector_module = collectors
for name, func in inspect.getmembers(collector_module):
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset):
collector_list.append((name, func))
_now = now()
until = _now if until is None else min(until, _now) # Make sure the end isn't in the future.
horizon = until - timedelta(weeks=4)
if since is not None:
# Make sure the start isn't in the future or more than 4 weeks prior to `until`.
since = max(min(since, _now), horizon)
if since and since >= until:
logger.warning("Start of the collection interval is later than the end, ignoring request.")
return None

manifest = dict()
dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
gather_dir = os.path.join(dest, 'stage')
os.mkdir(gather_dir, 0o700)
num_splits = 1
for name, func in collector_list:
if func.__awx_analytics_type__ == 'json':
key = func.__awx_analytics_key__
path = '{}.json'.format(os.path.join(gather_dir, key))
with open(path, 'w', encoding='utf-8') as f:
try:
json.dump(func(last_run, collection_type=collection_type, until=until), f)
manifest['{}.json'.format(key)] = func.__awx_analytics_version__
except Exception:
logger.exception("Could not generate metric {}.json".format(key))
f.close()
os.remove(f.name)
elif func.__awx_analytics_type__ == 'csv':
logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
# LAST_GATHER time should always get truncated to less than 4 weeks back.
last_gather = max(settings.AUTOMATION_ANALYTICS_LAST_GATHER or horizon, horizon)

last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)

collector_module = module if module else collectors
collector_list = [
func
for name, func in inspect.getmembers(collector_module)
if inspect.isfunction(func) and hasattr(func, '__awx_analytics_key__') and (not subset or name in subset)
]
if not any(c.__awx_analytics_key__ == 'config' for c in collector_list):
# In order to ship to analytics, we must include the output of the built-in 'config' collector.
collector_list.append(collectors.config)

json_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'json']
csv_collectors = [func for func in collector_list if func.__awx_analytics_type__ == 'csv']

dest = pathlib.Path(dest or tempfile.mkdtemp(prefix='awx_analytics'))
gather_dir = dest.joinpath('stage')
gather_dir.mkdir(mode=0o700)
tarfiles = []
succeeded = True

# These json collectors are pretty compact, so collect all of them before shipping to analytics.
data = {}
for func in json_collectors:
key = func.__awx_analytics_key__
filename = f'{key}.json'
try:
files = func(last_run, full_path=gather_dir, until=until)
if files:
manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
if len(files) > num_splits:
num_splits = len(files)
last_entry = max(last_entries.get(key) or last_gather, horizon)
results = (func(since or last_entry, collection_type=collection_type, until=until), func.__awx_analytics_version__)
json.dumps(results) # throwaway check to see if the data is json-serializable
data[filename] = results
except Exception:
logger.exception("Could not generate metric {}.csv".format(key))
if not manifest:
# No data was collected
logger.warning("No data from {} to {}".format(last_run, until))
shutil.rmtree(dest)
return None

# Always include config.json if we're using our collectors
if 'config.json' not in manifest.keys() and not module:
from awx.main.analytics import collectors

config = collectors.config
path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__))
with open(path, 'w', encoding='utf-8') as f:
try:
json.dump(collectors.config(last_run), f)
manifest['config.json'] = config.__awx_analytics_version__
except Exception:
logger.exception("Could not generate metric {}.json".format(key))
f.close()
os.remove(f.name)
shutil.rmtree(dest)
logger.exception("Could not generate metric {}".format(filename))
if data:
if data.get('config.json') is None:
logger.error("'config' collector data is missing.")
return None

stage_dirs = [gather_dir]
if num_splits > 1:
for i in range(0, num_splits):
split_path = os.path.join(dest, 'split{}'.format(i))
os.mkdir(split_path, 0o700)
filtered_manifest = {}
shutil.copy(os.path.join(gather_dir, 'config.json'), split_path)
filtered_manifest['config.json'] = manifest['config.json']
suffix = '_split{}'.format(i)
for file in os.listdir(gather_dir):
if file.endswith(suffix):
old_file = os.path.join(gather_dir, file)
new_filename = file.replace(suffix, '')
new_file = os.path.join(split_path, new_filename)
shutil.move(old_file, new_file)
filtered_manifest[new_filename] = manifest[new_filename]
_write_manifest(split_path, filtered_manifest)
stage_dirs.append(split_path)
tgzfile = package(dest.parent, data, until)
if tgzfile is not None:
tarfiles.append(tgzfile)
if collection_type != 'dry-run':
if ship(tgzfile):
with disable_activity_stream():
for filename in data:
key = filename.replace('.json', '')
last_entries[key] = max(last_entries[key], until) if last_entries.get(key) else until
settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
else:
succeeded = False

for item in list(manifest.keys()):
if not os.path.exists(os.path.join(gather_dir, item)):
manifest.pop(item)
_write_manifest(gather_dir, manifest)
for func in csv_collectors:
key = func.__awx_analytics_key__
filename = f'{key}.csv'
try:
# These slicer functions may return a generator. The `since` parameter is
# allowed to be None, and will fall back to LAST_ENTRIES[key] or to
# LAST_GATHER (truncated appropriately to match the 4-week limit).
if func.__awx_expensive__:
slices = func.__awx_expensive__(key, since, until)
else:
slices = collectors.trivial_slicing(key, since, until)
tarfiles = []
try:
for i in range(0, len(stage_dirs)):
stage_dir = stage_dirs[i]
# can't use isoformat() since it has colons, which GNU tar doesn't like
tarname = '_'.join([settings.SYSTEM_UUID, until.strftime('%Y-%m-%d-%H%M%S%z'), str(i)])
tgz = shutil.make_archive(os.path.join(os.path.dirname(dest), tarname), 'gztar', stage_dir)
tarfiles.append(tgz)
except Exception:
shutil.rmtree(stage_dir, ignore_errors=True)
logger.exception("Failed to write analytics archive file")
finally:
shutil.rmtree(dest, ignore_errors=True)
return tarfiles
for start, end in slices:
files = func(start, full_path=gather_dir, until=end)

if not files:
if collection_type != 'dry-run':
with disable_activity_stream():
last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end
settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
continue

slice_succeeded = True
for fpath in files:
payload = {filename: (fpath, func.__awx_analytics_version__)}

payload['config.json'] = data.get('config.json')
if payload['config.json'] is None:
logger.error("'config' collector data is missing, and is required to ship.")
return None

tgzfile = package(dest.parent, payload, until)
if tgzfile is not None:
tarfiles.append(tgzfile)
if not ship(tgzfile):
slice_succeeded, succeeded = False, False
break

if slice_succeeded and collection_type != 'dry-run':
with disable_activity_stream():
last_entries[key] = max(last_entries[key], end) if last_entries.get(key) else end
settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
except Exception:
succeeded = False
logger.exception("Could not generate metric {}".format(filename))

if collection_type != 'dry-run':
if succeeded:
for fpath in tarfiles:
if os.path.exists(fpath):
os.remove(fpath)
with disable_activity_stream():
if not settings.AUTOMATION_ANALYTICS_LAST_GATHER or until > settings.AUTOMATION_ANALYTICS_LAST_GATHER:
settings.AUTOMATION_ANALYTICS_LAST_GATHER = until

shutil.rmtree(dest, ignore_errors=True) # clean up individual artifact files
if not tarfiles:
# No data was collected
logger.warning("No data from {} to {}".format(since or last_gather, until))
return None

return tarfiles
def ship(path):
@@ -220,42 +284,38 @@ def ship(path):
"""
if not path:
logger.error('Automation Analytics TAR not found')
return
return False
if not os.path.exists(path):
logger.error('Automation Analytics TAR {} not found'.format(path))
return
return False
if "Error:" in str(path):
return
try:
logger.debug('shipping analytics file: {}'.format(path))
url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None)
if not url:
logger.error('AUTOMATION_ANALYTICS_URL is not set')
return
rh_user = getattr(settings, 'REDHAT_USERNAME', None)
rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
if not rh_user:
return logger.error('REDHAT_USERNAME is not set')
if not rh_password:
return logger.error('REDHAT_PASSWORD is not set')
with open(path, 'rb') as f:
files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)}
s = requests.Session()
s.headers = get_awx_http_client_headers()
s.headers.pop('Content-Type')
with set_environ(**settings.AWX_TASK_ENV):
response = s.post(
url,
files=files,
verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
auth=(rh_user, rh_password),
headers=s.headers,
timeout=(31, 31),
)
# Accept 2XX status_codes
if response.status_code >= 300:
return logger.exception('Upload failed with status {}, {}'.format(response.status_code, response.text))
finally:
# cleanup tar.gz
if os.path.exists(path):
os.remove(path)
return False

logger.debug('shipping analytics file: {}'.format(path))
url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None)
if not url:
logger.error('AUTOMATION_ANALYTICS_URL is not set')
return False
rh_user = getattr(settings, 'REDHAT_USERNAME', None)
rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
if not rh_user:
logger.error('REDHAT_USERNAME is not set')
return False
if not rh_password:
logger.error('REDHAT_PASSWORD is not set')
return False
with open(path, 'rb') as f:
files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)}
s = requests.Session()
s.headers = get_awx_http_client_headers()
s.headers.pop('Content-Type')
with set_environ(**settings.AWX_TASK_ENV):
response = s.post(
url, files=files, verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31)
)
# Accept 2XX status_codes
if response.status_code >= 300:
logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text))
return False

return True
@@ -778,6 +778,15 @@ register(
category=_('System'),
category_slug='system',
)
register(
'AUTOMATION_ANALYTICS_LAST_ENTRIES',
field_class=fields.CharField,
label=_('Last gathered entries for expensive Automation Analytics collectors.'),
default='',
allow_blank=True,
category=_('System'),
category_slug='system',
)


register(
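For illustration, the value stored in AUTOMATION_ANALYTICS_LAST_ENTRIES is a JSON object keyed by collector name; judging from the slicing functions earlier in the diff, the time-sliced tables record the end of the last shipped window while the pk-sliced events table records the last job event id. The concrete values below are made up:

# Hypothetical AUTOMATION_ANALYTICS_LAST_ENTRIES value; the shapes are assumed
# from the collector code above, the numbers and dates are invented.
example_last_entries = '{"unified_jobs_table": "2021-02-18T16:00:00+00:00", "events_table": 8675309}'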
@@ -1,6 +1,6 @@
import logging

from awx.main.analytics import gather, ship
from awx.main import analytics
from dateutil import parser
from django.core.management.base import BaseCommand
from django.utils.timezone import now
@@ -48,13 +48,9 @@ class Command(BaseCommand):
if opt_ship and opt_dry_run:
self.logger.error('Both --ship and --dry-run cannot be processed at the same time.')
return
tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
tgzfiles = analytics.gather(collection_type='manual' if opt_ship else 'dry-run', since=since, until=until)
if tgzfiles:
for tgz in tgzfiles:
self.logger.info(tgz)
else:
self.logger.error('No analytics collected')
if opt_ship:
if tgzfiles:
for tgz in tgzfiles:
ship(tgz)
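A usage sketch for the management command above, assuming it is registered as gather_analytics (the --dry-run and --ship flag names come from the diff; the command name itself is an assumption here):

from django.core.management import call_command

# Requires a configured AWX/Django environment; '--dry-run' gathers locally
# without shipping, '--ship' gathers and uploads the tarballs.
call_command('gather_analytics', '--dry-run')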
@@ -57,7 +57,6 @@ from receptorctl.socket_interface import ReceptorControl
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
from awx.main.access import access_registry
from awx.main.analytics import all_collectors, expensive_collectors
from awx.main.redact import UriCleaner
from awx.main.models import (
Schedule,
@@ -377,70 +376,15 @@ def send_notifications(notification_list, job_id=None):

@task(queue=get_local_queuename)
def gather_analytics():
def _gather_and_ship(subset, since, until):
tgzfiles = []
try:
tgzfiles = analytics.gather(subset=subset, since=since, until=until)
# empty analytics without raising an exception is not an error
if not tgzfiles:
return True
logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
for tgz in tgzfiles:
analytics.ship(tgz)
except Exception:
logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
return False
finally:
if tgzfiles:
for tgz in tgzfiles:
if os.path.exists(tgz):
os.remove(tgz)
return True

from awx.conf.models import Setting
from rest_framework.fields import DateTimeField
from awx.main.signals import disable_activity_stream

if not settings.INSIGHTS_TRACKING_STATE:
return
if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD):
logger.debug('Not gathering analytics, configuration is invalid')
return
last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first()
if last_gather:
last_time = DateTimeField().to_internal_value(last_gather.value)
else:
last_time = None
last_time = DateTimeField().to_internal_value(last_gather.value) if last_gather else None
gather_time = now()

if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
with advisory_lock('gather_analytics_lock', wait=False) as acquired:
if acquired is False:
logger.debug('Not gathering analytics, another task holds lock')
return
subset = list(all_collectors().keys())
incremental_collectors = []
for collector in expensive_collectors():
if collector in subset:
subset.remove(collector)
incremental_collectors.append(collector)

# Cap gathering at 4 weeks of data if there has been no data gathering
since = last_time or (gather_time - timedelta(weeks=4))

if incremental_collectors:
start = since
until = None
while start < gather_time:
until = start + timedelta(hours=4)
if until > gather_time:
until = gather_time
if not _gather_and_ship(incremental_collectors, since=start, until=until):
break
start = until
with disable_activity_stream():
settings.AUTOMATION_ANALYTICS_LAST_GATHER = until
if subset:
_gather_and_ship(subset, since=since, until=gather_time)
analytics.gather()


@task(queue=get_local_queuename)
@@ -39,7 +39,7 @@ def mock_valid_license():
def test_gather(mock_valid_license):
settings.INSIGHTS_TRACKING_STATE = True

tgzfiles = gather(module=importlib.import_module(__name__))
tgzfiles = gather(module=importlib.import_module(__name__), collection_type='dry-run')
files = {}
with tarfile.open(tgzfiles[0], "r:gz") as archive:
for member in archive.getmembers():
@@ -20,6 +20,7 @@ from decimal import Decimal

# Django
from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist
from django.utils.dateparse import parse_datetime
from django.utils.translation import ugettext_lazy as _
from django.utils.functional import cached_property
from django.db.models.fields.related import ForeignObjectRel, ManyToManyField
@@ -54,6 +55,7 @@ __all__ = [
'copy_m2m_relationships',
'prefetch_page_capabilities',
'to_python_boolean',
'datetime_hook',
'ignore_inventory_computed_fields',
'ignore_inventory_group_removal',
'_inventory_updates',
@@ -115,6 +117,16 @@ def to_python_boolean(value, allow_none=False):
raise ValueError(_(u'Unable to convert "%s" to boolean') % value)


def datetime_hook(d):
new_d = {}
for key, value in d.items():
try:
new_d[key] = parse_datetime(value)
except TypeError:
new_d[key] = value
return new_d


def camelcase_to_underscore(s):
"""
Convert CamelCase names to lowercase_with_underscore.
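A quick standalone sketch of how datetime_hook behaves as a json.loads object_hook; the payload is made up, mirroring the mixed datetime/integer values the analytics code stores:

import json

from django.utils.dateparse import parse_datetime


def datetime_hook(d):
    # Same helper as in the hunk above: parse string values into datetimes and
    # leave non-string values untouched (parse_datetime raises TypeError for them).
    new_d = {}
    for key, value in d.items():
        try:
            new_d[key] = parse_datetime(value)
        except TypeError:
            new_d[key] = value
    return new_d


entries = json.loads('{"unified_jobs_table": "2021-02-18T16:00:00+00:00", "events_table": 8675309}', object_hook=datetime_hook)
# entries['unified_jobs_table'] is now a timezone-aware datetime;
# entries['events_table'] stays the integer 8675309.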
@@ -620,6 +620,8 @@ INSIGHTS_TRACKING_STATE = False

# Last gather date for Analytics
AUTOMATION_ANALYTICS_LAST_GATHER = None
# Last gathered entries for expensive Analytics
AUTOMATION_ANALYTICS_LAST_ENTRIES = ''

# Default list of modules allowed for ad hoc commands.
# Note: This setting may be overridden by database settings.