diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index ff84bf3701..014d9dac61 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -24,6 +24,9 @@ from awx.main.models import * import zmq class Worker(Process): + ''' + Process to validate and store save job events received via zeromq + ''' def run(self): print("Starting worker") @@ -36,7 +39,6 @@ class Worker(Process): @transaction.commit_on_success def process_job_event(self, data): - print("Received data: %s" % str(data)) event = data.get('event', '') if not event or 'job_id' not in data: return @@ -58,20 +60,15 @@ class Worker(Process): try: if event == 'playbook_on_stats': transaction.commit() - if not JobEvent.objects.filter(**data).exists(): - job_event = JobEvent(**data) - job_event.save(post_process=True) - if not event.startswith('runner_'): - transaction.commit() - else: - duplicate = True - if settings.DEBUG: - print 'skipping duplicate job event %r' % data + job_event = JobEvent(**data) + job_event.save(post_process=True) + if not event.startswith('runner_'): + transaction.commit() break except DatabaseError as e: transaction.rollback() - # logger.debug('Database error saving job event, retrying in ' - # '1 second (retry #%d): %s', retry_count + 1, e) + logger.debug('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) else: logger.error('Failed to save job event after %d retries.', @@ -80,9 +77,11 @@ class Worker(Process): class Command(NoArgsCommand): ''' - Management command to run the job callback receiver + Save Job Callback receiver (see awx.plugins.callback.job_event_callback) + Runs as a management command and receives job save events. 
It then hands + them off to worker processes (see Worker) which write them to the database ''' - + help = 'Launch the job callback receiver' option_list = NoArgsCommand.option_list + ( @@ -124,4 +123,4 @@ class Command(NoArgsCommand): def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() - self.run_subscriber() + self.run_subscriber(port=options.get('port')) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1cafa4463a..1e893aaba9 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -533,91 +533,6 @@ class RunJob(BaseTask): for job_event in job.job_events.order_by('pk'): job_event.save(post_process=True) - -class SaveJobEvents(Task): - - name = 'awx.main.tasks.save_job_events' - - def process_job_event(self, data, message, events_received=None): - if events_received is None: - events_received = {} - begints = time.time() - event = data.get('event', '') - if not event or 'job_id' not in data: - return - try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - if settings.DEBUG: - print data - for key in data.keys(): - if key not in ('job_id', 'event', 'event_data', 'created'): - data.pop(key) - data['play'] = data.get('event_data', {}).get('play', '').strip() - data['task'] = data.get('event_data', {}).get('task', '').strip() - duplicate = False - if event != '__complete__': - for retry_count in xrange(11): - try: - # Commit any outstanding events before saving stats. 
- if event == 'playbook_on_stats': - transaction.commit() - if not JobEvent.objects.filter(**data).exists(): - job_event = JobEvent(**data) - job_event.save(post_process=True) - if not event.startswith('runner_'): - transaction.commit() - else: - duplicate = True - if settings.DEBUG: - print 'skipping duplicate job event %r' % data - break - except DatabaseError as e: - transaction.rollback() - logger.debug('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) - time.sleep(1) - else: - logger.error('Failed to save job event after %d retries.', - retry_count) - if not duplicate: - if event not in events_received: - events_received[event] = 1 - else: - events_received[event] += 1 - if settings.DEBUG: - print 'saved job event in %0.3fs' % (time.time() - begints) - message.ack() - - @transaction.commit_on_success - def run(self, *args, **kwargs): - job_id = kwargs.get('job_id', None) - if not job_id: - return {} - - events_received = {} - process_job_event = functools.partial(self.process_job_event, - events_received=events_received) - - job_events_exchange = Exchange('job_events', 'direct', durable=True) - job_events_queue = Queue('job_events[%d]' % job_id, - exchange=job_events_exchange, - routing_key=('job_events[%d]' % job_id), - auto_delete=True) - with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: - with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer: - while '__complete__' not in events_received: - conn.drain_events() - - return { - 'job_id': job_id, - 'total_events': sum(events_received.values())} - - class RunProjectUpdate(BaseTask): name = 'awx.main.tasks.run_project_update' diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 4c33ed8345..38c299706f 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -117,15 +117,16 @@ class 
CallbackModule(object): 'event_data': event_data, 'created': datetime.datetime.utcnow().isoformat(), } + active_pid = os.getpid() if self.job_callback_debug: msg.update({ - 'pid': os.getpid(), + 'pid': active_pid, }) for retry_count in xrange(4): try: if not hasattr(self, 'connection_pid'): - self.connection_pid = os.getpid() - if self.connection_pid != os.getpid(): + self.connection_pid = active_pid + if self.connection_pid != active_pid: self._init_connection() if self.context is None: self._start_connection()