AC-990 Add new task for saving all job events from a queue, updated callback plugin to send to queue, work on optimizing SQL that runs when saving a job event.

This commit is contained in:
Chris Church
2014-02-03 17:04:56 -05:00
parent 88dc25fc15
commit 32364ad055
5 changed files with 591 additions and 49 deletions

View File

@@ -22,6 +22,9 @@ import uuid
# Pexpect
import pexpect
# Kombu
from kombu import Connection, Exchange, Queue
# Celery
from celery import Task, task
@@ -29,7 +32,9 @@ from celery import Task, task
from django.conf import settings
from django.db import transaction, DatabaseError
from django.utils.datastructures import SortedDict
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now
from django.utils.tzinfo import FixedOffset
# AWX
from awx.main.models import Job, JobEvent, ProjectUpdate, InventoryUpdate
@@ -479,24 +484,86 @@ class RunJob(BaseTask):
Hook for actions to run after job/task has completed.
'''
super(RunJob, self).post_run_hook(job, **kwargs)
# Send a special message to this job's event queue to make sure the
# save job events task exits.
if settings.BROKER_URL.startswith('amqp://'):
job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events', exchange=job_events_exchange,
routing_key=('job_events[%d]' % job.id))
with Connection(settings.BROKER_URL) as conn:
with conn.Producer(serializer='json') as producer:
msg = {
'job_id': job.id,
'event': '__complete__'
}
producer.publish(msg, exchange=job_events_exchange,
routing_key=('job_events[%d]' % job.id),
declare=[job_events_queue])
# Update job event fields after job has completed (only when using REST
# API callback).
if not settings.BROKER_URL.startswith('amqp://'):
else:
for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True)
class SaveJobEvent(Task):
class SaveJobEvents(Task):
name = 'awx.main.tasks.save_job_event'
name = 'awx.main.tasks.save_job_events'
@transaction.commit_on_success
def run(self, *args, **kwargs):
for key in kwargs.keys():
if key not in ('job_id', 'event', 'event_data'):
kwargs.pop(key)
job_event = JobEvent(**kwargs)
job_event.save(post_process=True)
job_id = kwargs.get('job_id', None)
if not job_id:
return {'job_id': job_id}
job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events', exchange=job_events_exchange,
routing_key=('job_events[%d]' % job_id))
events_received = {}
def process_job_event(data, message):
begints = time.time()
event = data.get('event', '')
if not event:
return
for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created'):
data.pop(key)
try:
if not isinstance(data['created'], datetime.datetime):
data['created'] = parse_datetime(data['created'])
if not data['created'].tzinfo:
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
data.pop('created', None)
if settings.DEBUG:
print data
if event != '__complete__':
job_event = JobEvent(**data)
job_event.save(post_process=True)
transaction.commit()
if event not in events_received:
events_received[event] = 1
else:
events_received[event] += 1
message.ack()
if settings.DEBUG:
print 'saved job event in %0.3fs' % (time.time() - begints)
with Connection(settings.BROKER_URL) as conn:
with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer:
while '__complete__' not in events_received:
conn.drain_events()
try:
bound_exchange = job_events_exchange(conn).delete()
except:
pass
return {
'job_id': job_id,
#'events_received': events_received,
'total_events': sum(events_received.values())}
class RunProjectUpdate(BaseTask):