From c753324872abc24363df10641eec9fa93fa0e024 Mon Sep 17 00:00:00 2001
From: Bill Nottingham
Date: Tue, 28 Jul 2020 16:32:32 -0400
Subject: [PATCH] Move back to less frequent collections, and split large
 event tables

This should ensure we stay under 100MB at all times.
---
 awx/main/analytics/collectors.py                | 62 +++++++++---
 awx/main/analytics/core.py                      | 95 +++++++++++++------
 .../management/commands/gather_analytics.py     | 31 +++++-
 awx/main/tasks.py                               | 18 ++--
 .../tests/functional/analytics/test_core.py    |  7 +-
 5 files changed, 160 insertions(+), 53 deletions(-)

diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py
index 45afdc7152..fe823764b8 100644
--- a/awx/main/analytics/collectors.py
+++ b/awx/main/analytics/collectors.py
@@ -1,3 +1,4 @@
+import io
 import os
 import os.path
 import platform
@@ -227,21 +228,60 @@ def query_info(since, collection_type, **kwargs):
     return query_info
 
 
+'''
+The event table can be *very* large, and we have a 100MB upload limit.
+
+Split large table dumps at dump time into a series of files.
+'''
+MAX_TABLE_SIZE = 200 * 1048576
+
+
+class FileSplitter(io.StringIO):
+    def __init__(self, filespec=None, *args, **kwargs):
+        self.filespec = filespec
+        self.files = []
+        self.currentfile = None
+        self.header = None
+        self.counter = 0
+        self.cycle_file()
+
+    def cycle_file(self):
+        if self.currentfile:
+            self.currentfile.close()
+        self.counter = 0
+        fname = '{}_split{}'.format(self.filespec, len(self.files))
+        self.currentfile = open(fname, 'w', encoding='utf-8')
+        self.files.append(fname)
+        if self.header:
+            self.currentfile.write('{}\n'.format(self.header))
+
+    def file_list(self):
+        self.currentfile.close()
+        # Check for an empty dump
+        if len(self.header) + 1 == self.counter:
+            os.remove(self.files[-1])
+            self.files = self.files[:-1]
+        # If we only have one file, remove the suffix
+        if len(self.files) == 1:
+            filename = self.files[0].replace('_split0', '')
+            os.rename(self.files[0], filename)
+            self.files[0] = filename
+        return self.files
+
+    def write(self, s):
+        if not self.header:
+            self.header = s[0:s.index('\n')]
+        self.counter += self.currentfile.write(s)
+        if self.counter >= MAX_TABLE_SIZE:
+            self.cycle_file()
+
+
 def _copy_table(table, query, path):
     file_path = os.path.join(path, table + '_table.csv')
-    file = open(file_path, 'w', encoding='utf-8')
+    file = FileSplitter(filespec=file_path)
     with connection.cursor() as cursor:
         cursor.copy_expert(query, file)
-    file.close()
-    # Ensure we actually dumped data, and not just headers
-    file = open(file_path, 'r', encoding='utf-8')
-    file.readline()
-    data = file.readline()
-    file.close()
-    if not data:
-        os.remove(file_path)
-        return None
-    return file_path
+    return file.file_list()
 
 
 @register('events_table', '1.1', format='csv', description=_('Automation task records'), expensive=True)
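For context on how the splitter above is driven: psycopg2's copy_expert() only
requires a writable file-like object, which is why FileSplitter can stand in for
the plain file handle that _copy_table() used before. A minimal sketch of the
interaction follows; the DSN, query, and output path are illustrative, not taken
from this patch. Note that MAX_TABLE_SIZE is 200MB of raw CSV, presumably chosen
because a gzipped archive of such a chunk lands safely under the 100MB upload cap.

    import psycopg2

    from awx.main.analytics.collectors import FileSplitter

    conn = psycopg2.connect('dbname=awx')  # illustrative DSN
    with conn.cursor() as cursor:
        out = FileSplitter(filespec='/tmp/events_table.csv')
        # copy_expert() calls out.write() repeatedly; FileSplitter tallies what
        # it has written and rolls over to a fresh <name>_splitN file, repeating
        # the CSV header, once the tally passes MAX_TABLE_SIZE.
        cursor.copy_expert('COPY (SELECT * FROM main_jobevent) TO STDOUT WITH CSV HEADER', out)
    # A single path for small dumps (the _split0 suffix is stripped), or a list
    # of *_splitN paths for large ones.
    print(out.file_list())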
diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py
index 98fa8369d8..7179d89969 100644
--- a/awx/main/analytics/core.py
+++ b/awx/main/analytics/core.py
@@ -90,6 +90,15 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
     :param module: the module to search for registered analytic collector functions;
         defaults to awx.main.analytics.collectors
     """
+    def _write_manifest(destdir, manifest):
+        path = os.path.join(destdir, 'manifest.json')
+        with open(path, 'w', encoding='utf-8') as f:
+            try:
+                json.dump(manifest, f)
+            except Exception:
+                f.close()
+                os.remove(f.name)
+                logger.exception("Could not generate manifest.json")
 
     last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4))
     logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
@@ -103,10 +112,12 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
         return
 
     collector_list = []
-    if module is None:
+    if module:
+        collector_module = module
+    else:
         from awx.main.analytics import collectors
-        module = collectors
-    for name, func in inspect.getmembers(module):
+        collector_module = collectors
+    for name, func in inspect.getmembers(collector_module):
         if (
             inspect.isfunction(func) and
             hasattr(func, '__awx_analytics_key__') and
@@ -116,10 +127,13 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
 
     manifest = dict()
     dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
+    gather_dir = os.path.join(dest, 'stage')
+    os.mkdir(gather_dir, 0o700)
+    num_splits = 1
     for name, func in collector_list:
         if func.__awx_analytics_type__ == 'json':
             key = func.__awx_analytics_key__
-            path = '{}.json'.format(os.path.join(dest, key))
+            path = '{}.json'.format(os.path.join(gather_dir, key))
             with open(path, 'w', encoding='utf-8') as f:
                 try:
                     json.dump(func(last_run, collection_type=collection_type, until=until), f)
@@ -131,8 +145,11 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
         elif func.__awx_analytics_type__ == 'csv':
             key = func.__awx_analytics_key__
             try:
-                if func(last_run, full_path=dest, until=until):
+                files = func(last_run, full_path=gather_dir, until=until)
+                if files:
                     manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
+                    if len(files) > num_splits:
+                        num_splits = len(files)
             except Exception:
                 logger.exception("Could not generate metric {}.csv".format(key))
 
@@ -145,11 +162,12 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
     # Always include config.json if we're using our collectors
     if 'config.json' not in manifest.keys() and not module:
         from awx.main.analytics import collectors
-        path = '{}.json'.format(os.path.join(dest, key))
+        config = collectors.config
+        path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__))
         with open(path, 'w', encoding='utf-8') as f:
             try:
                 json.dump(collectors.config(last_run), f)
-                manifest['config.json'] = collectors.config.__awx_analytics_version__
+                manifest['config.json'] = config.__awx_analytics_version__
             except Exception:
                 logger.exception("Could not generate metric {}.json".format(key))
                 f.close()
                 os.remove(f.name)
@@ -157,31 +175,52 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
                 shutil.rmtree(dest)
                 return None
 
-    path = os.path.join(dest, 'manifest.json')
-    with open(path, 'w', encoding='utf-8') as f:
-        try:
-            json.dump(manifest, f)
-        except Exception:
-            logger.exception("Could not generate manifest.json")
-            f.close()
-            os.remove(f.name)
+    stage_dirs = [gather_dir]
+    if num_splits > 1:
+        for i in range(num_splits):
+            split_path = os.path.join(dest, 'split{}'.format(i))
+            os.mkdir(split_path, 0o700)
+            filtered_manifest = {}
+            shutil.copy(os.path.join(gather_dir, 'config.json'), split_path)
+            filtered_manifest['config.json'] = manifest['config.json']
+            suffix = '_split{}'.format(i)
+            for file in os.listdir(gather_dir):
+                if file.endswith(suffix):
+                    old_file = os.path.join(gather_dir, file)
+                    new_filename = file.replace(suffix, '')
+                    new_file = os.path.join(split_path, new_filename)
+                    shutil.move(old_file, new_file)
+                    filtered_manifest[new_filename] = manifest[new_filename]
+            _write_manifest(split_path, filtered_manifest)
+            stage_dirs.append(split_path)
 
-    # can't use isoformat() since it has colons, which GNU tar doesn't like
-    tarname = '_'.join([
-        settings.SYSTEM_UUID,
-        until.strftime('%Y-%m-%d-%H%M%S%z')
-    ])
+    for item in list(manifest.keys()):
+        if not os.path.exists(os.path.join(gather_dir, item)):
+            manifest.pop(item)
+    _write_manifest(gather_dir, manifest)
+
+    tarfiles = []
     try:
-        tgz = shutil.make_archive(
-            os.path.join(os.path.dirname(dest), tarname),
-            'gztar',
-            dest
-        )
-        return tgz
+        for i, stage_dir in enumerate(stage_dirs):
+            # can't use isoformat() since it has colons, which GNU tar doesn't like
+            tarname = '_'.join([
+                settings.SYSTEM_UUID,
+                until.strftime('%Y-%m-%d-%H%M%S%z'),
+                str(i)
+            ])
+            tgz = shutil.make_archive(
+                os.path.join(os.path.dirname(dest), tarname),
+                'gztar',
+                stage_dir
+            )
+            tarfiles.append(tgz)
     except Exception:
+        shutil.rmtree(stage_dir, ignore_errors=True)
         logger.exception("Failed to write analytics archive file")
-    finally:
-        shutil.rmtree(dest)
+    finally:
+        shutil.rmtree(dest, ignore_errors=True)
+
+    return tarfiles
 
 
 def ship(path):
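The net effect of the staging logic is easiest to see as an on-disk layout. For
a hypothetical gather where only events_table.csv was large enough to be split
in two (num_splits == 2), the temporary directory would look roughly like this
before archiving, with each staging directory becoming one tarball (file names
and the counts.json entry are illustrative):

    /tmp/awx_analyticsXXXXXX/
        stage/     manifest.json, config.json, counts.json, ..., unsplit .csv dumps
        split0/    manifest.json, config.json, events_table.csv  (chunk 0)
        split1/    manifest.json, config.json, events_table.csv  (chunk 1)

    -> <SYSTEM_UUID>_<timestamp>_0.tar.gz  (from stage/)
    -> <SYSTEM_UUID>_<timestamp>_1.tar.gz  (from split0/)
    -> <SYSTEM_UUID>_<timestamp>_2.tar.gz  (from split1/)

Each directory carries its own filtered manifest.json, so every uploaded archive
is self-describing, and config.json is copied into every split, presumably so
the receiving side can always identify the sending cluster.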
diff --git a/awx/main/management/commands/gather_analytics.py b/awx/main/management/commands/gather_analytics.py
index aa096d6f28..6bfa3b1674 100644
--- a/awx/main/management/commands/gather_analytics.py
+++ b/awx/main/management/commands/gather_analytics.py
@@ -1,6 +1,9 @@
 import logging
+
 from awx.main.analytics import gather, ship
+from dateutil import parser
 from django.core.management.base import BaseCommand
+from django.utils.timezone import now
 
 
 class Command(BaseCommand):
@@ -15,6 +18,10 @@ class Command(BaseCommand):
                             help='Gather analytics without shipping. Works even if analytics are disabled in settings.')
         parser.add_argument('--ship', dest='ship', action='store_true',
                             help='Enable to ship metrics to the Red Hat Cloud')
+        parser.add_argument('--since', dest='since', action='store',
+                            help='Start date for collection')
+        parser.add_argument('--until', dest='until', action='store',
+                            help='End date for collection')
 
     def init_logging(self):
         self.logger = logging.getLogger('awx.main.analytics')
@@ -28,11 +35,27 @@ class Command(BaseCommand):
         self.init_logging()
         opt_ship = options.get('ship')
         opt_dry_run = options.get('dry-run')
+        opt_since = options.get('since')
+        opt_until = options.get('until')
+
+        if opt_since:
+            since = parser.parse(opt_since)
+        else:
+            since = None
+        if opt_until:
+            until = parser.parse(opt_until)
+        else:
+            until = now()
+
         if opt_ship and opt_dry_run:
             self.logger.error('Both --ship and --dry-run cannot be processed at the same time.')
             return
-        tgz = gather(collection_type='manual' if not opt_dry_run else 'dry-run')
-        if tgz:
-            self.logger.debug(tgz)
+        tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
+        if tgzfiles:
+            self.logger.debug(tgzfiles)
+        else:
+            self.logger.error('No analytics collected')
         if opt_ship:
-            ship(tgz)
+            if tgzfiles:
+                for tgz in tgzfiles:
+                    ship(tgz)
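With the new flags, an operator can replay a specific collection window by hand.
The values pass through dateutil's parser, so most common date formats work. A
hypothetical invocation, assuming management commands run via awx-manage as in a
standard install:

    $ awx-manage gather_analytics --dry-run --since 2020-07-01 --until 2020-07-28
    $ awx-manage gather_analytics --ship --since "2020-07-27 00:00" --until "2020-07-28 00:00"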
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 17f6dd27c4..80013000a6 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -357,19 +357,23 @@ def send_notifications(notification_list, job_id=None):
 @task(queue=get_local_queuename)
 def gather_analytics():
     def _gather_and_ship(subset, since, until):
+        tgzfiles = []
         try:
-            tgz = analytics.gather(subset=subset, since=since, until=until)
+            tgzfiles = analytics.gather(subset=subset, since=since, until=until)
             # empty analytics without raising an exception is not an error
-            if not tgz:
+            if not tgzfiles:
                 return True
-            logger.info('gathered analytics: {}'.format(tgz))
-            analytics.ship(tgz)
+            logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
+            for tgz in tgzfiles:
+                analytics.ship(tgz)
         except Exception:
             logger.exception('Error gathering and sending analytics for {} to {}.'.format(since,until))
             return False
         finally:
-            if os.path.exists(tgz):
-                os.remove(tgz)
+            if tgzfiles:
+                for tgz in tgzfiles:
+                    if os.path.exists(tgz):
+                        os.remove(tgz)
         return True
 
     from awx.conf.models import Setting
@@ -404,7 +408,7 @@ def gather_analytics():
     start = since
     until = None
     while start < gather_time:
-        until = start + timedelta(minutes=20)
+        until = start + timedelta(hours=4)
         if (until > gather_time):
             until = gather_time
         if not _gather_and_ship(incremental_collectors, since=start, until=until):
diff --git a/awx/main/tests/functional/analytics/test_core.py b/awx/main/tests/functional/analytics/test_core.py
index 190eeac6b3..f3cc1fcd4b 100644
--- a/awx/main/tests/functional/analytics/test_core.py
+++ b/awx/main/tests/functional/analytics/test_core.py
@@ -39,9 +39,9 @@ def mock_valid_license():
 def test_gather(mock_valid_license):
     settings.INSIGHTS_TRACKING_STATE = True
 
-    tgz = gather(module=importlib.import_module(__name__))
+    tgzfiles = gather(module=importlib.import_module(__name__))
     files = {}
-    with tarfile.open(tgz, "r:gz") as archive:
+    with tarfile.open(tgzfiles[0], "r:gz") as archive:
         for member in archive.getmembers():
             files[member.name] = archive.extractfile(member)
 
@@ -53,7 +53,8 @@ def test_gather(mock_valid_license):
     assert './bad_json.json' not in files.keys()
     assert './throws_error.json' not in files.keys()
     try:
-        os.remove(tgz)
+        for tgz in tgzfiles:
+            os.remove(tgz)
     except Exception:
         pass
 
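A closing note on the scheduler change in awx/main/tasks.py above: widening the
incremental window from 20 minutes to 4 hours means a long backlog is swept in
far fewer gather/ship cycles, while each cycle's payload is still bounded by the
FileSplitter logic. A self-contained sketch of the window arithmetic, assuming
(as the surrounding code implies) that start advances to until after each cycle;
the timestamps are illustrative:

    from datetime import datetime, timedelta

    start = datetime(2020, 7, 28, 0, 0)         # last gather
    gather_time = datetime(2020, 7, 28, 11, 0)  # "now"

    windows = []
    while start < gather_time:
        # Clamp the final window so it never overshoots the current time.
        until = min(start + timedelta(hours=4), gather_time)
        windows.append((start, until))
        start = until

    # windows -> [(00:00, 04:00), (04:00, 08:00), (08:00, 11:00)]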