From 4d06c812e628374f9c1c0dd91e0b6a1f10d696f6 Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Tue, 18 Feb 2020 08:55:06 -0500
Subject: [PATCH] add the ability to load lots of jobs with firehose.py

$ awx-python tools/scripts/firehose.py --jobs 5000000 --events 100000000
---
 tools/scripts/firehose.py | 64 +++++++++++++++++++++++++++++++++------
 1 file changed, 55 insertions(+), 9 deletions(-)

diff --git a/tools/scripts/firehose.py b/tools/scripts/firehose.py
index 91d6569f9e..7bb818bd2a 100755
--- a/tools/scripts/firehose.py
+++ b/tools/scripts/firehose.py
@@ -28,12 +28,21 @@
 import argparse
 import datetime
 import json
 import multiprocessing
+import pkg_resources
 import subprocess
+import sys
 
 from io import StringIO
+from time import time
+from random import randint
 from uuid import uuid4
 
 import psycopg2
 
+from django import setup as setup_django
+from django.db import connection
+from django.db.models.sql import InsertQuery
+from django.utils.timezone import now
+
 db = json.loads(
     subprocess.check_output(
         ['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']
@@ -110,14 +119,39 @@ def cleanup(sql):
     conn.close()
 
 
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser()
-    parser.add_argument('job')
-    parser.add_argument('--chunk', type=int, default=1000000000) # 1B by default
-    params = parser.parse_args()
-    chunk = params.chunk
-    print(datetime.datetime.utcnow().isoformat())
+def generate_jobs(jobs):
+    print(f'inserting {jobs} job(s)')
+    sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
+    from awx import prepare_env
+    prepare_env()
+    setup_django()
+    from awx.main.models import UnifiedJob, Job, JobTemplate
+
+    def make_batch(N):
+        jt = JobTemplate.objects.first()
+        jobs = [Job(job_template=jt, status='canceled', created=now(), modified=now(), elapsed=0.) for i in range(N)]
+        ujs = UnifiedJob.objects.bulk_create(jobs)
+        query = InsertQuery(Job)
+        fields = list(set(Job._meta.fields) - set(UnifiedJob._meta.fields))
+        query.insert_values(fields, ujs)
+        with connection.cursor() as cursor:
+            query, params = query.sql_with_params()[0]
+            cursor.execute(query, params)
+        return ujs[-1]
+
+    i = 1
+    while jobs > 0:
+        s = time()
+        print('running batch {}'.format(i))
+        created = make_batch(min(jobs, 1000))
+        print('took {}'.format(time() - s))
+        i += 1
+        jobs -= 1000
+    return created
+
+
+def generate_events(events, job):
     conn = psycopg2.connect(dsn)
     cursor = conn.cursor()
 
     print('removing indexes and constraints')
@@ -146,12 +180,12 @@
         print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
     conn.commit()
 
-    print(f'inserting {chunk} events')
+    print(f'attaching {events} events to job {job}')
 
     cores = multiprocessing.cpu_count()
     workers = []
 
     for i in range(cores):
-        p = multiprocessing.Process(target=firehose, args=(params.job, chunk / cores))
+        p = multiprocessing.Process(target=firehose, args=(job, events // cores))
         p.daemon = True
         workers.append(p)
@@ -203,3 +237,15 @@
     cleanup(sql)
     conn.close()
     print(datetime.datetime.utcnow().isoformat())
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--jobs', type=int, default=1000000) # 1M by default
+    parser.add_argument('--events', type=int, default=1000000000) # 1B by default
+    params = parser.parse_args()
+    jobs = params.jobs
+    events = params.events
+    print(datetime.datetime.utcnow().isoformat())
+    created = generate_jobs(jobs)
+    generate_events(events, str(created.pk))
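
A note on the make_batch() hunk: Django's bulk_create() refuses to operate on
multi-table-inheritance children, and Job inherits from UnifiedJob, so every
job spans two tables. The patch works around this by bulk-creating the Job
instances through the parent's manager (on PostgreSQL the returned objects
come back with their primary keys assigned) and then hand-building a second
INSERT for the child table. Below is a minimal standalone sketch of the same
pattern; bulk_create_mti() and its parent_model/child_model parameters are
hypothetical stand-ins, but the calls mirror make_batch() above.

    # Sketch: bulk-insert instances of a multi-table-inheritance child model.
    # parent_model/child_model stand in for UnifiedJob/Job.
    from django.db import connection
    from django.db.models.sql import InsertQuery

    def bulk_create_mti(parent_model, child_model, instances):
        # bulk_create() through the *parent* manager sidesteps Django's
        # "can't bulk create multi-table inherited models" restriction;
        # `instances` must be child_model objects.
        objs = parent_model.objects.bulk_create(instances)
        # Build one INSERT for the child table covering only the fields the
        # child adds (including the parent-link pointer that ties each child
        # row to the parent row created above).
        query = InsertQuery(child_model)
        fields = list(set(child_model._meta.fields) - set(parent_model._meta.fields))
        query.insert_values(fields, objs)
        with connection.cursor() as cursor:
            sql, params = query.sql_with_params()[0]
            cursor.execute(sql, params)
        return objs

One caveat baked into the pattern: it relies on bulk_create() returning
instances with primary keys set, which holds on PostgreSQL but not on every
database backend.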
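
The event fan-out keeps its old shape: one worker process per CPU core, each
asked to write events // cores events via the pre-existing firehose()
function, which this patch does not touch. Floor division can shave up to
cores - 1 events off the requested total; that rounding is harmless for a
load-testing tool, but if an exact total ever mattered, the remainder could
be spread across the workers, e.g. (hypothetical variant, not what the
script does):

    # Distribute `events` across `cores` workers with no rounding loss.
    base, extra = divmod(events, cores)
    counts = [base + 1 if i < extra else base for i in range(cores)]
    assert sum(counts) == events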
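
The command-line interface changes shape as well: the old positional job ID
and --chunk flag are gone (the script now creates its own jobs), replaced by
--jobs and --events with defaults of 1M and 1B respectively. The invocation
from the commit message exercises both:

    $ awx-python tools/scripts/firehose.py --jobs 5000000 --events 100000000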