From 00060c95723afde46baeaa4a99f916cbfe9f91a3 Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Wed, 27 May 2020 17:47:38 -0400
Subject: [PATCH 1/6] make time delta for firehose events configurable

This was hardcoded to a date back in January, which makes it of limited
use. For testing automation analytics behavior when there are many
events, we need the events to fall sometime in the past two months.

Will take another pass to make the hours delta configurable.
---
 tools/scripts/firehose.py | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index 5856908829..722e8505c9 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -61,11 +61,15 @@ STATUS_OPTIONS = ('successful', 'failed', 'error', 'canceled')
 
 class YieldedRows(StringIO):
 
-    def __init__(self, job_id, rows, *args, **kwargs):
+    def __init__(self, job_id, rows, time_delta, *args, **kwargs):
         self.rows = rows
+        created_time = datetime.datetime.today() - datetime.timedelta(days=time_delta, hours=1, seconds=5)
+        modified_time = datetime.datetime.today() - datetime.timedelta(days=time_delta, hours=1, seconds=0)
+        created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
+        modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
         self.row = "\t".join([
-            "2020-01-02 12:00:00",
-            "2020-01-02 12:00:01",
+            f"{created_stamp}",
+            f"{modified_stamp}",
             "playbook_on_start",
             "{}",
             'false',
@@ -94,9 +98,9 @@ class YieldedRows(StringIO):
         return self.row * 10000
 
 
-def firehose(job, count):
+def firehose(job, count, time_delta):
     conn = psycopg2.connect(dsn)
-    f = YieldedRows(job, count)
+    f = YieldedRows(job, count, time_delta)
     with conn.cursor() as cursor:
         cursor.copy_expert((
             'COPY '
@@ -182,7 +186,7 @@ def generate_jobs(jobs, batch_size):
     return created
 
 
-def generate_events(events, job):
+def generate_events(events, job, time_delta):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
     print('removing indexes and constraints')
@@ -218,7 +222,7 @@ def generate_events(events, job):
 
     workers = []
     for i in range(cores):
-        p = multiprocessing.Process(target=firehose, args=(job, events / cores))
+        p = multiprocessing.Process(target=firehose, args=(job, events / cores, time_delta))
         p.daemon = True
         workers.append(p)
 
@@ -287,10 +291,14 @@ if __name__ == '__main__':
     parser.add_argument(
         '--batch-size', type=int, help='Number of jobs to create in a single batch.',
         default=1000)
+    parser.add_argument(
+        '--time-delta', type=int, help='Number of days old to create the events. Defaults to 0.',
+        default=0)
     params = parser.parse_args()
     jobs = params.jobs
+    time_delta = params.time_delta
     events = params.events
     batch_size = params.batch_size
     print(datetime.datetime.utcnow().isoformat())
     created = generate_jobs(jobs, batch_size=batch_size)
-    generate_events(events, str(created.pk))
+    generate_events(events, str(created.pk), time_delta)

From 18357877725f3f733e817fa81ca0ca4e87559c6a Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Wed, 27 May 2020 17:56:07 -0400
Subject: [PATCH 2/6] add option to set hour delta on firehose

---
 tools/scripts/firehose.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index 722e8505c9..b99158c4e2 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -63,8 +63,8 @@ class YieldedRows(StringIO):
 
     def __init__(self, job_id, rows, time_delta, *args, **kwargs):
         self.rows = rows
-        created_time = datetime.datetime.today() - datetime.timedelta(days=time_delta, hours=1, seconds=5)
-        modified_time = datetime.datetime.today() - datetime.timedelta(days=time_delta, hours=1, seconds=0)
+        created_time = datetime.datetime.today() - datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=5)
+        modified_time = datetime.datetime.today() - datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=0)
         created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
         modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
         self.row = "\t".join([
@@ -292,11 +292,14 @@ if __name__ == '__main__':
         '--batch-size', type=int, help='Number of jobs to create in a single batch.',
         default=1000)
     parser.add_argument(
-        '--time-delta', type=int, help='Number of days old to create the events. Defaults to 0.',
+        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 0.',
         default=0)
+    parser.add_argument(
+        '--hours-delta', type=int, help='Number of hours old to create the events. Defaults to 1.',
+        default=1)
     params = parser.parse_args()
     jobs = params.jobs
-    time_delta = params.time_delta
+    time_delta = params.days_delta, params.hours_delta
     events = params.events
     batch_size = params.batch_size
     print(datetime.datetime.utcnow().isoformat())

From a599afa81cd39012e5ada14edc7a3b41026326dc Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Wed, 27 May 2020 18:29:11 -0400
Subject: [PATCH 3/6] Also apply time delta to job

This way the job and its events all appear to have happened on the same
day.
---
 tools/scripts/firehose.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index b99158c4e2..42bdcd9a87 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -63,8 +63,8 @@ class YieldedRows(StringIO):
 
     def __init__(self, job_id, rows, time_delta, *args, **kwargs):
         self.rows = rows
-        created_time = datetime.datetime.today() - datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=5)
-        modified_time = datetime.datetime.today() - datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=0)
+        created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
+        modified_time = datetime.datetime.today() - time_delta
         created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
         modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
         self.row = "\t".join([
@@ -124,7 +124,7 @@ def cleanup(sql):
     conn.close()
 
 
-def generate_jobs(jobs, batch_size):
+def generate_jobs(jobs, batch_size, time_delta):
     print(f'inserting {jobs} job(s)')
     sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
     from awx import prepare_env
@@ -161,7 +161,7 @@ def generate_jobs(jobs, batch_size):
         jobs = [
             Job(
                 status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
-                started=now(), created=now(), modified=now(), finished=now(),
+                started=now()-time_delta, created=now()-time_delta, modified=now()-time_delta, finished=now()-time_delta,
                 elapsed=0., **jt_defaults)
             for i in range(N)
         ]
@@ -300,8 +300,9 @@ if __name__ == '__main__':
     params = parser.parse_args()
     jobs = params.jobs
     time_delta = params.days_delta, params.hours_delta
+    time_delta = datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=0)
    events = params.events
     batch_size = params.batch_size
     print(datetime.datetime.utcnow().isoformat())
-    created = generate_jobs(jobs, batch_size=batch_size)
+    created = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
     generate_events(events, str(created.pk), time_delta)

From 5d5edf6535fc75da7d905adc4ae9c1e2d782ce6f Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Wed, 27 May 2020 18:43:26 -0400
Subject: [PATCH 4/6] create timestamp outside loop

Creating the timestamp inside the loop may be expensive, so do it once
up front.
---
 tools/scripts/firehose.py | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index 42bdcd9a87..fe154e660f 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -58,15 +58,10 @@ u = str(uuid4())
 
 STATUS_OPTIONS = ('successful', 'failed', 'error', 'canceled')
 
-
 class YieldedRows(StringIO):
 
-    def __init__(self, job_id, rows, time_delta, *args, **kwargs):
+    def __init__(self, job_id, rows, created_stamp, modified_stamp, *args, **kwargs):
         self.rows = rows
-        created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
-        modified_time = datetime.datetime.today() - time_delta
-        created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
-        modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
         self.row = "\t".join([
             f"{created_stamp}",
             f"{modified_stamp}",
@@ -98,9 +93,9 @@ class YieldedRows(StringIO):
         return self.row * 10000
 
 
-def firehose(job, count, time_delta):
+def firehose(job, count, created_stamp, modified_stamp):
     conn = psycopg2.connect(dsn)
-    f = YieldedRows(job, count, time_delta)
+    f = YieldedRows(job, count, created_stamp, modified_stamp)
     with conn.cursor() as cursor:
         cursor.copy_expert((
             'COPY '
@@ -190,7 +185,10 @@ def generate_events(events, job, time_delta):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
     print('removing indexes and constraints')
-
+    created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
+    modified_time = datetime.datetime.today() - time_delta
+    created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
+    modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
     # get all the indexes for main_jobevent
     try:
         # disable WAL to drastically increase write speed
@@ -222,7 +220,7 @@ def generate_events(events, job, time_delta):
 
     workers = []
     for i in range(cores):
-        p = multiprocessing.Process(target=firehose, args=(job, events / cores, time_delta))
+        p = multiprocessing.Process(target=firehose, args=(job, events / cores, created_stamp, modified_stamp))
         p.daemon = True
         workers.append(p)
 

From 90ca2fd59bb3db0ac0bdc8976d97c4c1d068bfac Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Wed, 27 May 2020 19:34:12 -0400
Subject: [PATCH 5/6] be a bit more respectful of event request

The minimum number of events created is now 10,000; it used to be
10,000 * the number of cores. Could dig deeper, but this is better for
debugging changes to the script.
---
 tools/scripts/firehose.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index fe154e660f..7e2410daa4 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -219,8 +219,13 @@ def generate_events(events, job, time_delta):
     cores = multiprocessing.cpu_count()
 
     workers = []
-    for i in range(cores):
-        p = multiprocessing.Process(target=firehose, args=(job, events / cores, created_stamp, modified_stamp))
+    num_procs = min(cores, events)
+    num_events = events // num_procs
+    if num_events <= 1:
+        num_events = events
+
+    for i in range(num_procs):
+        p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
         p.daemon = True
         workers.append(p)
 

From efa5a95cf1997d2477b32c717286220568b69c24 Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Thu, 28 May 2020 13:00:37 -0400
Subject: [PATCH 6/6] fix flake8

---
 tools/scripts/firehose.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index 7e2410daa4..44afd52cfb 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -58,6 +58,7 @@ u = str(uuid4())
 
 STATUS_OPTIONS = ('successful', 'failed', 'error', 'canceled')
 
+
 class YieldedRows(StringIO):
 
     def __init__(self, job_id, rows, created_stamp, modified_stamp, *args, **kwargs):
@@ -156,7 +157,7 @@ def generate_jobs(jobs, batch_size, time_delta):
         jobs = [
             Job(
                 status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
-                started=now()-time_delta, created=now()-time_delta, modified=now()-time_delta, finished=now()-time_delta,
+                started=now() - time_delta, created=now() - time_delta, modified=now() - time_delta, finished=now() - time_delta,
                 elapsed=0., **jt_defaults)
             for i in range(N)
         ]