Move back to less frequent collections, and split large event tables

This should keep each uploaded archive under the 100MB limit at all times.
Bill Nottingham
2020-07-28 16:32:32 -04:00
parent 9f67b6742c
commit c753324872
5 changed files with 158 additions and 53 deletions
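
The net effect on callers: gather() now returns a list of tarball paths, one per staging directory, instead of a single path, and callers ship and remove each archive in turn. A sketch of the calling convention the task and management command adopt below:

    tarballs = gather(subset=subset, since=since, until=until)
    for tgz in tarballs:
        ship(tgz)
        if os.path.exists(tgz):
            os.remove(tgz)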


@@ -1,3 +1,4 @@
+import io
 import os
 import os.path
 import platform
@@ -227,21 +228,58 @@ def query_info(since, collection_type, **kwargs):
     return query_info


+'''
+The event table can be *very* large, and we have a 100MB upload limit.
+Split large table dumps at dump time into a series of files.
+'''
+MAX_TABLE_SIZE = 200 * 1048576
+class FileSplitter(io.StringIO):
+    def __init__(self, filespec=None, *args, **kwargs):
+        self.filespec = filespec
+        self.files = []
+        self.currentfile = None
+        self.header = None
+        self.counter = 0
+        self.cycle_file()
+
+    def cycle_file(self):
+        # close the current chunk (if any) and open '<filespec>_splitN',
+        # repeating the CSV header at the top of every new chunk
+        if self.currentfile:
+            self.currentfile.close()
+        self.counter = 0
+        fname = '{}_split{}'.format(self.filespec, len(self.files))
+        self.currentfile = open(fname, 'w', encoding='utf-8')
+        self.files.append(fname)
+        if self.header:
+            self.currentfile.write('{}\n'.format(self.header))
+
+    def file_list(self):
+        self.currentfile.close()
+        # Check for an empty dump (the current file holds only the header)
+        if len(self.header) + 1 == self.counter:
+            os.remove(self.files[-1])
+            self.files = self.files[:-1]
+        # If we only have one file, remove the suffix and keep the returned
+        # list pointing at the renamed file
+        if len(self.files) == 1:
+            unsplit_name = self.files[0].replace('_split0', '')
+            os.rename(self.files[0], unsplit_name)
+            self.files[0] = unsplit_name
+        return self.files
+
+    def write(self, s):
+        # the first chunk written begins with the CSV header line; remember
+        # it so cycle_file() can repeat it in each split
+        if not self.header:
+            self.header = s[0:s.index('\n')]
+        self.counter += self.currentfile.write(s)
+        if self.counter >= MAX_TABLE_SIZE:
+            self.cycle_file()
 def _copy_table(table, query, path):
     file_path = os.path.join(path, table + '_table.csv')
-    file = open(file_path, 'w', encoding='utf-8')
+    file = FileSplitter(filespec=file_path)
     with connection.cursor() as cursor:
         cursor.copy_expert(query, file)
-        file.close()
-    # Ensure we actually dumped data, and not just headers
-    file = open(file_path, 'r', encoding='utf-8')
-    file.readline()
-    data = file.readline()
-    file.close()
-    if not data:
-        os.remove(file_path)
-        return None
-    return file_path
+    return file.file_list()


 @register('events_table', '1.1', format='csv', description=_('Automation task records'), expensive=True)
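
For context: psycopg2's copy_expert() streams COPY output through the write() method of whatever file-like object it is handed, which is what lets FileSplitter cycle to a new output file mid-dump without the query noticing. A minimal sketch of the call pattern (the query string here is illustrative):

    splitter = FileSplitter(filespec=os.path.join(path, 'events_table.csv'))
    with connection.cursor() as cursor:
        # rows stream through splitter.write(), which rolls over to a new
        # '<name>_splitN' file whenever MAX_TABLE_SIZE is crossed
        cursor.copy_expert('COPY (SELECT * FROM main_jobevent) TO STDOUT WITH CSV HEADER', splitter)
    files = splitter.file_list()  # one plain path, or several '<name>_splitN' paths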


@@ -90,6 +90,15 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
     :param module: the module to search for registered analytic collector
                    functions; defaults to awx.main.analytics.collectors
     """
+    def _write_manifest(destdir, manifest):
+        path = os.path.join(destdir, 'manifest.json')
+        with open(path, 'w', encoding='utf-8') as f:
+            try:
+                json.dump(manifest, f)
+            except Exception:
+                f.close()
+                os.remove(f.name)
+                logger.exception("Could not generate manifest.json")
+
     last_run = since or settings.AUTOMATION_ANALYTICS_LAST_GATHER or (now() - timedelta(weeks=4))
     logger.debug("Last analytics run was: {}".format(settings.AUTOMATION_ANALYTICS_LAST_GATHER))
@@ -103,10 +112,12 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
         return

     collector_list = []
-    if module is None:
+    if module:
+        collector_module = module
+    else:
         from awx.main.analytics import collectors
-        module = collectors
-    for name, func in inspect.getmembers(module):
+        collector_module = collectors
+    for name, func in inspect.getmembers(collector_module):
         if (
             inspect.isfunction(func) and
             hasattr(func, '__awx_analytics_key__') and
@@ -116,10 +127,13 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
     manifest = dict()

     dest = dest or tempfile.mkdtemp(prefix='awx_analytics')
+    gather_dir = os.path.join(dest, 'stage')
+    os.mkdir(gather_dir, 0o700)
+    num_splits = 1
     for name, func in collector_list:
         if func.__awx_analytics_type__ == 'json':
             key = func.__awx_analytics_key__
-            path = '{}.json'.format(os.path.join(dest, key))
+            path = '{}.json'.format(os.path.join(gather_dir, key))
             with open(path, 'w', encoding='utf-8') as f:
                 try:
                     json.dump(func(last_run, collection_type=collection_type, until=until), f)
@@ -131,8 +145,11 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
         elif func.__awx_analytics_type__ == 'csv':
             key = func.__awx_analytics_key__
             try:
-                if func(last_run, full_path=dest, until=until):
+                files = func(last_run, full_path=gather_dir, until=until)
+                if files:
                     manifest['{}.csv'.format(key)] = func.__awx_analytics_version__
+                    if len(files) > num_splits:
+                        num_splits = len(files)
             except Exception:
                 logger.exception("Could not generate metric {}.csv".format(key))
@@ -145,11 +162,12 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
     # Always include config.json if we're using our collectors
     if 'config.json' not in manifest.keys() and not module:
         from awx.main.analytics import collectors
-        path = '{}.json'.format(os.path.join(dest, key))
+        config = collectors.config
+        path = '{}.json'.format(os.path.join(gather_dir, config.__awx_analytics_key__))
         with open(path, 'w', encoding='utf-8') as f:
             try:
                 json.dump(collectors.config(last_run), f)
-                manifest['config.json'] = collectors.config.__awx_analytics_version__
+                manifest['config.json'] = config.__awx_analytics_version__
             except Exception:
                 logger.exception("Could not generate metric {}.json".format(key))
                 f.close()
@@ -157,31 +175,52 @@ def gather(dest=None, module=None, subset = None, since = None, until = now(), c
         shutil.rmtree(dest)
         return None

-    path = os.path.join(dest, 'manifest.json')
-    with open(path, 'w', encoding='utf-8') as f:
-        try:
-            json.dump(manifest, f)
-        except Exception:
-            logger.exception("Could not generate manifest.json")
-            f.close()
-            os.remove(f.name)
+    stage_dirs = [gather_dir]
+    if num_splits > 1:
+        for i in range(0, num_splits):
+            split_path = os.path.join(dest, 'split{}'.format(i))
+            os.mkdir(split_path, 0o700)
+            filtered_manifest = {}
+            shutil.copy(os.path.join(gather_dir, 'config.json'), split_path)
+            filtered_manifest['config.json'] = manifest['config.json']
+            suffix = '_split{}'.format(i)
+            for file in os.listdir(gather_dir):
+                if file.endswith(suffix):
+                    old_file = os.path.join(gather_dir, file)
+                    new_filename = file.replace(suffix, '')
+                    new_file = os.path.join(split_path, new_filename)
+                    shutil.move(old_file, new_file)
+                    filtered_manifest[new_filename] = manifest[new_filename]
+            _write_manifest(split_path, filtered_manifest)
+            stage_dirs.append(split_path)

-    # can't use isoformat() since it has colons, which GNU tar doesn't like
-    tarname = '_'.join([
-        settings.SYSTEM_UUID,
-        until.strftime('%Y-%m-%d-%H%M%S%z')
-    ])
+    for item in list(manifest.keys()):
+        if not os.path.exists(os.path.join(gather_dir, item)):
+            manifest.pop(item)
+    _write_manifest(gather_dir, manifest)

+    tarfiles = []
     try:
-        tgz = shutil.make_archive(
-            os.path.join(os.path.dirname(dest), tarname),
-            'gztar',
-            dest
-        )
-        return tgz
+        for i in range(0, len(stage_dirs)):
+            stage_dir = stage_dirs[i]
+            # can't use isoformat() since it has colons, which GNU tar doesn't like
+            tarname = '_'.join([
+                settings.SYSTEM_UUID,
+                until.strftime('%Y-%m-%d-%H%M%S%z'),
+                str(i)
+            ])
+            tgz = shutil.make_archive(
+                os.path.join(os.path.dirname(dest), tarname),
+                'gztar',
+                stage_dir
+            )
+            tarfiles.append(tgz)
     except Exception:
+        shutil.rmtree(stage_dir, ignore_errors=True)
         logger.exception("Failed to write analytics archive file")
     finally:
-        shutil.rmtree(dest)
+        shutil.rmtree(dest, ignore_errors=True)
+    return tarfiles
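
Each staging directory becomes its own archive, so a gather whose largest table split into two pieces yields three uploads (UUID and timestamp here are illustrative):

    <SYSTEM_UUID>_2020-07-28-163232-0400_0.tar.gz   # stage dir: unsplit files plus full manifest
    <SYSTEM_UUID>_2020-07-28-163232-0400_1.tar.gz   # '_split0' pieces plus config.json
    <SYSTEM_UUID>_2020-07-28-163232-0400_2.tar.gz   # '_split1' pieces plus config.json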
def ship(path):


@@ -1,6 +1,9 @@
 import logging

 from awx.main.analytics import gather, ship
+from dateutil import parser
 from django.core.management.base import BaseCommand
+from django.utils.timezone import now


 class Command(BaseCommand):
@@ -15,6 +18,10 @@ class Command(BaseCommand):
                             help='Gather analytics without shipping. Works even if analytics are disabled in settings.')
         parser.add_argument('--ship', dest='ship', action='store_true',
                             help='Enable to ship metrics to the Red Hat Cloud')
+        parser.add_argument('--since', dest='since', action='store',
+                            help='Start date for collection')
+        parser.add_argument('--until', dest='until', action='store',
+                            help='End date for collection')

     def init_logging(self):
         self.logger = logging.getLogger('awx.main.analytics')
@@ -28,11 +35,27 @@ class Command(BaseCommand):
         self.init_logging()
         opt_ship = options.get('ship')
         opt_dry_run = options.get('dry-run')
+        opt_since = options.get('since') or None
+        opt_until = options.get('until') or None
+        if opt_since:
+            since = parser.parse(opt_since)
+        else:
+            since = None
+        if opt_until:
+            until = parser.parse(opt_until)
+        else:
+            until = now()
         if opt_ship and opt_dry_run:
             self.logger.error('Both --ship and --dry-run cannot be processed at the same time.')
             return
-        tgz = gather(collection_type='manual' if not opt_dry_run else 'dry-run')
-        if tgz:
-            self.logger.debug(tgz)
+        tgzfiles = gather(collection_type='manual' if not opt_dry_run else 'dry-run', since=since, until=until)
+        if tgzfiles:
+            self.logger.debug(tgzfiles)
         else:
             self.logger.error('No analytics collected')
         if opt_ship:
-            ship(tgz)
+            if tgzfiles:
+                for tgz in tgzfiles:
+                    ship(tgz)
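
The new flags accept anything dateutil's parser understands, for example:

    >>> from dateutil import parser
    >>> parser.parse('2020-07-01')
    datetime.datetime(2020, 7, 1, 0, 0)
    >>> parser.parse('2020-07-28 16:32')
    datetime.datetime(2020, 7, 28, 16, 32)

so an invocation like awx-manage gather_analytics --dry-run --since 2020-07-01 --until "2020-07-28 16:32" collects exactly that window without shipping.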


@@ -357,19 +357,23 @@ def send_notifications(notification_list, job_id=None):
 @task(queue=get_local_queuename)
 def gather_analytics():

     def _gather_and_ship(subset, since, until):
+        tgzfiles = []
         try:
-            tgz = analytics.gather(subset=subset, since=since, until=until)
+            tgzfiles = analytics.gather(subset=subset, since=since, until=until)
             # empty analytics without raising an exception is not an error
-            if not tgz:
+            if not tgzfiles:
                 return True
-            logger.info('gathered analytics: {}'.format(tgz))
-            analytics.ship(tgz)
+            logger.info('Gathered analytics from {} to {}: {}'.format(since, until, tgzfiles))
+            for tgz in tgzfiles:
+                analytics.ship(tgz)
         except Exception:
             logger.exception('Error gathering and sending analytics for {} to {}.'.format(since, until))
             return False
         finally:
-            if os.path.exists(tgz):
-                os.remove(tgz)
+            if tgzfiles:
+                for tgz in tgzfiles:
+                    if os.path.exists(tgz):
+                        os.remove(tgz)
         return True

     from awx.conf.models import Setting
@@ -404,7 +408,7 @@ def gather_analytics():
     start = since
     until = None
     while start < gather_time:
-        until = start + timedelta(minutes=20)
+        until = start + timedelta(hours=4)
         if (until > gather_time):
             until = gather_time
         if not _gather_and_ship(incremental_collectors, since=start, until=until):
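
The scheduled task now walks any backlog in four-hour windows rather than twenty-minute ones, clamping the final window to the gather time. As a standalone sketch (the advance to the next window is implied by the surrounding code, not shown in this hunk):

    start = since
    while start < gather_time:
        until = min(start + timedelta(hours=4), gather_time)
        if not _gather_and_ship(incremental_collectors, since=start, until=until):
            break
        start = until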


@@ -39,9 +39,9 @@ def mock_valid_license():
 def test_gather(mock_valid_license):
     settings.INSIGHTS_TRACKING_STATE = True

-    tgz = gather(module=importlib.import_module(__name__))
+    tgzfiles = gather(module=importlib.import_module(__name__))
     files = {}
-    with tarfile.open(tgz, "r:gz") as archive:
+    with tarfile.open(tgzfiles[0], "r:gz") as archive:
         for member in archive.getmembers():
             files[member.name] = archive.extractfile(member)
@@ -53,7 +53,8 @@ def test_gather(mock_valid_license):
     assert './bad_json.json' not in files.keys()
     assert './throws_error.json' not in files.keys()

     try:
-        os.remove(tgz)
+        for tgz in tgzfiles:
+            os.remove(tgz)
     except Exception:
         pass