diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index d9d73c33d6..cd5930315a 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -45,11 +45,7 @@
 from django.db import connection
 from django.db.models.sql import InsertQuery
 from django.utils.timezone import now
-db = json.loads(
-    subprocess.check_output(
-        ['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']
-    )
-)
+db = json.loads(subprocess.check_output(['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']))
 name = db['DATABASES']['default']['NAME']
 user = db['DATABASES']['default']['USER']
 pw = db['DATABASES']['default']['PASSWORD']
@@ -67,38 +63,39 @@ MODULE_OPTIONS = ('yum', 'synchronize', 'template', 'debug')
 
 
 class YieldedRows(StringIO):
-
     def __init__(self, job_id, rows, created_stamp, modified_stamp, *args, **kwargs):
         self.rows = rows
         self.rowlist = []
         for (event, module) in itertools.product(EVENT_OPTIONS, MODULE_OPTIONS):
-            event_data_json = {
-                "task_action": module,
-                "name": "Do a {} thing".format(module),
-                "task": "Do a {} thing".format(module)
-            }
-            row = "\t".join([
-                f"{created_stamp}",
-                f"{modified_stamp}",
-                event,
-                json.dumps(event_data_json),
-                str(event in ('runner_on_failed', 'runner_on_unreachable')),
-                str(event == 'runner_on_changed'),
-                "localhost",
-                "Example Play",
-                "Hello World",
-                "",
-                "0",
-                "1",
-                job_id,
-                u,
-                "",
-                "1",
-                "hello_world.yml",
-                "0",
-                "X",
-                "1",
-            ]) + '\n'
+            event_data_json = {"task_action": module, "name": "Do a {} thing".format(module), "task": "Do a {} thing".format(module)}
+            row = (
+                "\t".join(
+                    [
+                        f"{created_stamp}",
+                        f"{modified_stamp}",
+                        f"{created_stamp}",
+                        event,
+                        json.dumps(event_data_json),
+                        str(event in ('runner_on_failed', 'runner_on_unreachable')),
+                        str(event == 'runner_on_changed'),
+                        "localhost",
+                        "Example Play",
+                        "Hello World",
+                        "",
+                        "0",
+                        "1",
+                        job_id,
+                        u,
+                        "",
+                        "1",
+                        "hello_world.yml",
+                        "0",
+                        "X",
+                        "1",
+                    ]
+                )
+                + '\n'
+            )
             self.rowlist.append(row)
 
     def read(self, x):
@@ -117,15 +114,19 @@
 def firehose(job, count, created_stamp, modified_stamp):
     conn = psycopg2.connect(dsn)
     f = YieldedRows(job, count, created_stamp, modified_stamp)
     with conn.cursor() as cursor:
-        cursor.copy_expert((
-            'COPY '
-            'main_jobevent('
-            'created, modified, event, event_data, failed, changed, '
-            'host_name, play, role, task, counter, host_id, job_id, uuid, '
-            'parent_uuid, end_line, playbook, start_line, stdout, verbosity'
-            ') '
-            'FROM STDIN'
-        ), f, size=1024 * 1000)
+        cursor.copy_expert(
+            (
+                'COPY '
+                'main_jobevent('
+                'created, modified, job_created, event, event_data, failed, changed, '
+                'host_name, play, role, task, counter, host_id, job_id, uuid, '
+                'parent_uuid, end_line, playbook, start_line, stdout, verbosity'
+                ') '
+                'FROM STDIN'
+            ),
+            f,
+            size=1024 * 1000,
+        )
     conn.commit()
     conn.close()
@@ -143,10 +144,12 @@ def generate_jobs(jobs, batch_size, time_delta):
     print(f'inserting {jobs} job(s)')
     sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
     from awx import prepare_env
+
     prepare_env()
     setup_django()
 
     from awx.main.models import UnifiedJob, Job, JobTemplate
+
     fields = list(set(Job._meta.fields) - set(UnifiedJob._meta.fields))
     job_field_names = set([f.attname for f in fields])
     # extra unified job field names from base class
@@ -161,9 +164,7 @@
             jt = JobTemplate.objects.all()[jt_pos % jt_count]
         except IndexError as e:
             # seems to happen every now and then due to some race condition
-            print('Warning: IndexError on {} JT, error: {}'.format(
-                jt_pos % jt_count, e
-            ))
+            print('Warning: IndexError on {} JT, error: {}'.format(jt_pos % jt_count, e))
             jt_pos += 1
         jt_defaults = dict(
             (f.attname, getattr(jt, f.attname))
@@ -176,22 +177,36 @@
         jobs = [
             Job(
                 status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
-                started=now() - time_delta, created=now() - time_delta, modified=now() - time_delta, finished=now() - time_delta,
-                elapsed=0., **jt_defaults)
+                started=now() - time_delta,
+                created=now() - time_delta,
+                modified=now() - time_delta,
+                finished=now() - time_delta,
+                elapsed=0.0,
+                **jt_defaults,
+            )
             for i in range(N)
         ]
         ujs = UnifiedJob.objects.bulk_create(jobs)
+        for uj in ujs:
+            uj.unifiedjob_ptr_id = uj.id  # hack around the polymorphic id field not being picked up
         query = InsertQuery(Job)
         query.insert_values(fields, ujs)
         with connection.cursor() as cursor:
             query, params = query.sql_with_params()[0]
             cursor.execute(query, params)
-        return ujs[-1], jt_pos, [ujs[i].pk for i in range(len(ujs))]
+        return ujs[-1], jt_pos, [uj.pk for uj in ujs]
 
     i = 1
     jt_pos = 0
     created_job_ids = []
     s = time()
+
+    from awx.main.models import JobEvent
+    from awx.main.utils.common import create_partition
+
+    start_partition = (now() - time_delta).replace(minute=0, second=0, microsecond=0)
+    create_partition(JobEvent._meta.db_table, start_partition)
+
     while jobs > 0:
         s_loop = time()
         print('running batch {}, runtime {}'.format(i, time() - s))
@@ -200,21 +215,20 @@
         i += 1
         jobs -= batch_size
         created_job_ids += ujs_pk
-    print('Creted Job IDS: {}'.format(created_job_ids))
-    #return created
+    print('Created Job IDS: {}'.format(created_job_ids))
+    # return created
     return created_job_ids
 
 
 def generate_events(events, job, time_delta):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
-    
+
     created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
     modified_time = datetime.datetime.today() - time_delta
     created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
     modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
-    
-    # get all the indexes for main_jobevent
+
     print(f'attaching {events} events to job {job}')
     cores = multiprocessing.cpu_count()
     workers = []
@@ -244,7 +258,7 @@
     cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
     cursor.execute("SELECT nextval('firehose_line_seq')")
     conn.commit()
-    
+
     cursor.execute(
         "UPDATE main_jobevent SET "
         "counter=nextval('firehose_seq')::integer,"
@@ -258,18 +272,10 @@
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument(
-        '--jobs-per-hour', type=int, help='Number of jobs to create.',
-        default=1000)  # 1M by default
-    parser.add_argument(
-        '--events-per-job', type=int, help='Number of events to create.',
-        default=1345)  # 1B by default
-    parser.add_argument(
-        '--batch-size', type=int, help='Number of jobs to create in a single batch.',
-        default=100)
-    parser.add_argument(
-        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 31.',
-        default=31)
+    parser.add_argument('--jobs-per-hour', type=int, help='Number of jobs to create.', default=1000)  # 1M by default
+    parser.add_argument('--events-per-job', type=int, help='Number of events to create.', default=1345)  # 1B by default
+    parser.add_argument('--batch-size', type=int, help='Number of jobs to create in a single batch.', default=100)
+    parser.add_argument('--days-delta', type=int, help='Number of days old to create the events. Defaults to 31.', default=31)
     params = parser.parse_args()
     jobs = params.jobs_per_hour
     events = params.events_per_job
@@ -279,7 +285,7 @@
         conn = psycopg2.connect(dsn)
         cursor = conn.cursor()
 
-        #Drop all the indexes before generating jobs
+        # Drop all the indexes before generating jobs
         print('removing indexes and constraints')
         # get all the indexes for main_jobevent
        # disable WAL to drastically increase write speed
@@ -287,17 +293,20 @@
         # insert data as quickly as possible without concern for the risk of
         # data loss on crash
         # see: https://www.compose.com/articles/faster-performance-with-unlogged-tables-in-postgresql/
-        
+
         cursor.execute('ALTER TABLE main_jobevent SET UNLOGGED')
         cursor.execute("SELECT indexname, indexdef FROM pg_indexes WHERE tablename='main_jobevent' AND indexname != 'main_jobevent_pkey1';")
         indexes = cursor.fetchall()
-        
+
         cursor.execute(
-            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';")  # noqa
+            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';"
+        )  # noqa
         constraints = cursor.fetchall()
-        
+
         # drop all indexes for speed
         for indexname, indexdef in indexes:
+            if indexname == 'main_jobevent_pkey_new':  # Dropped by the constraint
+                continue
             cursor.execute(f'DROP INDEX IF EXISTS {indexname}')
             print(f'DROP INDEX IF EXISTS {indexname}')
         for conname, contype, condef in constraints:
@@ -305,8 +314,7 @@
             print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
         conn.commit()
 
-
-        for i_day in range(days_delta,0,-1):
+        for i_day in range(days_delta, 0, -1):
             for j_hour in range(24):
                 time_delta = datetime.timedelta(days=i_day, hours=j_hour, seconds=0)
                 created_job_ids = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
@@ -315,24 +323,25 @@
 
         print(datetime.datetime.utcnow().isoformat())
         conn.close()
-
     finally:
         # restore all indexes
         print(datetime.datetime.utcnow().isoformat())
         print('restoring indexes and constraints (this may take awhile)')
-        
+
         workers = []
         for indexname, indexdef in indexes:
+            if indexname == 'main_jobevent_pkey_new':  # Created by the constraint
+                continue
             p = multiprocessing.Process(target=cleanup, args=(indexdef,))
             p.daemon = True
             workers.append(p)
-        
+
         for w in workers:
             w.start()
-        
+
         for w in workers:
             w.join()
-        
+
         for conname, contype, condef in constraints:
             if contype == 'c':
                 # if there are any check constraints, don't add them back
@@ -340,6 +349,8 @@
                 # worthless, because Ansible doesn't emit counters, line
                 # numbers, verbosity, etc... < 0)
                 continue
-        sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
-        cleanup(sql)
+
+            sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
+            cleanup(sql)
+
         print(datetime.datetime.utcnow().isoformat())
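Not part of the patch, but a quick way to sanity-check a run after this change: the COPY column list now includes job_created and a partition is created before each hourly batch, so counting events per job_created hour should show rows spread across the generated day/hour range. A minimal sketch, assuming placeholder connection details for the AWX database:

# Illustrative sanity check (not part of the patch); the DSN below is a placeholder,
# substitute the DSN the script builds from `awx-manage print_settings DATABASES`.
import psycopg2

dsn = 'dbname=awx user=awx password=awx host=localhost'

conn = psycopg2.connect(dsn)
with conn.cursor() as cursor:
    # Group generated events by the hour of their new job_created column.
    cursor.execute(
        "SELECT date_trunc('hour', job_created) AS hour, count(*) "
        "FROM main_jobevent GROUP BY 1 ORDER BY 1;"
    )
    for hour, total in cursor.fetchall():
        print(hour, total)
conn.close()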