diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index 5856908829..44afd52cfb 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -61,11 +61,11 @@ STATUS_OPTIONS = ('successful', 'failed', 'error', 'canceled')
 
 
 class YieldedRows(StringIO):
-    def __init__(self, job_id, rows, *args, **kwargs):
+    def __init__(self, job_id, rows, created_stamp, modified_stamp, *args, **kwargs):
         self.rows = rows
         self.row = "\t".join([
-            "2020-01-02 12:00:00",
-            "2020-01-02 12:00:01",
+            f"{created_stamp}",
+            f"{modified_stamp}",
             "playbook_on_start",
             "{}",
             'false',
@@ -94,9 +94,9 @@ class YieldedRows(StringIO):
         return self.row * 10000
 
 
-def firehose(job, count):
+def firehose(job, count, created_stamp, modified_stamp):
     conn = psycopg2.connect(dsn)
-    f = YieldedRows(job, count)
+    f = YieldedRows(job, count, created_stamp, modified_stamp)
     with conn.cursor() as cursor:
         cursor.copy_expert((
             'COPY '
@@ -120,7 +120,7 @@ def cleanup(sql):
     conn.close()
 
 
-def generate_jobs(jobs, batch_size):
+def generate_jobs(jobs, batch_size, time_delta):
     print(f'inserting {jobs} job(s)')
     sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
     from awx import prepare_env
@@ -157,7 +157,7 @@ def generate_jobs(jobs, batch_size):
     jobs = [
         Job(
             status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
-            started=now(), created=now(), modified=now(), finished=now(),
+            started=now() - time_delta, created=now() - time_delta, modified=now() - time_delta, finished=now() - time_delta,
             elapsed=0., **jt_defaults)
         for i in range(N)
     ]
@@ -182,11 +182,14 @@ def generate_jobs(jobs, batch_size):
     return created
 
 
-def generate_events(events, job):
+def generate_events(events, job, time_delta):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
     print('removing indexes and constraints')
-
+    created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
+    modified_time = datetime.datetime.today() - time_delta
+    created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
+    modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
     # get all the indexes for main_jobevent
     try:
         # disable WAL to drastically increase write speed
@@ -217,8 +220,13 @@ def generate_events(events, job):
 
     cores = multiprocessing.cpu_count()
     workers = []
-    for i in range(cores):
-        p = multiprocessing.Process(target=firehose, args=(job, events / cores))
+    num_procs = min(cores, events)
+    num_events = events // num_procs
+    if num_events <= 1:
+        num_events = events
+
+    for i in range(num_procs):
+        p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
         p.daemon = True
         workers.append(p)
 
@@ -287,10 +295,18 @@ if __name__ == '__main__':
     parser.add_argument(
         '--batch-size', type=int, help='Number of jobs to create in a single batch.',
        default=1000)
+    parser.add_argument(
+        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 0.',
+        default=0)
+    parser.add_argument(
+        '--hours-delta', type=int, help='Number of hours old to create the events. Defaults to 1.',
+        default=1)
     params = parser.parse_args()
     jobs = params.jobs
+    time_delta = params.days_delta, params.hours_delta
+    time_delta = datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=0)
     events = params.events
     batch_size = params.batch_size
     print(datetime.datetime.utcnow().isoformat())
-    created = generate_jobs(jobs, batch_size=batch_size)
-    generate_events(events, str(created.pk))
+    created = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
+    generate_events(events, str(created.pk), time_delta)
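
For reference, a minimal standalone sketch of the backdating logic this diff adds to generate_events(): timestamps are shifted into the past by the timedelta built from the new --days-delta/--hours-delta flags, with the created stamp set 5 seconds before the modified stamp (presumably to mimic a realistic gap between event creation and processing). The flag values below are illustrative, not taken from any particular run.

import datetime

# Backdate by time_delta, mirroring the new generate_events() logic;
# created is stamped 5 seconds earlier than modified.
time_delta = datetime.timedelta(days=0, hours=1)  # the new flags' defaults
modified_time = datetime.datetime.today() - time_delta
created_time = modified_time - datetime.timedelta(seconds=5)
created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
print(created_stamp, modified_stamp)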