AC-990 Updated callback plugin and related task to prevent missed/duplicate job events, catch exceptions and retry. Fixed task run_pexpect method so that canceling a job will work.

This commit is contained in:
Chris Church
2014-02-04 17:38:31 -05:00
parent d08319100d
commit 91cc144e87
3 changed files with 86 additions and 38 deletions

View File

@@ -731,8 +731,8 @@ class JobEvent(BaseModel):
update_fields.append('host') update_fields.append('host')
except (Host.DoesNotExist, AttributeError): except (Host.DoesNotExist, AttributeError):
pass pass
self.play = self.event_data.get('play', '') self.play = self.event_data.get('play', '').strip()
self.task = self.event_data.get('task', '') self.task = self.event_data.get('task', '').strip()
self.parent = self._find_parent() self.parent = self._find_parent()
update_fields.extend(['play', 'task', 'parent']) update_fields.extend(['play', 'task', 'parent'])
# Manually perform auto_now_add and auto_now logic (to allow overriding # Manually perform auto_now_add and auto_now logic (to allow overriding

View File

@@ -108,7 +108,7 @@ class BaseTask(Task):
update_fields.append('failed') update_fields.append('failed')
instance.save(update_fields=update_fields) instance.save(update_fields=update_fields)
transaction.commit() transaction.commit()
save_succeeded = True save_succeeded = True
except DatabaseError as e: except DatabaseError as e:
transaction.rollback() transaction.rollback()
logger.debug("Database error encountered, retrying in 5 seconds: " + str(e)) logger.debug("Database error encountered, retrying in 5 seconds: " + str(e))
@@ -246,7 +246,8 @@ class BaseTask(Task):
expect_list.append(item[0]) expect_list.append(item[0])
expect_passwords[n] = passwords.get(item[1], '') or '' expect_passwords[n] = passwords.get(item[1], '') or ''
expect_list.extend([pexpect.TIMEOUT, pexpect.EOF]) expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
self.update_model(instance.pk, status='running', output_replacements=output_replacements) instance = self.update_model(instance.pk, status='running',
output_replacements=output_replacements)
while child.isalive(): while child.isalive():
result_id = child.expect(expect_list, timeout=pexpect_timeout) result_id = child.expect(expect_list, timeout=pexpect_timeout)
if result_id in expect_passwords: if result_id in expect_passwords:
@@ -255,9 +256,10 @@ class BaseTask(Task):
logfile_pos = logfile.tell() logfile_pos = logfile.tell()
last_stdout_update = time.time() last_stdout_update = time.time()
# NOTE: In case revoke doesn't have an affect # NOTE: In case revoke doesn't have an affect
instance = self.update_model(instance.pk)
if instance.cancel_flag: if instance.cancel_flag:
child.close(True) child.terminate(canceled)
canceled = True canceled = True
if idle_timeout and (time.time() - last_stdout_update) > idle_timeout: if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
child.close(True) child.close(True)
canceled = True canceled = True
@@ -376,6 +378,8 @@ class RunJob(BaseTask):
env['REST_API_TOKEN'] = job.task_auth_token or '' env['REST_API_TOKEN'] = job.task_auth_token or ''
if settings.BROKER_URL.startswith('amqp://'): if settings.BROKER_URL.startswith('amqp://'):
env['BROKER_URL'] = settings.BROKER_URL env['BROKER_URL'] = settings.BROKER_URL
if settings.DEBUG:
env['JOB_CALLBACK_DEBUG'] = '1'
# When using Ansible >= 1.3, allow the inventory script to include host # When using Ansible >= 1.3, allow the inventory script to include host
# variables inline via ['_meta']['hostvars']. # variables inline via ['_meta']['hostvars'].
@@ -490,7 +494,7 @@ class RunJob(BaseTask):
job_events_exchange = Exchange('job_events', 'direct', durable=True) job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events', exchange=job_events_exchange, job_events_queue = Queue('job_events', exchange=job_events_exchange,
routing_key=('job_events[%d]' % job.id)) routing_key=('job_events[%d]' % job.id))
with Connection(settings.BROKER_URL) as conn: with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
with conn.Producer(serializer='json') as producer: with conn.Producer(serializer='json') as producer:
msg = { msg = {
'job_id': job.id, 'job_id': job.id,
@@ -515,7 +519,7 @@ class SaveJobEvents(Task):
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
job_id = kwargs.get('job_id', None) job_id = kwargs.get('job_id', None)
if not job_id: if not job_id:
return {'job_id': job_id} return {}
job_events_exchange = Exchange('job_events', 'direct', durable=True) job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events', exchange=job_events_exchange, job_events_queue = Queue('job_events', exchange=job_events_exchange,
@@ -525,11 +529,8 @@ class SaveJobEvents(Task):
def process_job_event(data, message): def process_job_event(data, message):
begints = time.time() begints = time.time()
event = data.get('event', '') event = data.get('event', '')
if not event: if not event or 'job_id' not in data:
return return
for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created'):
data.pop(key)
try: try:
if not isinstance(data['created'], datetime.datetime): if not isinstance(data['created'], datetime.datetime):
data['created'] = parse_datetime(data['created']) data['created'] = parse_datetime(data['created'])
@@ -539,19 +540,32 @@ class SaveJobEvents(Task):
data.pop('created', None) data.pop('created', None)
if settings.DEBUG: if settings.DEBUG:
print data print data
for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created'):
data.pop(key)
data['play'] = data.get('event_data', {}).get('play', '').strip()
data['task'] = data.get('event_data', {}).get('task', '').strip()
duplicate = False
if event != '__complete__': if event != '__complete__':
job_event = JobEvent(**data) if not JobEvent.objects.filter(**data).exists():
job_event.save(post_process=True) job_event = JobEvent(**data)
transaction.commit() job_event.save(post_process=True)
if event not in events_received: if not event.startswith('runner_'):
events_received[event] = 1 transaction.commit()
else: else:
events_received[event] += 1 duplicate = True
if settings.DEBUG:
print 'skipping duplicate job event %r' % data
if not duplicate:
if event not in events_received:
events_received[event] = 1
else:
events_received[event] += 1
if settings.DEBUG:
print 'saved job event in %0.3fs' % (time.time() - begints)
message.ack() message.ack()
if settings.DEBUG:
print 'saved job event in %0.3fs' % (time.time() - begints)
with Connection(settings.BROKER_URL) as conn: with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer: with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer:
while '__complete__' not in events_received: while '__complete__' not in events_received:
conn.drain_events() conn.drain_events()
@@ -562,7 +576,6 @@ class SaveJobEvents(Task):
return { return {
'job_id': job_id, 'job_id': job_id,
#'events_received': events_received,
'total_events': sum(events_received.values())} 'total_events': sum(events_received.values())}

View File

@@ -91,6 +91,10 @@ class CallbackModule(object):
self.base_url = os.getenv('REST_API_URL', '') self.base_url = os.getenv('REST_API_URL', '')
self.auth_token = os.getenv('REST_API_TOKEN', '') self.auth_token = os.getenv('REST_API_TOKEN', '')
self.broker_url = os.getenv('BROKER_URL', '') self.broker_url = os.getenv('BROKER_URL', '')
self.job_callback_debug = os.getenv('JOB_CALLBACK_DEBUG', '')
def __del__(self):
self._cleanup_connection()
def _start_save_job_events_task(self): def _start_save_job_events_task(self):
app = Celery('tasks', broker=self.broker_url) app = Celery('tasks', broker=self.broker_url)
@@ -98,6 +102,24 @@ class CallbackModule(object):
'job_id': self.job_id, 'job_id': self.job_id,
}, serializer='json') }, serializer='json')
def _publish_errback(self, exc, interval):
if self.job_callback_debug:
print 'Publish Error: %r, retry in %s seconds, pid=%s' % (exc, interval, os.getpid())
def _cleanup_connection(self):
if hasattr(self, 'producer'):
try:
self.producer.cancel()
except:
pass
del self.producer
if hasattr(self, 'connection'):
try:
self.connection.release()
except:
pass
del self.connection
def _post_job_event_queue_msg(self, event, event_data): def _post_job_event_queue_msg(self, event, event_data):
if not hasattr(self, 'job_events_exchange'): if not hasattr(self, 'job_events_exchange'):
self.job_events_exchange = Exchange('job_events', 'direct', self.job_events_exchange = Exchange('job_events', 'direct',
@@ -106,28 +128,41 @@ class CallbackModule(object):
self.job_events_queue = Queue('job_events', self.job_events_queue = Queue('job_events',
exchange=self.job_events_exchange, exchange=self.job_events_exchange,
routing_key=('job_events[%d]' % self.job_id)) routing_key=('job_events[%d]' % self.job_id))
if not hasattr(self, 'connection'):
self.connection = Connection(self.broker_url)
if not hasattr(self, 'producer'):
self.producer = self.connection.Producer(serializer='json')
msg = { msg = {
'job_id': self.job_id, 'job_id': self.job_id,
'event': event, 'event': event,
'event_data': event_data, 'event_data': event_data,
'created': datetime.datetime.utcnow().isoformat(), 'created': datetime.datetime.utcnow().isoformat(),
} }
self.producer.publish(msg, exchange=self.job_events_exchange, if self.job_callback_debug:
routing_key=('job_events[%d]' % self.job_id), msg.update({
declare=[self.job_events_queue]) 'pid': os.getpid(),
if event == 'playbook_on_stats': })
retry_count = 0
while True:
try: try:
self.producer.cancel() if not hasattr(self, 'connection'):
del self.producer self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True})
self.connection.release() if not hasattr(self, 'producer'):
del self.connection channel = self.connection.channel()
except: self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json')
pass self.publish = self.connection.ensure(self.producer, self.producer.publish,
errback=self._publish_errback,
max_retries=3, interval_start=1, interval_step=1, interval_max=10)
self.publish(msg, exchange=self.job_events_exchange,
routing_key=('job_events[%d]' % self.job_id),
declare=[self.job_events_queue])
if event == 'playbook_on_stats':
self._cleanup_connection()
return
except Exception, e:
if self.job_callback_debug:
print 'Publish Exception: %r, pid=%s, retry=%d' % (e, os.getpid(), retry_count)
if retry_count < 3:
self._cleanup_connection()
else:
raise
retry_count += 1
def _post_rest_api_event(self, event, event_data): def _post_rest_api_event(self, event, event_data):
data = json.dumps({ data = json.dumps({