AC-1015 Added a retry count for saving to the database; minor retry refactor for publishing job events.

This commit is contained in:
Chris Church
2014-02-09 13:51:40 -05:00
parent a3ee83258c
commit cb8b58b0ae
2 changed files with 23 additions and 15 deletions

View File

@@ -94,7 +94,7 @@ class BaseTask(Task):
# Commit outstanding transaction so that we fetch the latest object # Commit outstanding transaction so that we fetch the latest object
# from the database. # from the database.
transaction.commit() transaction.commit()
while True: for retry_count in xrange(5):
try: try:
instance = self.model.objects.get(pk=pk) instance = self.model.objects.get(pk=pk)
if updates: if updates:
@@ -109,12 +109,16 @@ class BaseTask(Task):
update_fields.append('failed') update_fields.append('failed')
instance.save(update_fields=update_fields) instance.save(update_fields=update_fields)
transaction.commit() transaction.commit()
break return instance
except DatabaseError as e: except DatabaseError as e:
transaction.rollback() transaction.rollback()
logger.debug("Database error encountered, retrying in 5 seconds: " + str(e)) logger.debug('Database error updating %s, retrying in 5 '
'seconds (retry #%d): %s',
self.model._meta.object_name, retry_count + 1, e)
time.sleep(5) time.sleep(5)
return instance else:
logger.error('Failed to update %s after %d retries.',
self.model._meta.object_name, retry_count)
def get_model(self, pk): def get_model(self, pk):
return self.model.objects.get(pk=pk) return self.model.objects.get(pk=pk)
@@ -556,7 +560,7 @@ class SaveJobEvents(Task):
data['task'] = data.get('event_data', {}).get('task', '').strip() data['task'] = data.get('event_data', {}).get('task', '').strip()
duplicate = False duplicate = False
if event != '__complete__': if event != '__complete__':
while True: for retry_count in xrange(11):
try: try:
# Commit any outstanding events before saving stats. # Commit any outstanding events before saving stats.
if event == 'playbook_on_stats': if event == 'playbook_on_stats':
@@ -573,8 +577,12 @@ class SaveJobEvents(Task):
break break
except DatabaseError as e: except DatabaseError as e:
transaction.rollback() transaction.rollback()
logger.debug("Database error encountered, retrying in 1 second: " + str(e)) logger.debug('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1) time.sleep(1)
else:
logger.error('Failed to save job event after %d retries.',
retry_count)
if not duplicate: if not duplicate:
if event not in events_received: if event not in events_received:
events_received[event] = 1 events_received[event] = 1

View File

@@ -161,8 +161,7 @@ class CallbackModule(object):
msg.update({ msg.update({
'pid': os.getpid(), 'pid': os.getpid(),
}) })
retry_count = 0 for retry_count in xrange(4):
while True:
try: try:
if not hasattr(self, 'connection_pid'): if not hasattr(self, 'connection_pid'):
self.connection_pid = os.getpid() self.connection_pid = os.getpid()
@@ -170,14 +169,16 @@ class CallbackModule(object):
self._cleanup_connection() self._cleanup_connection()
if not hasattr(self, 'connection'): if not hasattr(self, 'connection'):
self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True}) self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True})
self.logger.debug('New Connection: %r, retry=%d', self.connection, retry_count) self.logger.debug('New Connection: %r, retry=%d',
self.connection, retry_count)
if not hasattr(self, 'producer'): if not hasattr(self, 'producer'):
channel = self.connection.channel() channel = self.connection.channel()
self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json') self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json')
self.publish = self.connection.ensure(self.producer, self.producer.publish, self.publish = self.connection.ensure(self.producer, self.producer.publish,
errback=self._publish_errback, errback=self._publish_errback,
max_retries=3, interval_start=1, interval_step=1, interval_max=10) max_retries=3, interval_start=1, interval_step=1, interval_max=10)
self.logger.debug('New Producer: %r, retry=%d', self.producer, retry_count) self.logger.debug('New Producer: %r, retry=%d',
self.producer, retry_count)
self.logger.debug('Publish: %r, retry=%d', msg, retry_count) self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
self.publish(msg, exchange=self.job_events_exchange, self.publish(msg, exchange=self.job_events_exchange,
routing_key=('job_events[%d]' % self.job_id), routing_key=('job_events[%d]' % self.job_id),
@@ -186,12 +187,11 @@ class CallbackModule(object):
self._cleanup_connection() self._cleanup_connection()
return return
except Exception, e: except Exception, e:
self.logger.info('Publish Exception: %r, retry=%d', e, retry_count, exc_info=True) self.logger.info('Publish Exception: %r, retry=%d', e,
if retry_count < 3: retry_count, exc_info=True)
self._cleanup_connection() self._cleanup_connection()
else: if retry_count >= 3:
raise raise
retry_count += 1
def _post_rest_api_event(self, event, event_data): def _post_rest_api_event(self, event, event_data):
data = json.dumps({ data = json.dumps({