Merge pull request #7177 from kdelee/firehose_event_time

Set event time and date in firehose script

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit was authored by softwarefactory-project-zuul[bot] on 2020-05-28 17:42:02 +00:00 and committed via GitHub.

View File

@@ -61,11 +61,11 @@ STATUS_OPTIONS = ('successful', 'failed', 'error', 'canceled')
class YieldedRows(StringIO): class YieldedRows(StringIO):
def __init__(self, job_id, rows, *args, **kwargs): def __init__(self, job_id, rows, created_stamp, modified_stamp, *args, **kwargs):
self.rows = rows self.rows = rows
self.row = "\t".join([ self.row = "\t".join([
"2020-01-02 12:00:00", f"{created_stamp}",
"2020-01-02 12:00:01", f"{modified_stamp}",
"playbook_on_start", "playbook_on_start",
"{}", "{}",
'false', 'false',
@@ -94,9 +94,9 @@ class YieldedRows(StringIO):
return self.row * 10000 return self.row * 10000
def firehose(job, count): def firehose(job, count, created_stamp, modified_stamp):
conn = psycopg2.connect(dsn) conn = psycopg2.connect(dsn)
f = YieldedRows(job, count) f = YieldedRows(job, count, created_stamp, modified_stamp)
with conn.cursor() as cursor: with conn.cursor() as cursor:
cursor.copy_expert(( cursor.copy_expert((
'COPY ' 'COPY '
@@ -120,7 +120,7 @@ def cleanup(sql):
conn.close() conn.close()
def generate_jobs(jobs, batch_size): def generate_jobs(jobs, batch_size, time_delta):
print(f'inserting {jobs} job(s)') print(f'inserting {jobs} job(s)')
sys.path.insert(0, pkg_resources.get_distribution('awx').module_path) sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
from awx import prepare_env from awx import prepare_env
@@ -157,7 +157,7 @@ def generate_jobs(jobs, batch_size):
jobs = [ jobs = [
Job( Job(
status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)], status=STATUS_OPTIONS[i % len(STATUS_OPTIONS)],
started=now(), created=now(), modified=now(), finished=now(), started=now() - time_delta, created=now() - time_delta, modified=now() - time_delta, finished=now() - time_delta,
elapsed=0., **jt_defaults) elapsed=0., **jt_defaults)
for i in range(N) for i in range(N)
] ]
@@ -182,11 +182,14 @@ def generate_jobs(jobs, batch_size):
return created return created
def generate_events(events, job): def generate_events(events, job, time_delta):
conn = psycopg2.connect(dsn) conn = psycopg2.connect(dsn)
cursor = conn.cursor() cursor = conn.cursor()
print('removing indexes and constraints') print('removing indexes and constraints')
created_time = datetime.datetime.today() - time_delta - datetime.timedelta(seconds=5)
modified_time = datetime.datetime.today() - time_delta
created_stamp = created_time.strftime("%Y-%m-%d %H:%M:%S")
modified_stamp = modified_time.strftime("%Y-%m-%d %H:%M:%S")
# get all the indexes for main_jobevent # get all the indexes for main_jobevent
try: try:
# disable WAL to drastically increase write speed # disable WAL to drastically increase write speed
@@ -217,8 +220,13 @@ def generate_events(events, job):
cores = multiprocessing.cpu_count() cores = multiprocessing.cpu_count()
workers = [] workers = []
for i in range(cores): num_procs = min(cores, events)
p = multiprocessing.Process(target=firehose, args=(job, events / cores)) num_events = events // num_procs
if num_events <= 1:
num_events = events
for i in range(num_procs):
p = multiprocessing.Process(target=firehose, args=(job, num_events, created_stamp, modified_stamp))
p.daemon = True p.daemon = True
workers.append(p) workers.append(p)
@@ -287,10 +295,18 @@ if __name__ == '__main__':
parser.add_argument( parser.add_argument(
'--batch-size', type=int, help='Number of jobs to create in a single batch.', '--batch-size', type=int, help='Number of jobs to create in a single batch.',
default=1000) default=1000)
parser.add_argument(
'--days-delta', type=int, help='Number of days old to create the events. Defaults to 0.',
default=0)
parser.add_argument(
'--hours-delta', type=int, help='Number of hours old to create the events. Defaults to 1.',
default=1)
params = parser.parse_args() params = parser.parse_args()
jobs = params.jobs jobs = params.jobs
time_delta = params.days_delta, params.hours_delta
time_delta = datetime.timedelta(days=time_delta[0], hours=time_delta[1], seconds=0)
events = params.events events = params.events
batch_size = params.batch_size batch_size = params.batch_size
print(datetime.datetime.utcnow().isoformat()) print(datetime.datetime.utcnow().isoformat())
created = generate_jobs(jobs, batch_size=batch_size) created = generate_jobs(jobs, batch_size=batch_size, time_delta=time_delta)
generate_events(events, str(created.pk)) generate_events(events, str(created.pk), time_delta)