From 412015b7bd41ffabca18a8618ed5ebd99cac5883 Mon Sep 17 00:00:00 2001
From: mcharanrm
Date: Fri, 19 Feb 2021 19:10:34 +0530
Subject: [PATCH] tools/scripts/firehose.py: capture all job ids, attach
 events to all job ids, and distribute the events over time

---
 tools/scripts/firehose.py | 179 +++++++++++++++++++++----------------
 1 file changed, 99 insertions(+), 80 deletions(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index f7c7e48551..d9d73c33d6 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -24,6 +24,7 @@
 # is something like m5.4xlarge w/ 1TB provisioned IOPS SSD (io1)
 #
 
+import math
 import argparse
 import datetime
 import itertools
@@ -104,6 +105,10 @@ class YieldedRows(StringIO):
         if self.rows <= 0:
             self.close()
             return ''
+        elif 0 < self.rows < 1000:
+            event_rows = self.rowlist[random.randrange(len(self.rowlist))] * self.rows
+            self.rows = 0
+            return event_rows
         self.rows -= 1000
         return self.rowlist[random.randrange(len(self.rowlist))] * 1000
 
@@ -181,46 +186,116 @@ def generate_jobs(jobs, batch_size, time_delta):
         with connection.cursor() as cursor:
             query, params = query.sql_with_params()[0]
             cursor.execute(query, params)
-        return ujs[-1], jt_pos
+        return ujs[-1], jt_pos, [uj.pk for uj in ujs]
 
     i = 1
     jt_pos = 0
+    created_job_ids = []
     s = time()
     while jobs > 0:
         s_loop = time()
         print('running batch {}, runtime {}'.format(i, time() - s))
-        created, jt_pos = make_batch(min(jobs, batch_size), jt_pos)
+        created, jt_pos, ujs_pk = make_batch(min(jobs, batch_size), jt_pos)
         print('took {}'.format(time() - s_loop))
         i += 1
         jobs -= batch_size
-    return created
+        created_job_ids += ujs_pk
+    print('Created job IDs: {}'.format(created_job_ids))
+    # return created
+    return created_job_ids
 
 
 def generate_events(events, job, time_delta):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
-    print('removing indexes and constraints')
+
     created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
     modified_time = datetime.datetime.today() - time_delta
     created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
     modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
+
     # get all the indexes for main_jobevent
+    print(f'attaching {events} events to job {job}')
+    cores = multiprocessing.cpu_count()
+    workers = []
+
+    num_procs = min(cores, events)
+    num_events = math.ceil(events / num_procs)
+    if num_events < 1:
+        num_events = 1
+
+    for i in range(num_procs):
+        p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
+        p.daemon = True
+        workers.append(p)
+
+    for w in workers:
+        w.start()
+
+    for w in workers:
+        w.join()
+
+    workers = []
+
+    print('generating unique start/end line counts')
+    cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_seq;')
+    cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_line_seq MINVALUE 0;')
+    cursor.execute('ALTER SEQUENCE firehose_seq RESTART WITH 1;')
+    cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
+    cursor.execute("SELECT nextval('firehose_line_seq')")
+    conn.commit()
+
+    cursor.execute(
+        "UPDATE main_jobevent SET "
+        "counter=nextval('firehose_seq')::integer,"
+        "start_line=nextval('firehose_line_seq')::integer,"
+        "end_line=currval('firehose_line_seq')::integer + 2 "
+        f"WHERE job_id={job}"
+    )
+    conn.commit()
+    conn.close()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument(
+        '--jobs-per-hour', type=int,
+        help='Number of jobs to create per hour.',
+        default=1000)
+    parser.add_argument(
+        '--events-per-job', type=int, help='Number of events to create per job.',
+        default=1345)
+    parser.add_argument(
+        '--batch-size', type=int, help='Number of jobs to create in a single batch.',
+        default=100)
+    parser.add_argument(
+        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 31.',
+        default=31)
+    params = parser.parse_args()
+    jobs = params.jobs_per_hour
+    events = params.events_per_job
+    days_delta = params.days_delta
+    batch_size = params.batch_size
 
     try:
+        conn = psycopg2.connect(dsn)
+        cursor = conn.cursor()
+
+        # Drop all the indexes before generating jobs
+        print('removing indexes and constraints')
+        # get all the indexes for main_jobevent
         # disable WAL to drastically increase write speed
         # we're not doing replication, and the goal of this script is to just
         # insert data as quickly as possible without concern for the risk of
         # data loss on crash
         # see: https://www.compose.com/articles/faster-performance-with-unlogged-tables-in-postgresql/
+
         cursor.execute('ALTER TABLE main_jobevent SET UNLOGGED')
-
         cursor.execute("SELECT indexname, indexdef FROM pg_indexes WHERE tablename='main_jobevent' AND indexname != 'main_jobevent_pkey1';")
         indexes = cursor.fetchall()
-
+
         cursor.execute(
-            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';"  # noqa
-        )
+            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';")  # noqa
         constraints = cursor.fetchall()
-
+
         # drop all indexes for speed
         for indexname, indexdef in indexes:
             cursor.execute(f'DROP INDEX IF EXISTS {indexname}')
@@ -230,61 +305,34 @@ def generate_events(events, job, time_delta):
             print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
         conn.commit()
 
-        print(f'attaching {events} events to job {job}')
-        cores = multiprocessing.cpu_count()
-        workers = []
-        num_procs = min(cores, events)
-        num_events = events // num_procs
-        if num_events <= 1:
-            num_events = events
+        for i_day in range(days_delta, 0, -1):
+            for j_hour in range(24):
+                time_delta = datetime.timedelta(days=i_day, hours=j_hour, seconds=0)
+                created_job_ids = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
+                for k_id in created_job_ids:
+                    generate_events(events, str(k_id), time_delta)
+        print(datetime.datetime.utcnow().isoformat())
+        conn.close()
 
-        for i in range(num_procs):
-            p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
-            p.daemon = True
-            workers.append(p)
-
-        for w in workers:
-            w.start()
-
-        for w in workers:
-            w.join()
-
-        workers = []
-
-        print('generating unique start/end line counts')
-        cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_seq;')
-        cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_line_seq MINVALUE 0;')
-        cursor.execute('ALTER SEQUENCE firehose_seq RESTART WITH 1;')
-        cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
-        cursor.execute("SELECT nextval('firehose_line_seq')")
-        conn.commit()
-
-        cursor.execute(
-            "UPDATE main_jobevent SET "
-            "counter=nextval('firehose_seq')::integer,"
-            "start_line=nextval('firehose_line_seq')::integer,"
-            "end_line=currval('firehose_line_seq')::integer + 2 "
-            f"WHERE job_id={job}"
-        )
-        conn.commit()
     finally:
         # restore all indexes
         print(datetime.datetime.utcnow().isoformat())
         print('restoring indexes and constraints (this may take awhile)')
-
+
         workers = []
         for indexname, indexdef in indexes:
             p = multiprocessing.Process(target=cleanup, args=(indexdef,))
             p.daemon = True
             workers.append(p)
-
+
         for w in workers:
             w.start()
-
+
         for w in workers:
             w.join()
-
+
         for conname, contype, condef in constraints:
             if contype == 'c':
                 # if there are any check constraints, don't add them back
@@ -292,35 +340,6 @@ def generate_events(events, job, time_delta):
                 # worthless, because Ansible doesn't emit counters, line
                 # numbers, verbosity, etc... < 0)
                 continue
-            sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
-            cleanup(sql)
-    conn.close()
-    print(datetime.datetime.utcnow().isoformat())
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument(
-        '--jobs', type=int, help='Number of jobs to create.',
-        default=1000000)  # 1M by default
-    parser.add_argument(
-        '--events', type=int, help='Number of events to create.',
-        default=1000000000)  # 1B by default
-    parser.add_argument(
-        '--batch-size', type=int, help='Number of jobs to create in a single batch.',
-        default=1000)
-    parser.add_argument(
-        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 0.',
-        default=0)
-    parser.add_argument(
-        '--hours-delta', type=int, help='Number of hours old to create the events. Defaults to 1.',
-        default=1)
-    params = parser.parse_args()
-    jobs = params.jobs
-    time_delta = params.days_delta, params.hours_delta
-    time_delta = datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=0)
-    events = params.events
-    batch_size = params.batch_size
-    print(datetime.datetime.utcnow().isoformat())
-    created = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
-    generate_events(events, str(created.pk), time_delta)
+            sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
+            cleanup(sql)
+    print(datetime.datetime.utcnow().isoformat())
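
Notes on the new defaults: the old defaults created 1M jobs and attached all
1B events to the last job created, at a single time offset; the new loop
spreads a comparable event volume across every job in a month of history,
one (day, hour) slice at a time, and fans each job's events out over worker
processes. A rough sizing sketch (the plan() helper below is illustrative
only, not part of the patch; cores=8 is an assumed CPU count):

    import math

    def plan(days_delta=31, jobs_per_hour=1000, events_per_job=1345, cores=8):
        # mirrors the nested day/hour loops in __main__
        slices = days_delta * 24                    # 744 hourly slices
        total_jobs = slices * jobs_per_hour         # 744,000 jobs
        total_events = total_jobs * events_per_job  # 1,000,680,000 events, ~1B
        # mirrors the per-job fan-out in generate_events()
        num_procs = min(cores, events_per_job)
        num_events = math.ceil(events_per_job / num_procs)
        return total_jobs, total_events, num_procs, num_events

    print(plan())  # (744000, 1000680000, 8, 169)

Note that the fan-out can overshoot slightly: with 8 workers at 169 events
each, a job asked to carry 1345 events actually receives 8 * 169 = 1352.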
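The YieldedRows.read() change addresses the tail of each worker's quota:
the old reader only emitted fixed 1000-row chunks, so a worker asked for
fewer than 1000 events (169 with the defaults above) still produced a full
1000-row chunk. A toy model of the patched chunking (chunks() is
illustrative only, not part of the patch):

    def chunks(total, size=1000):
        # emit full `size`-row chunks, then one final partial chunk,
        # mirroring the bookkeeping in the patched read()
        out = []
        while total > 0:
            n = total if total < size else size  # the new elif branch
            out.append(n)
            total -= n
        return out

    assert chunks(1345) == [1000, 345]  # --events-per-job default
    assert chunks(169) == [169]         # per-worker quota no longer overshoots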