AC-1015 Update queue names so that job events are only received by the right save job events task. Add database retry to save job events task.

This commit is contained in:
Chris Church
2014-02-09 05:58:33 -05:00
parent 818f235f72
commit a4ae6567dc
2 changed files with 70 additions and 59 deletions

View File

@@ -6,6 +6,7 @@ import ConfigParser
import cStringIO import cStringIO
import datetime import datetime
import distutils.version import distutils.version
import functools
import json import json
import logging import logging
import os import os
@@ -93,7 +94,6 @@ class BaseTask(Task):
# Commit outstanding transaction so that we fetch the latest object # Commit outstanding transaction so that we fetch the latest object
# from the database. # from the database.
transaction.commit() transaction.commit()
save_succeeded = True
while True: while True:
try: try:
instance = self.model.objects.get(pk=pk) instance = self.model.objects.get(pk=pk)
@@ -109,15 +109,11 @@ class BaseTask(Task):
update_fields.append('failed') update_fields.append('failed')
instance.save(update_fields=update_fields) instance.save(update_fields=update_fields)
transaction.commit() transaction.commit()
save_succeeded = True break
except DatabaseError as e: except DatabaseError as e:
transaction.rollback() transaction.rollback()
logger.debug("Database error encountered, retrying in 5 seconds: " + str(e)) logger.debug("Database error encountered, retrying in 5 seconds: " + str(e))
time.sleep(5) time.sleep(5)
save_succeeded = False
finally:
if save_succeeded:
break
return instance return instance
def get_model(self, pk): def get_model(self, pk):
@@ -508,12 +504,14 @@ class RunJob(BaseTask):
Hook for actions to run after job/task has completed. Hook for actions to run after job/task has completed.
''' '''
super(RunJob, self).post_run_hook(job, **kwargs) super(RunJob, self).post_run_hook(job, **kwargs)
# Send a special message to this job's event queue to make sure the # Send a special message to this job's event queue after the job has run
# save job events task exits. # to tell the save job events task to end.
if settings.BROKER_URL.startswith('amqp://'): if settings.BROKER_URL.startswith('amqp://'):
job_events_exchange = Exchange('job_events', 'direct', durable=True) job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events', exchange=job_events_exchange, job_events_queue = Queue('job_events[%d]' % job.id,
routing_key=('job_events[%d]' % job.id)) exchange=job_events_exchange,
routing_key=('job_events[%d]' % job.id),
auto_delete=True)
with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
with conn.Producer(serializer='json') as producer: with conn.Producer(serializer='json') as producer:
msg = { msg = {
@@ -535,18 +533,9 @@ class SaveJobEvents(Task):
name = 'awx.main.tasks.save_job_events' name = 'awx.main.tasks.save_job_events'
@transaction.commit_on_success def process_job_event(self, data, message, events_received=None):
def run(self, *args, **kwargs): if events_received is None:
job_id = kwargs.get('job_id', None)
if not job_id:
return {}
job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events', exchange=job_events_exchange,
routing_key=('job_events[%d]' % job_id))
events_received = {} events_received = {}
def process_job_event(data, message):
begints = time.time() begints = time.time()
event = data.get('event', '') event = data.get('event', '')
if not event or 'job_id' not in data: if not event or 'job_id' not in data:
@@ -558,7 +547,7 @@ class SaveJobEvents(Task):
data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError): except (KeyError, ValueError):
data.pop('created', None) data.pop('created', None)
if settings.DEBUG: if 0 and settings.DEBUG:
print data print data
for key in data.keys(): for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created'): if key not in ('job_id', 'event', 'event_data', 'created'):
@@ -567,6 +556,11 @@ class SaveJobEvents(Task):
data['task'] = data.get('event_data', {}).get('task', '').strip() data['task'] = data.get('event_data', {}).get('task', '').strip()
duplicate = False duplicate = False
if event != '__complete__': if event != '__complete__':
while True:
try:
# Commit any outstanding events before saving stats.
if event == 'playbook_on_stats':
transaction.commit()
if not JobEvent.objects.filter(**data).exists(): if not JobEvent.objects.filter(**data).exists():
job_event = JobEvent(**data) job_event = JobEvent(**data)
job_event.save(post_process=True) job_event.save(post_process=True)
@@ -574,25 +568,41 @@ class SaveJobEvents(Task):
transaction.commit() transaction.commit()
else: else:
duplicate = True duplicate = True
if settings.DEBUG: if 0 and settings.DEBUG:
print 'skipping duplicate job event %r' % data print 'skipping duplicate job event %r' % data
break
except DatabaseError as e:
transaction.rollback()
logger.debug("Database error encountered, retrying in 1 second: " + str(e))
time.sleep(1)
if not duplicate: if not duplicate:
if event not in events_received: if event not in events_received:
events_received[event] = 1 events_received[event] = 1
else: else:
events_received[event] += 1 events_received[event] += 1
if settings.DEBUG: if 0 and settings.DEBUG:
print 'saved job event in %0.3fs' % (time.time() - begints) print 'saved job event in %0.3fs' % (time.time() - begints)
message.ack() message.ack()
@transaction.commit_on_success
def run(self, *args, **kwargs):
job_id = kwargs.get('job_id', None)
if not job_id:
return {}
events_received = {}
process_job_event = functools.partial(self.process_job_event,
events_received=events_received)
job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events[%d]' % job_id,
exchange=job_events_exchange,
routing_key=('job_events[%d]' % job_id),
auto_delete=True)
with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer: with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer:
while '__complete__' not in events_received: while '__complete__' not in events_received:
conn.drain_events() conn.drain_events()
try:
bound_exchange = job_events_exchange(conn).delete()
except:
pass
return { return {
'job_id': job_id, 'job_id': job_id,

View File

@@ -147,9 +147,10 @@ class CallbackModule(object):
self.job_events_exchange = Exchange('job_events', 'direct', self.job_events_exchange = Exchange('job_events', 'direct',
durable=True) durable=True)
if not hasattr(self, 'job_events_queue'): if not hasattr(self, 'job_events_queue'):
self.job_events_queue = Queue('job_events', self.job_events_queue = Queue('job_events[%d]' % self.job_id,
exchange=self.job_events_exchange, exchange=self.job_events_exchange,
routing_key=('job_events[%d]' % self.job_id)) routing_key=('job_events[%d]' % self.job_id),
auto_delete=True)
msg = { msg = {
'job_id': self.job_id, 'job_id': self.job_id,
'event': event, 'event': event,