Fix the firehose job creation script

to account for the changes introduced by the job event table partitioning work.
Jeff Bradberry
2022-02-09 09:49:17 -05:00
parent afc0732a32
commit 04568ea830
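The gist of the fix: main_jobevent is now a partitioned table (keyed on job_created, judging from the new column and the create_partition() call in the diff below), so the script has to create the partition covering the hour it backdates events into before COPYing rows, and job_created has to appear in both the generated rows and the COPY column list. A minimal sketch of that pattern, assembled from the changes below (the bulk_copy_events() wrapper and its arguments are illustrative, not part of the script; it assumes the AWX/Django environment is already set up, as the script does via prepare_env()/setup_django()):

import psycopg2
from django.utils.timezone import now

from awx.main.models import JobEvent
from awx.main.utils.common import create_partition


def bulk_copy_events(dsn, rows_file, time_delta):
    # Make sure the hourly partition for the backdated job_created timestamp
    # exists before bulk-loading; without a matching partition the COPY fails.
    start_partition = (now() - time_delta).replace(minute=0, second=0, microsecond=0)
    create_partition(JobEvent._meta.db_table, start_partition)

    conn = psycopg2.connect(dsn)
    with conn.cursor() as cursor:
        # job_created is the newly added (partition key) column in the list.
        cursor.copy_expert(
            'COPY main_jobevent('
            'created, modified, job_created, event, event_data, failed, changed, '
            'host_name, play, role, task, counter, host_id, job_id, uuid, '
            'parent_uuid, end_line, playbook, start_line, stdout, verbosity'
            ') FROM STDIN',
            rows_file,
        )
    conn.commit()
    conn.close()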


@@ -45,11 +45,7 @@ from django.db import connection
 from django.db.models.sql import InsertQuery
 from django.utils.timezone import now
 
-db = json.loads(
-    subprocess.check_output(
-        ['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']
-    )
-)
+db = json.loads(subprocess.check_output(['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']))
 name = db['DATABASES']['default']['NAME']
 user = db['DATABASES']['default']['USER']
 pw = db['DATABASES']['default']['PASSWORD']
@@ -67,38 +63,39 @@ MODULE_OPTIONS = ('yup', 'stonchronize', 'templotz', 'deboog')
 class YieldedRows(StringIO):
     def __init__(self, job_id, rows, created_stamp, modified_stamp, *args, **kwargs):
         self.rows = rows
         self.rowlist = []
         for (event, module) in itertools.product(EVENT_OPTIONS, MODULE_OPTIONS):
-            event_data_json = {
-                "task_action": module,
-                "name": "Do a {} thing".format(module),
-                "task": "Do a {} thing".format(module)
-            }
-            row = "\t".join([
-                f"{created_stamp}",
-                f"{modified_stamp}",
-                event,
-                json.dumps(event_data_json),
-                str(event in ('runner_on_failed', 'runner_on_unreachable')),
-                str(event == 'runner_on_changed'),
-                "localhost",
-                "Example Play",
-                "Hello World",
-                "",
-                "0",
-                "1",
-                job_id,
-                u,
-                "",
-                "1",
-                "hello_world.yml",
-                "0",
-                "X",
-                "1",
-            ]) + '\n'
+            event_data_json = {"task_action": module, "name": "Do a {} thing".format(module), "task": "Do a {} thing".format(module)}
+            row = (
+                "\t".join(
+                    [
+                        f"{created_stamp}",
+                        f"{modified_stamp}",
+                        f"{created_stamp}",
+                        event,
+                        json.dumps(event_data_json),
+                        str(event in ('runner_on_failed', 'runner_on_unreachable')),
+                        str(event == 'runner_on_changed'),
+                        "localhost",
+                        "Example Play",
+                        "Hello World",
+                        "",
+                        "0",
+                        "1",
+                        job_id,
+                        u,
+                        "",
+                        "1",
+                        "hello_world.yml",
+                        "0",
+                        "X",
+                        "1",
+                    ]
+                )
+                + '\n'
+            )
             self.rowlist.append(row)
 
     def read(self, x):
@@ -117,15 +114,19 @@ def firehose(job, count, created_stamp, modified_stamp):
     conn = psycopg2.connect(dsn)
     f = YieldedRows(job, count, created_stamp, modified_stamp)
     with conn.cursor() as cursor:
-        cursor.copy_expert((
-            'COPY '
-            'main_jobevent('
-            'created, modified, event, event_data, failed, changed, '
-            'host_name, play, role, task, counter, host_id, job_id, uuid, '
-            'parent_uuid, end_line, playbook, start_line, stdout, verbosity'
-            ') '
-            'FROM STDIN'
-        ), f, size=1024 * 1000)
+        cursor.copy_expert(
+            (
+                'COPY '
+                'main_jobevent('
+                'created, modified, job_created, event, event_data, failed, changed, '
+                'host_name, play, role, task, counter, host_id, job_id, uuid, '
+                'parent_uuid, end_line, playbook, start_line, stdout, verbosity'
+                ') '
+                'FROM STDIN'
+            ),
+            f,
+            size=1024 * 1000,
+        )
     conn.commit()
     conn.close()
@@ -143,10 +144,12 @@ def generate_jobs(jobs, batch_size, time_delta):
     print(f'inserting {jobs} job(s)')
     sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
     from awx import prepare_env
     prepare_env()
     setup_django()
     from awx.main.models import UnifiedJob, Job, JobTemplate
     fields = list(set(Job._meta.fields) - set(UnifiedJob._meta.fields))
     job_field_names = set([f.attname for f in fields])
     # extra unified job field names from base class
@@ -161,9 +164,7 @@ def generate_jobs(jobs, batch_size, time_delta):
             jt = JobTemplate.objects.all()[jt_pos % jt_count]
         except IndexError as e:
             # seems to happen every now and then due to some race condition
-            print('Warning: IndexError on {} JT, error: {}'.format(
-                jt_pos % jt_count, e
-            ))
+            print('Warning: IndexError on {} JT, error: {}'.format(jt_pos % jt_count, e))
         jt_pos += 1
     jt_defaults = dict(
         (f.attname, getattr(jt, f.attname))
@@ -176,22 +177,36 @@ def generate_jobs(jobs, batch_size, time_delta):
         jobs = [
             Job(
                 status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
-                started=now() - time_delta, created=now() - time_delta, modified=now() - time_delta, finished=now() - time_delta,
-                elapsed=0., **jt_defaults)
+                started=now() - time_delta,
+                created=now() - time_delta,
+                modified=now() - time_delta,
+                finished=now() - time_delta,
+                elapsed=0.0,
+                **jt_defaults,
+            )
             for i in range(N)
         ]
         ujs = UnifiedJob.objects.bulk_create(jobs)
+        for uj in ujs:
+            uj.unifiedjob_ptr_id = uj.id  # hack around the polymorphic id field not being picked up
         query = InsertQuery(Job)
         query.insert_values(fields, ujs)
         with connection.cursor() as cursor:
             query, params = query.sql_with_params()[0]
             cursor.execute(query, params)
-        return ujs[-1], jt_pos, [ujs[i].pk for i in range(len(ujs))]
+        return ujs[-1], jt_pos, [uj.pk for uj in ujs]
 
     i = 1
     jt_pos = 0
     created_job_ids = []
     s = time()
+    from awx.main.models import JobEvent
+    from awx.main.utils.common import create_partition
+    start_partition = (now() - time_delta).replace(minute=0, second=0, microsecond=0)
+    create_partition(JobEvent._meta.db_table, start_partition)
     while jobs > 0:
         s_loop = time()
         print('running batch {}, runtime {}'.format(i, time() - s))
@@ -200,21 +215,20 @@ def generate_jobs(jobs, batch_size, time_delta):
         i += 1
         jobs -= batch_size
         created_job_ids += ujs_pk
-    print('Creted Job IDS: {}'.format(created_job_ids))
-    #return created
+    print('Created Job IDS: {}'.format(created_job_ids))
+    # return created
     return created_job_ids
 
 
 def generate_events(events, job, time_delta):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
     created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
     modified_time = datetime.datetime.today() - time_delta
     created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
     modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
-    # get all the indexes for main_jobevent
     print(f'attaching {events} events to job {job}')
     cores = multiprocessing.cpu_count()
     workers = []
@@ -244,7 +258,7 @@ def generate_events(events, job, time_delta):
     cursor.execute('ALTER SEQUENCE firehose_line_seq RESTART WITH 0;')
     cursor.execute("SELECT nextval('firehose_line_seq')")
     conn.commit()
 
     cursor.execute(
         "UPDATE main_jobevent SET "
         "counter=nextval('firehose_seq')::integer,"
@@ -258,18 +272,10 @@ def generate_events(events, job, time_delta):
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-    parser.add_argument(
-        '--jobs-per-hour', type=int, help='Number of jobs to create.',
-        default=1000)  # 1M by default
-    parser.add_argument(
-        '--events-per-job', type=int, help='Number of events to create.',
-        default=1345)  # 1B by default
-    parser.add_argument(
-        '--batch-size', type=int, help='Number of jobs to create in a single batch.',
-        default=100)
-    parser.add_argument(
-        '--days-delta', type=int, help='Number of days old to create the events. Defaults to 31.',
-        default=31)
+    parser.add_argument('--jobs-per-hour', type=int, help='Number of jobs to create.', default=1000)  # 1M by default
+    parser.add_argument('--events-per-job', type=int, help='Number of events to create.', default=1345)  # 1B by default
+    parser.add_argument('--batch-size', type=int, help='Number of jobs to create in a single batch.', default=100)
+    parser.add_argument('--days-delta', type=int, help='Number of days old to create the events. Defaults to 31.', default=31)
     params = parser.parse_args()
     jobs = params.jobs_per_hour
     events = params.events_per_job
@@ -279,7 +285,7 @@ if __name__ == '__main__':
         conn = psycopg2.connect(dsn)
         cursor = conn.cursor()
-        #Drop all the indexes before generating jobs
+        # Drop all the indexes before generating jobs
         print('removing indexes and constraints')
         # get all the indexes for main_jobevent
         # disable WAL to drastically increase write speed
@@ -287,17 +293,20 @@ if __name__ == '__main__':
         # insert data as quickly as possible without concern for the risk of
         # data loss on crash
         # see: https://www.compose.com/articles/faster-performance-with-unlogged-tables-in-postgresql/
         cursor.execute('ALTER TABLE main_jobevent SET UNLOGGED')
         cursor.execute("SELECT indexname, indexdef FROM pg_indexes WHERE tablename='main_jobevent' AND indexname != 'main_jobevent_pkey1';")
         indexes = cursor.fetchall()
         cursor.execute(
-            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';")  # noqa
+            "SELECT conname, contype, pg_catalog.pg_get_constraintdef(r.oid, true) as condef FROM pg_catalog.pg_constraint r WHERE r.conrelid = 'main_jobevent'::regclass AND conname != 'main_jobevent_pkey1';"
+        )  # noqa
         constraints = cursor.fetchall()
         # drop all indexes for speed
         for indexname, indexdef in indexes:
+            if indexname == 'main_jobevent_pkey_new':  # Dropped by the constraint
+                continue
             cursor.execute(f'DROP INDEX IF EXISTS {indexname}')
             print(f'DROP INDEX IF EXISTS {indexname}')
         for conname, contype, condef in constraints:
@@ -305,8 +314,7 @@ if __name__ == '__main__':
             print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
         conn.commit()
 
-        for i_day in range(days_delta,0,-1):
+        for i_day in range(days_delta, 0, -1):
             for j_hour in range(24):
                 time_delta = datetime.timedelta(days=i_day, hours=j_hour, seconds=0)
                 created_job_ids = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
@@ -315,24 +323,25 @@ if __name__ == '__main__':
         print(datetime.datetime.utcnow().isoformat())
         conn.close()
     finally:
         # restore all indexes
         print(datetime.datetime.utcnow().isoformat())
         print('restoring indexes and constraints (this may take awhile)')
         workers = []
         for indexname, indexdef in indexes:
+            if indexname == 'main_jobevent_pkey_new':  # Created by the constraint
+                continue
             p = multiprocessing.Process(target=cleanup, args=(indexdef,))
             p.daemon = True
             workers.append(p)
         for w in workers:
             w.start()
         for w in workers:
             w.join()
         for conname, contype, condef in constraints:
             if contype == 'c':
                 # if there are any check constraints, don't add them back
@@ -340,6 +349,8 @@ if __name__ == '__main__':
                 # worthless, because Ansible doesn't emit counters, line
                 # numbers, verbosity, etc... < 0)
                 continue
             sql = f'ALTER TABLE main_jobevent ADD CONSTRAINT {conname} {condef}'
             cleanup(sql)
     print(datetime.datetime.utcnow().isoformat())