Use transaction atomic for the entire request, just bail on transaction

problems to prevent a deadlock scenario
This commit is contained in:
Matthew Jones 2015-03-13 12:38:45 -04:00
parent cb1787816f
commit 446e15314c

View File

@ -158,7 +158,7 @@ class CallbackReceiver(object):
sys.exit(1)
last_parent_events[message['job_id']] = job_parent_events
@transaction.commit_on_success
@transaction.atomic
def process_job_event(self, data):
# Sanity check: Do we need to do anything at all?
event = data.get('event', '')
@ -197,34 +197,27 @@ class CallbackReceiver(object):
data.pop(key)
# Save any modifications to the job event to the database.
# If we get a database error of some kind, try again.
for retry_count in xrange(11):
try:
# If we're not in verbose mode, wipe out any module
# arguments.
res = data['event_data'].get('res', {})
if isinstance(res, dict):
i = res.get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
# If we get a database error of some kind, bail out.
try:
# If we're not in verbose mode, wipe out any module
# arguments.
res = data['event_data'].get('res', {})
if isinstance(res, dict):
i = res.get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
# Create a new JobEvent object.
job_event = JobEvent(**data)
if parent_id is not None:
job_event.parent = JobEvent.objects.get(id=parent_id)
job_event.save(post_process=True)
# Create a new JobEvent object.
job_event = JobEvent(**data)
if parent_id is not None:
job_event.parent = JobEvent.objects.get(id=parent_id)
job_event.save(post_process=True)
# Retrun the job event object.
return job_event
except DatabaseError as e:
transaction.rollback()
# Log the error and try again.
logger.error('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1)
# We failed too many times, and are giving up.
logger.error('Failed to save job event after %d retries.', retry_count)
# Retrun the job event object.
return job_event
except DatabaseError as e:
# Log the error and try again.
logger.error('Database error saving job event: %s', e)
return None
def callback_worker(self, queue_actual, idx):