Mirror of https://github.com/ansible/awx.git, synced 2026-03-09 05:29:26 -02:30
tools/scripts/firehose.py: capture all job IDs, attach events to every created job, and distribute the events over time
@@ -24,6 +24,7 @@
 # is something like m5.4xlarge w/ 1TB provisioned IOPS SSD (io1)
 #

+import math
 import argparse
 import datetime
 import itertools
@@ -104,6 +105,10 @@ class YieldedRows(StringIO):
         if self.rows <= 0:
             self.close()
             return ''
+        elif self.rows >= 1 and self.rows < 1000:
+            event_rows = self.rowlist[random.randrange(len(self.rowlist))] * self.rows
+            self.rows -= self.rows
+            return event_rows
         self.rows -= 1000
         return self.rowlist[random.randrange(len(self.rowlist))] * 1000

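The new branch closes a gap in YieldedRows.read(): previously the reader only emitted blocks of 1,000 pre-rendered rows, so a requested total that was not a multiple of 1,000 left a remainder unwritten. A minimal sketch of the same short-read logic, assuming rowlist holds pre-rendered row strings as in the real class (YieldedRowsSketch and its arguments are illustrative, not the committed code):

# Sketch (not the committed code): how the new read() branch drains a
# remainder of fewer than 1000 rows. `rowlist` stands in for the
# pre-rendered event rows the real class builds in __init__.
import random

class YieldedRowsSketch:
    def __init__(self, rows, rowlist):
        self.rows = rows          # total rows left to emit
        self.rowlist = rowlist    # each entry is one pre-rendered row string

    def read(self, size=-1):
        if self.rows <= 0:
            return ''             # exhausted: signals EOF to COPY ... FROM
        if self.rows < 1000:
            # final partial batch: emit exactly the remainder, then stop
            chunk = random.choice(self.rowlist) * self.rows
            self.rows = 0
            return chunk
        self.rows -= 1000
        return random.choice(self.rowlist) * 1000

rows = YieldedRowsSketch(2500, ['a\tb\tc\n'])
total = 0
while True:
    chunk = rows.read()
    if not chunk:
        break
    total += chunk.count('\n')
assert total == 2500  # every requested row is emitted, including the last 500

In the diff itself, `self.rows -= self.rows` is just a roundabout way of setting the remaining count to zero, as the sketch's `self.rows = 0` makes explicit.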
@@ -181,44 +186,114 @@ def generate_jobs(jobs, batch_size, time_delta):
         with connection.cursor() as cursor:
             query, params = query.sql_with_params()[0]
             cursor.execute(query, params)
-        return ujs[-1], jt_pos
+        return ujs[-1], jt_pos, [ujs[i].pk for i in range(len(ujs))]

     i = 1
     jt_pos = 0
+    created_job_ids = []
     s = time()
     while jobs > 0:
         s_loop = time()
         print('running batch {}, runtime {}'.format(i, time() - s))
-        created, jt_pos = make_batch(min(jobs, batch_size), jt_pos)
+        created, jt_pos, ujs_pk = make_batch(min(jobs, batch_size), jt_pos)
         print('took {}'.format(time() - s_loop))
         i += 1
         jobs -= batch_size
-    return created
+        created_job_ids += ujs_pk
+    print('Created job IDs: {}'.format(created_job_ids))
+    # return created
+    return created_job_ids


 def generate_events(events, job, time_delta):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
-    print('removing indexes and constraints')
     created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
     modified_time = datetime.datetime.today() - time_delta
     created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
     modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")

     # get all the indexes for main_jobevent
+    print(f'attaching {events} events to job {job}')
+    cores = multiprocessing.cpu_count()
+    workers = []
+
+    num_procs = min(cores, events)
+    num_events = math.ceil(events / num_procs)
+    if num_events <= 1:
+        num_events = events
+
+    for i in range(num_procs):
+        p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
+        p.daemon = True
+        workers.append(p)
+
+    for w in workers:
+        w.start()
+
+    for w in workers:
+        w.join()
+
+    workers = []
+
+    print('generating unique start/end line counts')
+    cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_seq;')
+    cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_line_seq MINVALUE 0;')
+    cursor.execute('ALTER SEQUENCE firehose_seq RESTART WITH 1;')
+    cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
+    cursor.execute("SELECT nextval('firehose_line_seq')")
+    conn.commit()
+
+    cursor.execute(
+        "UPDATE main_jobevent SET "
+        "counter=nextval('firehose_seq')::integer,"
+        "start_line=nextval('firehose_line_seq')::integer,"
+        "end_line=currval('firehose_line_seq')::integer + 2 "
+        f"WHERE job_id={job}"
+    )
+    conn.commit()
+    conn.close()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument(
+        '--jobs-per-hour', type=int, help='Number of jobs to create per hour.',
+        default=1000)  # ~1M jobs total at the default 31 days (1000/hour * 24 * 31)
+    parser.add_argument(
+        '--events-per-job', type=int, help='Number of events to create per job.',
+        default=1345)  # ~1B events total (744,000 jobs * 1345)
+    parser.add_argument(
+        '--batch-size', type=int, help='Number of jobs to create in a single batch.',
+        default=100)
+    parser.add_argument(
+        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 31.',
+        default=31)
+    params = parser.parse_args()
+    jobs = params.jobs_per_hour
+    events = params.events_per_job
+    days_delta = params.days_delta
+    batch_size = params.batch_size
     try:
+        conn = psycopg2.connect(dsn)
+        cursor = conn.cursor()
+
+        # Drop all the indexes before generating jobs
+        print('removing indexes and constraints')
+        # get all the indexes for main_jobevent
         # disable WAL to drastically increase write speed
         # we're not doing replication, and the goal of this script is to just
         # insert data as quickly as possible without concern for the risk of
         # data loss on crash
         # see: https://www.compose.com/articles/faster-performance-with-unlogged-tables-in-postgresql/
-        cursor.execute('ALTER TABLE main_jobevent SET UNLOGGED')
-
+        cursor.execute('ALTER TABLE main_jobevent SET UNLOGGED')
         cursor.execute("SELECT indexname, indexdef FROM pg_indexes WHERE tablename='main_jobevent' AND indexname != 'main_jobevent_pkey1';")
         indexes = cursor.fetchall()

         cursor.execute(
-            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';"  # noqa
-        )
+            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';")  # noqa
         constraints = cursor.fetchall()

         # drop all indexes for speed
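Two things worth noting in the hunk above. make_batch's new third return value, [ujs[i].pk for i in range(len(ujs))], collects the primary key of every job in the batch (a plain [uj.pk for uj in ujs] would be equivalent); that is what lets the caller attach events to all created jobs rather than only the last one. And generate_events now fans event insertion out across one process per core, with math.ceil rounding the per-worker share up. A sketch of that split using the diff's own names (split_events itself is an illustrative helper, not in the script):

# Sketch of the worker split used above; `firehose` is the existing
# per-process writer. math.ceil rounds up, so the workers can collectively
# insert slightly more than `events` when it doesn't divide evenly --
# acceptable for a load-generation script.
import math

def split_events(events, cores):
    num_procs = min(cores, events)
    num_events = math.ceil(events / num_procs)  # events per worker, rounded up
    if num_events <= 1:
        num_events = events
    return num_procs, num_events

assert split_events(1345, 8) == (8, 169)  # 8 * 169 = 1352, a slight overshoot
# quirk inherited from the earlier integer-division version: when the
# per-worker share rounds to 1, every worker writes *all* the events
assert split_events(3, 8) == (3, 3)       # 3 workers * 3 events = 9, not 3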
@@ -230,44 +305,17 @@ def generate_events(events, job, time_delta):
             print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
         conn.commit()

-        print(f'attaching {events} events to job {job}')
-        cores = multiprocessing.cpu_count()
-        workers = []
-
-        num_procs = min(cores, events)
-        num_events = events // num_procs
-        if num_events <= 1:
-            num_events = events
-
-        for i in range(num_procs):
-            p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
-            p.daemon = True
-            workers.append(p)
-
-        for w in workers:
-            w.start()
-
-        for w in workers:
-            w.join()
-
-        workers = []
-
-        print('generating unique start/end line counts')
-        cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_seq;')
-        cursor.execute('CREATE SEQUENCE IF NOT EXISTS firehose_line_seq MINVALUE 0;')
-        cursor.execute('ALTER SEQUENCE firehose_seq RESTART WITH 1;')
-        cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
-        cursor.execute("SELECT nextval('firehose_line_seq')")
-        conn.commit()
-
-        cursor.execute(
-            "UPDATE main_jobevent SET "
-            "counter=nextval('firehose_seq')::integer,"
-            "start_line=nextval('firehose_line_seq')::integer,"
-            "end_line=currval('firehose_line_seq')::integer + 2 "
-            f"WHERE job_id={job}"
-        )
-        conn.commit()
+        for i_day in range(days_delta, 0, -1):
+            for j_hour in range(24):
+                time_delta = datetime.timedelta(days=i_day, hours=j_hour, seconds=0)
+                created_job_ids = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
+                for k_id in created_job_ids:
+                    generate_events(events, str(k_id), time_delta)
+        print(datetime.datetime.utcnow().isoformat())
+        conn.close()
     finally:
         # restore all indexes
         print(datetime.datetime.utcnow().isoformat())
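For reference, the sequence trick that the new loop drives once per job (the removed block above had run it once, for a single job): firehose_seq renumbers counter from 1, while firehose_line_seq is primed with one nextval call and then advanced once per updated row, so start_line grows by one per event and end_line trails it by two. A simulation of those semantics in plain Python (the Seq class merely mimics nextval/currval; the values assume PostgreSQL evaluates the UPDATE's SET expressions left to right per row, as it does in practice, and are not output from a real database):

# Simulate what the sequence-driven UPDATE assigns to the first few events.
class Seq:
    def __init__(self, start):
        self.val = start - 1
    def nextval(self):          # like PostgreSQL nextval(): advance and return
        self.val += 1
        return self.val
    def currval(self):          # like currval(): last value returned
        return self.val

firehose_seq = Seq(1)           # RESTART WITH 1
firehose_line_seq = Seq(0)      # MINVALUE 0, RESTART WITH 0
firehose_line_seq.nextval()     # the priming SELECT nextval(...) in the diff

rows = []
for _ in range(3):
    counter = firehose_seq.nextval()
    start_line = firehose_line_seq.nextval()
    end_line = firehose_line_seq.currval() + 2
    rows.append((counter, start_line, end_line))

# counter is unique and increasing; the two-line windows overlap their
# neighbours, which is fine for synthetic data that only needs to look real.
assert rows == [(1, 1, 3), (2, 2, 4), (3, 3, 5)]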
@@ -292,35 +340,6 @@ def generate_events(events, job, time_delta):
                 # worthless, because Ansible doesn't emit counters, line
                 # numbers, verbosity, etc... < 0)
                 continue
             sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
             cleanup(sql)
-        conn.close()
         print(datetime.datetime.utcnow().isoformat())
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument(
-        '--jobs', type=int, help='Number of jobs to create.',
-        default=1000000)  # 1M by default
-    parser.add_argument(
-        '--events', type=int, help='Number of events to create.',
-        default=1000000000)  # 1B by default
-    parser.add_argument(
-        '--batch-size', type=int, help='Number of jobs to create in a single batch.',
-        default=1000)
-    parser.add_argument(
-        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 0.',
-        default=0)
-    parser.add_argument(
-        '--hours-delta', type=int, help='Number of hours old to create the events. Defaults to 1.',
-        default=1)
-    params = parser.parse_args()
-    jobs = params.jobs
-    time_delta = params.days_delta, params.hours_delta
-    time_delta = datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=0)
-    events = params.events
-    batch_size = params.batch_size
-    print(datetime.datetime.utcnow().isoformat())
-    created = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
-    generate_events(events, str(created.pk), time_delta)
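The net effect of the rewritten __main__ block: instead of creating one batch of jobs at a single time_delta and attaching all events to the last job created, the script now sweeps backwards through days_delta days, hour by hour, creating jobs-per-hour jobs at each step and attaching events to every returned job ID. A quick check of the default volumes (back-of-the-envelope arithmetic, not script output):

# With the new defaults, the day-by-hour sweep lands close to the old
# --jobs/--events targets of 1M jobs and 1B events.
days_delta, jobs_per_hour, events_per_job = 31, 1000, 1345

total_jobs = days_delta * 24 * jobs_per_hour
total_events = total_jobs * events_per_job

assert total_jobs == 744_000            # ~1M jobs
assert total_events == 1_000_680_000    # ~1B events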