Merge pull request #5973 from ryanpetrello/job-firehose

add the ability to load lots of jobs with firehose.py

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
softwarefactory-project-zuul[bot] authored 2020-02-18 16:30:12 +00:00; committed by GitHub
commit 6e2bd828a1


@@ -28,12 +28,21 @@ import argparse
import datetime
import json
import multiprocessing
import pkg_resources
import subprocess
import sys
from io import StringIO
from time import time
from random import randint
from uuid import uuid4
import psycopg2
from django import setup as setup_django
from django.db import connection
from django.db.models.sql import InsertQuery
from django.utils.timezone import now
db = json.loads(
    subprocess.check_output(
        ['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']
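For context: awx-manage print_settings emits Django's DATABASES setting as JSON, and generate_events() below connects with a dsn variable whose construction falls outside this hunk. A minimal sketch of deriving that DSN, assuming the standard default/NAME/USER/PASSWORD/HOST/PORT keys (the real script may differ):

# Hedged sketch, not part of the diff: build a libpq DSN from the
# DATABASES setting loaded above; key names assume Django's defaults.
default = db['DATABASES']['default']
dsn = 'dbname={NAME} user={USER} password={PASSWORD} host={HOST} port={PORT}'.format(**default)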
@@ -110,14 +119,49 @@ def cleanup(sql):
    conn.close()

-if __name__ == '__main__':
-    parser = argparse.ArgumentParser()
-    parser.add_argument('job')
-    parser.add_argument('--chunk', type=int, default=1000000000)  # 1B by default
-    params = parser.parse_args()
-    chunk = params.chunk
-    print(datetime.datetime.utcnow().isoformat())
def generate_jobs(jobs):
    print(f'inserting {jobs} job(s)')
    sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
    from awx import prepare_env
    prepare_env()
    setup_django()

    from awx.main.models import UnifiedJob, Job, JobTemplate
    fields = list(set(Job._meta.fields) - set(UnifiedJob._meta.fields))
    job_field_names = set([f.attname for f in fields])
    jt = JobTemplate.objects.first()
    jt_defaults = dict(
        (f.attname, getattr(jt, f.attname))
        for f in JobTemplate._meta.get_fields()
        if f.editable and f.attname in job_field_names and getattr(jt, f.attname)
    )
    jt_defaults['job_template_id'] = jt.pk
    def make_batch(N, **extra):
        jobs = [
            Job(status='canceled', created=now(), modified=now(), elapsed=0., **extra)
            for i in range(N)
        ]
        ujs = UnifiedJob.objects.bulk_create(jobs)
        query = InsertQuery(Job)
        query.insert_values(fields, ujs)
        with connection.cursor() as cursor:
            query, params = query.sql_with_params()[0]
            cursor.execute(query, params)
        return ujs[-1]
    i = 1
    start = time()
    while jobs > 0:
        s = time()
        print('running batch {}, runtime {}'.format(i, time() - start))
        created = make_batch(min(jobs, 1000), **jt_defaults)
        print('took {}'.format(time() - s))
        i += 1
        jobs -= 1000
    return created
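Worth noting for reviewers: bulk_create() cannot be used directly on a multi-table-inheritance child like Job, which is why make_batch() bulk-creates the UnifiedJob parent rows first and then writes the Job-only columns with a hand-built InsertQuery. A self-contained sketch of the same pattern, with hypothetical Parent/Child models standing in for UnifiedJob/Job:

# Hedged sketch of the make_batch() insert pattern; Parent and Child
# stand for any multi-table-inheritance pair, not part of this diff.
from django.db import connection
from django.db.models.sql import InsertQuery

def bulk_create_children(child_model, parent_model, instances):
    # Columns that live on the child table but not on the parent.
    fields = list(set(child_model._meta.fields) - set(parent_model._meta.fields))
    # bulk_create() against the parent inserts the base rows and assigns pks.
    rows = parent_model.objects.bulk_create(instances)
    # Insert the child-only columns for those same pks in a single statement.
    query = InsertQuery(child_model)
    query.insert_values(fields, rows)
    sql, params = query.sql_with_params()[0]
    with connection.cursor() as cursor:
        cursor.execute(sql, params)
    return rows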
def generate_events(events, job):
    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()
    print('removing indexes and constraints')
@@ -146,12 +190,12 @@ if __name__ == '__main__':
        print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
    conn.commit()
-    print(f'inserting {chunk} events')
+    print(f'attaching {events} events to job {job}')
    cores = multiprocessing.cpu_count()
    workers = []
    for i in range(cores):
-        p = multiprocessing.Process(target=firehose, args=(params.job, chunk / cores))
+        p = multiprocessing.Process(target=firehose, args=(job, events / cores))
        p.daemon = True
        workers.append(p)
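The workers above delegate to the pre-existing firehose() writer, which is unchanged in this diff; judging by the StringIO import, it streams events into Postgres with COPY rather than per-row INSERTs. A hedged, self-contained sketch of that technique, with an illustrative table name:

# Hedged sketch of COPY-based bulk loading with psycopg2; the
# demo_events table and its columns are illustrative only.
import psycopg2
from io import StringIO

def copy_rows(dsn, rows):
    # Render rows as tab-separated text, COPY's default format.
    buf = StringIO(''.join(f'{job_id}\t{counter}\n' for job_id, counter in rows))
    conn = psycopg2.connect(dsn)
    with conn.cursor() as cursor:
        cursor.copy_expert('COPY demo_events (job_id, counter) FROM STDIN', buf)
    conn.commit()
    conn.close()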
@@ -203,3 +247,15 @@ if __name__ == '__main__':
    cleanup(sql)
    conn.close()
    print(datetime.datetime.utcnow().isoformat())

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--jobs', type=int, default=1000000)  # 1M by default
    parser.add_argument('--events', type=int, default=1000000000)  # 1B by default
    params = parser.parse_args()
    jobs = params.jobs
    events = params.events
    print(datetime.datetime.utcnow().isoformat())
    created = generate_jobs(jobs)
    generate_events(events, str(created.pk))
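With the new entry point, a smaller smoke-test run might look like the following (script path assumed from the PR title); note that generate_jobs() creates the canceled jobs in batches of 1000 and all events attach to the last job created:

# Assumed invocation; only --jobs and --events are defined above.
#   python firehose.py --jobs 10000 --events 1000000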