From a4ae6567dc410483c7e130df13ba912af04cec91 Mon Sep 17 00:00:00 2001
From: Chris Church
Date: Sun, 9 Feb 2014 05:58:33 -0500
Subject: [PATCH] AC-1015 Update queue names so that job events are only received by the right save job events task. Add database retry to save job events task.

---
 awx/main/tasks.py                          | 124 +++++++++++----------
 awx/plugins/callback/job_event_callback.py |   5 +-
 2 files changed, 70 insertions(+), 59 deletions(-)

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index d198a406e0..08df875763 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -6,6 +6,7 @@ import ConfigParser
 import cStringIO
 import datetime
 import distutils.version
+import functools
 import json
 import logging
 import os
@@ -93,7 +94,6 @@ class BaseTask(Task):
         # Commit outstanding transaction so that we fetch the latest object
         # from the database.
         transaction.commit()
-        save_succeeded = True
         while True:
             try:
                 instance = self.model.objects.get(pk=pk)
@@ -109,15 +109,11 @@ class BaseTask(Task):
                     update_fields.append('failed')
                 instance.save(update_fields=update_fields)
                 transaction.commit()
-                save_succeeded = True
+                break
             except DatabaseError as e:
                 transaction.rollback()
                 logger.debug("Database error encountered, retrying in 5 seconds: " + str(e))
                 time.sleep(5)
-                save_succeeded = False
-            finally:
-                if save_succeeded:
-                    break
         return instance
 
     def get_model(self, pk):
@@ -508,12 +504,14 @@ class RunJob(BaseTask):
         Hook for actions to run after job/task has completed.
         '''
         super(RunJob, self).post_run_hook(job, **kwargs)
-        # Send a special message to this job's event queue to make sure the
-        # save job events task exits.
+        # Send a special message to this job's event queue after the job has run
+        # to tell the save job events task to end.
         if settings.BROKER_URL.startswith('amqp://'):
             job_events_exchange = Exchange('job_events', 'direct', durable=True)
-            job_events_queue = Queue('job_events', exchange=job_events_exchange,
-                                     routing_key=('job_events[%d]' % job.id))
+            job_events_queue = Queue('job_events[%d]' % job.id,
+                                     exchange=job_events_exchange,
+                                     routing_key=('job_events[%d]' % job.id),
+                                     auto_delete=True)
             with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
                 with conn.Producer(serializer='json') as producer:
                     msg = {
@@ -535,64 +533,76 @@ class SaveJobEvents(Task):
 
     name = 'awx.main.tasks.save_job_events'
 
+    def process_job_event(self, data, message, events_received=None):
+        if events_received is None:
+            events_received = {}
+        begints = time.time()
+        event = data.get('event', '')
+        if not event or 'job_id' not in data:
+            return
+        try:
+            if not isinstance(data['created'], datetime.datetime):
+                data['created'] = parse_datetime(data['created'])
+            if not data['created'].tzinfo:
+                data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
+        except (KeyError, ValueError):
+            data.pop('created', None)
+        if 0 and settings.DEBUG:
+            print data
+        for key in data.keys():
+            if key not in ('job_id', 'event', 'event_data', 'created'):
+                data.pop(key)
+        data['play'] = data.get('event_data', {}).get('play', '').strip()
+        data['task'] = data.get('event_data', {}).get('task', '').strip()
+        duplicate = False
+        if event != '__complete__':
+            while True:
+                try:
+                    # Commit any outstanding events before saving stats.
+                    if event == 'playbook_on_stats':
+                        transaction.commit()
+                    if not JobEvent.objects.filter(**data).exists():
+                        job_event = JobEvent(**data)
+                        job_event.save(post_process=True)
+                        if not event.startswith('runner_'):
+                            transaction.commit()
+                    else:
+                        duplicate = True
+                        if 0 and settings.DEBUG:
+                            print 'skipping duplicate job event %r' % data
+                    break
+                except DatabaseError as e:
+                    transaction.rollback()
+                    logger.debug("Database error encountered, retrying in 1 second: " + str(e))
+                    time.sleep(1)
+        if not duplicate:
+            if event not in events_received:
+                events_received[event] = 1
+            else:
+                events_received[event] += 1
+            if 0 and settings.DEBUG:
+                print 'saved job event in %0.3fs' % (time.time() - begints)
+        message.ack()
+
     @transaction.commit_on_success
     def run(self, *args, **kwargs):
         job_id = kwargs.get('job_id', None)
         if not job_id:
             return {}
 
-        job_events_exchange = Exchange('job_events', 'direct', durable=True)
-        job_events_queue = Queue('job_events', exchange=job_events_exchange,
-                                 routing_key=('job_events[%d]' % job_id))
-
         events_received = {}
-        def process_job_event(data, message):
-            begints = time.time()
-            event = data.get('event', '')
-            if not event or 'job_id' not in data:
-                return
-            try:
-                if not isinstance(data['created'], datetime.datetime):
-                    data['created'] = parse_datetime(data['created'])
-                if not data['created'].tzinfo:
-                    data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
-            except (KeyError, ValueError):
-                data.pop('created', None)
-            if settings.DEBUG:
-                print data
-            for key in data.keys():
-                if key not in ('job_id', 'event', 'event_data', 'created'):
-                    data.pop(key)
-            data['play'] = data.get('event_data', {}).get('play', '').strip()
-            data['task'] = data.get('event_data', {}).get('task', '').strip()
-            duplicate = False
-            if event != '__complete__':
-                if not JobEvent.objects.filter(**data).exists():
-                    job_event = JobEvent(**data)
-                    job_event.save(post_process=True)
-                    if not event.startswith('runner_'):
-                        transaction.commit()
-                else:
-                    duplicate = True
-                    if settings.DEBUG:
-                        print 'skipping duplicate job event %r' % data
-            if not duplicate:
-                if event not in events_received:
-                    events_received[event] = 1
-                else:
-                    events_received[event] += 1
-                if settings.DEBUG:
-                    print 'saved job event in %0.3fs' % (time.time() - begints)
-            message.ack()
-
+        process_job_event = functools.partial(self.process_job_event,
+                                              events_received=events_received)
+
+        job_events_exchange = Exchange('job_events', 'direct', durable=True)
+        job_events_queue = Queue('job_events[%d]' % job_id,
+                                 exchange=job_events_exchange,
+                                 routing_key=('job_events[%d]' % job_id),
+                                 auto_delete=True)
         with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
             with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer:
                 while '__complete__' not in events_received:
                     conn.drain_events()
-            try:
-                bound_exchange = job_events_exchange(conn).delete()
-            except:
-                pass
 
         return {
             'job_id': job_id,
diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py
index 84da81d1ca..652b9c5ce7 100644
--- a/awx/plugins/callback/job_event_callback.py
+++ b/awx/plugins/callback/job_event_callback.py
@@ -147,9 +147,10 @@ class CallbackModule(object):
             self.job_events_exchange = Exchange('job_events', 'direct',
                                                 durable=True)
         if not hasattr(self, 'job_events_queue'):
-            self.job_events_queue = Queue('job_events',
+            self.job_events_queue = Queue('job_events[%d]' % self.job_id,
                                           exchange=self.job_events_exchange,
-                                          routing_key=('job_events[%d]' % self.job_id))
+                                          routing_key=('job_events[%d]' % self.job_id),
+                                          auto_delete=True)
         msg = {
             'job_id': self.job_id,
             'event': event,