diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index be61fb3411..a7b4d9cd13 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -731,8 +731,8 @@ class JobEvent(BaseModel):
                 update_fields.append('host')
             except (Host.DoesNotExist, AttributeError):
                 pass
-            self.play = self.event_data.get('play', '')
-            self.task = self.event_data.get('task', '')
+            self.play = self.event_data.get('play', '').strip()
+            self.task = self.event_data.get('task', '').strip()
             self.parent = self._find_parent()
             update_fields.extend(['play', 'task', 'parent'])
         # Manually perform auto_now_add and auto_now logic (to allow overriding
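The .strip() here is not just cosmetic: the SaveJobEvents changes in tasks.py below deduplicate incoming events with an exact-match JobEvent.objects.filter(**data).exists() lookup, so the play/task values stored by the model and the values used in the lookup must be normalized the same way. A minimal sketch of that idea, using a plain list of dicts in place of the real JobEvent model (names here are illustrative, not the AWX API):

    # Illustrative only: dedup by exact match works only if both the
    # writer and the lookup strip whitespace identically.
    saved_events = []

    def normalize(event_data):
        return {'play': event_data.get('play', '').strip(),
                'task': event_data.get('task', '').strip()}

    def save_event_once(event_data):
        row = normalize(event_data)
        if row in saved_events:   # stands in for .filter(**data).exists()
            return False          # duplicate delivery; skip the save
        saved_events.append(row)
        return True

    assert save_event_once({'play': 'deploy ', 'task': ' copy files'})
    assert not save_event_once({'play': 'deploy', 'task': 'copy files'})

Without the shared normalization, 'deploy ' and 'deploy' would count as two distinct events and the duplicate check would silently fail.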
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 21cadb9aee..e709d86773 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -108,7 +108,7 @@ class BaseTask(Task):
                             update_fields.append('failed')
                     instance.save(update_fields=update_fields)
                     transaction.commit()
-                    save_succeeded = True
+                save_succeeded = True
             except DatabaseError as e:
                 transaction.rollback()
                 logger.debug("Database error encountered, retrying in 5 seconds: " + str(e))
@@ -246,7 +246,8 @@ class BaseTask(Task):
                 expect_list.append(item[0])
                 expect_passwords[n] = passwords.get(item[1], '') or ''
         expect_list.extend([pexpect.TIMEOUT, pexpect.EOF])
-        self.update_model(instance.pk, status='running', output_replacements=output_replacements)
+        instance = self.update_model(instance.pk, status='running',
+                                     output_replacements=output_replacements)
         while child.isalive():
             result_id = child.expect(expect_list, timeout=pexpect_timeout)
             if result_id in expect_passwords:
@@ -255,9 +256,10 @@
             logfile_pos = logfile.tell()
             last_stdout_update = time.time()
             # NOTE: In case revoke doesn't have an affect
+            instance = self.update_model(instance.pk)
             if instance.cancel_flag:
-                child.close(True)
-                canceled = True
+                child.terminate(canceled)
+                canceled = True
             if idle_timeout and (time.time() - last_stdout_update) > idle_timeout:
                 child.close(True)
                 canceled = True
@@ -376,6 +378,8 @@ class RunJob(BaseTask):
         env['REST_API_TOKEN'] = job.task_auth_token or ''
         if settings.BROKER_URL.startswith('amqp://'):
             env['BROKER_URL'] = settings.BROKER_URL
+        if settings.DEBUG:
+            env['JOB_CALLBACK_DEBUG'] = '1'

         # When using Ansible >= 1.3, allow the inventory script to include host
         # variables inline via ['_meta']['hostvars'].
@@ -490,7 +494,7 @@ class RunJob(BaseTask):
         job_events_exchange = Exchange('job_events', 'direct', durable=True)
         job_events_queue = Queue('job_events', exchange=job_events_exchange,
                                  routing_key=('job_events[%d]' % job.id))
-        with Connection(settings.BROKER_URL) as conn:
+        with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
             with conn.Producer(serializer='json') as producer:
                 msg = {
                     'job_id': job.id,
@@ -515,7 +519,7 @@ class SaveJobEvents(Task):

     def run(self, *args, **kwargs):
         job_id = kwargs.get('job_id', None)
         if not job_id:
-            return {'job_id': job_id}
+            return {}
         job_events_exchange = Exchange('job_events', 'direct', durable=True)
         job_events_queue = Queue('job_events', exchange=job_events_exchange,
@@ -525,11 +529,8 @@
         def process_job_event(data, message):
             begints = time.time()
             event = data.get('event', '')
-            if not event:
+            if not event or 'job_id' not in data:
                 return
-            for key in data.keys():
-                if key not in ('job_id', 'event', 'event_data', 'created'):
-                    data.pop(key)
             try:
                 if not isinstance(data['created'], datetime.datetime):
                     data['created'] = parse_datetime(data['created'])
@@ -539,19 +540,32 @@
                 data.pop('created', None)
             if settings.DEBUG:
                 print data
+            for key in data.keys():
+                if key not in ('job_id', 'event', 'event_data', 'created'):
+                    data.pop(key)
+            data['play'] = data.get('event_data', {}).get('play', '').strip()
+            data['task'] = data.get('event_data', {}).get('task', '').strip()
+            duplicate = False
             if event != '__complete__':
-                job_event = JobEvent(**data)
-                job_event.save(post_process=True)
-                transaction.commit()
-                if event not in events_received:
-                    events_received[event] = 1
-                else:
-                    events_received[event] += 1
+                if not JobEvent.objects.filter(**data).exists():
+                    job_event = JobEvent(**data)
+                    job_event.save(post_process=True)
+                    if not event.startswith('runner_'):
+                        transaction.commit()
+                else:
+                    duplicate = True
+                    if settings.DEBUG:
+                        print 'skipping duplicate job event %r' % data
+            if not duplicate:
+                if event not in events_received:
+                    events_received[event] = 1
+                else:
+                    events_received[event] += 1
+                if settings.DEBUG:
+                    print 'saved job event in %0.3fs' % (time.time() - begints)
             message.ack()
-            if settings.DEBUG:
-                print 'saved job event in %0.3fs' % (time.time() - begints)

-        with Connection(settings.BROKER_URL) as conn:
+        with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
             with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer:
                 while '__complete__' not in events_received:
                     conn.drain_events()
@@ -562,7 +576,6 @@

         return {
             'job_id': job_id,
-            #'events_received': events_received,
             'total_events': sum(events_received.values())}
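Both ends of the job_events pipeline now open their kombu connections with transport_options={'confirm_publish': True}, so the broker acknowledges every publish. The trade-off is that a publisher retrying after a lost acknowledgement can deliver the same event twice, which is why process_job_event now tolerates duplicates rather than pruning keys after the save. A standalone sketch of the pattern under those assumptions; the broker URL, 'demo_events' queue, and payload are placeholders, not AWX names:

    import socket
    from kombu import Connection, Exchange, Queue

    BROKER_URL = 'amqp://guest:guest@localhost//'   # placeholder
    exchange = Exchange('demo_events', 'direct', durable=True)
    queue = Queue('demo_events', exchange=exchange, routing_key='demo_events')

    # Publisher: confirm_publish makes the broker ack each publish, so a
    # retry after a lost ack may deliver the same body twice.
    with Connection(BROKER_URL, transport_options={'confirm_publish': True}) as conn:
        with conn.Producer(serializer='json') as producer:
            for _ in range(2):  # simulate a duplicated delivery
                producer.publish({'event': 'demo', 'counter': 1},
                                 exchange=exchange, routing_key='demo_events',
                                 declare=[queue])

    seen = set()

    def on_message(body, message):
        # Consumer-side dedup, mirroring the JobEvent.objects.filter(**data)
        # existence check above: process each distinct payload only once.
        key = (body['event'], body['counter'])
        if key not in seen:
            seen.add(key)
            print('processing %r' % (body,))
        message.ack()

    with Connection(BROKER_URL, transport_options={'confirm_publish': True}) as conn:
        with conn.Consumer(queue, callbacks=[on_message]):
            try:
                while True:
                    conn.drain_events(timeout=1)
            except socket.timeout:
                pass

At-least-once delivery plus an idempotent consumer is the usual alternative to exactly-once semantics, which AMQP does not provide.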
diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py
index 0e17c91eb4..adec38751d 100644
--- a/awx/plugins/callback/job_event_callback.py
+++ b/awx/plugins/callback/job_event_callback.py
@@ -91,6 +91,10 @@ class CallbackModule(object):
         self.base_url = os.getenv('REST_API_URL', '')
         self.auth_token = os.getenv('REST_API_TOKEN', '')
         self.broker_url = os.getenv('BROKER_URL', '')
+        self.job_callback_debug = os.getenv('JOB_CALLBACK_DEBUG', '')
+
+    def __del__(self):
+        self._cleanup_connection()

     def _start_save_job_events_task(self):
         app = Celery('tasks', broker=self.broker_url)
@@ -98,6 +102,24 @@ class CallbackModule(object):
             'job_id': self.job_id,
         }, serializer='json')

+    def _publish_errback(self, exc, interval):
+        if self.job_callback_debug:
+            print 'Publish Error: %r, retry in %s seconds, pid=%s' % (exc, interval, os.getpid())
+
+    def _cleanup_connection(self):
+        if hasattr(self, 'producer'):
+            try:
+                self.producer.cancel()
+            except:
+                pass
+            del self.producer
+        if hasattr(self, 'connection'):
+            try:
+                self.connection.release()
+            except:
+                pass
+            del self.connection
+
     def _post_job_event_queue_msg(self, event, event_data):
         if not hasattr(self, 'job_events_exchange'):
             self.job_events_exchange = Exchange('job_events', 'direct',
@@ -106,28 +128,41 @@ class CallbackModule(object):
             self.job_events_queue = Queue('job_events',
                                           exchange=self.job_events_exchange,
                                           routing_key=('job_events[%d]' % self.job_id))
-        if not hasattr(self, 'connection'):
-            self.connection = Connection(self.broker_url)
-        if not hasattr(self, 'producer'):
-            self.producer = self.connection.Producer(serializer='json')
-
         msg = {
             'job_id': self.job_id,
             'event': event,
             'event_data': event_data,
             'created': datetime.datetime.utcnow().isoformat(),
         }
-        self.producer.publish(msg, exchange=self.job_events_exchange,
-                              routing_key=('job_events[%d]' % self.job_id),
-                              declare=[self.job_events_queue])
-        if event == 'playbook_on_stats':
+        if self.job_callback_debug:
+            msg.update({
+                'pid': os.getpid(),
+            })
+        retry_count = 0
+        while True:
             try:
-                self.producer.cancel()
-                del self.producer
-                self.connection.release()
-                del self.connection
-            except:
-                pass
+                if not hasattr(self, 'connection'):
+                    self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True})
+                if not hasattr(self, 'producer'):
+                    channel = self.connection.channel()
+                    self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json')
+                self.publish = self.connection.ensure(self.producer, self.producer.publish,
+                                                      errback=self._publish_errback,
+                                                      max_retries=3, interval_start=1, interval_step=1, interval_max=10)
+                self.publish(msg, exchange=self.job_events_exchange,
+                             routing_key=('job_events[%d]' % self.job_id),
+                             declare=[self.job_events_queue])
+                if event == 'playbook_on_stats':
+                    self._cleanup_connection()
+                return
+            except Exception, e:
+                if self.job_callback_debug:
+                    print 'Publish Exception: %r, pid=%s, retry=%d' % (e, os.getpid(), retry_count)
+                if retry_count < 3:
+                    self._cleanup_connection()
+                else:
+                    raise
+                retry_count += 1

     def _post_rest_api_event(self, event, event_data):
         data = json.dumps({
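The callback's retry machinery above is two layers deep: kombu's Connection.ensure() wraps producer.publish so that recoverable connection or channel errors re-establish the connection and re-invoke the call (running the errback between attempts), while the outer while True loop tears everything down with _cleanup_connection() and rebuilds it for up to three more rounds before re-raising. A standalone sketch of the inner ensure() layer; as in the sketch above, the broker URL, 'demo_events' queue, and payload are placeholders:

    from kombu import Connection, Exchange, Queue

    BROKER_URL = 'amqp://guest:guest@localhost//'   # placeholder
    exchange = Exchange('demo_events', 'direct', durable=True)
    queue = Queue('demo_events', exchange=exchange, routing_key='demo_events')

    def errback(exc, interval):
        # Invoked between retries, like _publish_errback above.
        print('publish error %r, retrying in %s seconds' % (exc, interval))

    conn = Connection(BROKER_URL, transport_options={'confirm_publish': True})
    producer = conn.Producer(serializer='json')
    # ensure() returns a wrapper around producer.publish that reconnects
    # and retries on recoverable errors, up to max_retries, backing off
    # from interval_start by interval_step up to interval_max seconds.
    publish = conn.ensure(producer, producer.publish, errback=errback,
                          max_retries=3, interval_start=1, interval_step=1,
                          interval_max=10)
    publish({'event': 'demo'}, exchange=exchange, routing_key='demo_events',
            declare=[queue])
    conn.release()

Keeping the outer loop as well gives the callback a second chance when ensure() exhausts its own retries, at the cost of possibly publishing the same event twice; the consumer-side duplicate check in tasks.py is what makes that safe.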