diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 8847097fbc..4d36bd0521 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -28,6 +28,7 @@ from awx.main.models import * import zmq MAX_REQUESTS = 20000 +WORKERS = 4 class CallbackReceiver(object): @@ -52,85 +53,85 @@ class CallbackReceiver(object): consumer_subscriber = consumer_context.socket(zmq.REP) consumer_subscriber.bind(consumer_port) - queue_context = zmq.Context() - queue_publisher = queue_context.socket(zmq.PUSH) - queue_publisher.bind(queue_port) + worker_queues = [] if use_workers: - w = Process(target=self.callback_worker, args=(queue_port,)) - w.daemon = True - w.start() - signal.signal(signal.SIGINT, shutdown_handler([w])) - signal.signal(signal.SIGTERM, shutdown_handler([w])) - if settings.DEBUG: - print 'Started worker' + for idx in range(WORKERS): + queue_port_actual = queue_port + "-%s" % str(idx) + queue_context = zmq.Context() + queue_publisher = queue_context.socket(zmq.PUSH) + queue_publisher.bind(queue_port_actual) + + w = Process(target=self.callback_worker, args=(queue_port_actual,)) + w.daemon = True + w.start() + + signal.signal(signal.SIGINT, shutdown_handler([w])) + signal.signal(signal.SIGTERM, shutdown_handler([w])) + if settings.DEBUG: + print 'Started worker %s' % str(idx) + worker_queues.append([0, queue_publisher, w]) elif settings.DEBUG: print 'Started callback receiver (no workers)' message_number = 0 + total_messages = 0 + + last_parent_events = {} + while True: # Handle signal message = consumer_subscriber.recv_json() - message_number += 1 + total_messages += 1 if not use_workers: self.process_job_event(message) else: - queue_publisher.send_json(message) - if message_number >= MAX_REQUESTS: - message_number = 0 - print("Recycling worker process") - w.join() - w = Process(target=self.callback_worker, args=(queue_port,)) - w.daemon = True - w.start() + job_parent_events = last_parent_events.get(message['job_id'], {}) + if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'): + parent = job_parent_events.get('playbook_on_start', None) + elif message['event'] in ('playbook_on_notify', 'playbook_on_setup', + 'playbook_on_task_start', + 'playbook_on_no_hosts_matched', + 'playbook_on_no_hosts_remaining', + 'playbook_on_import_for_host', + 'playbook_on_not_import_for_host'): + parent = job_parent_events.get('playbook_on_play_start', None) + elif message['event'].startswith('runner_on_'): + list_parents = [] + list_parents.append(job_parent_events.get('playbook_on_setup', None)) + list_parents.append(job_parent_events.get('playbook_on_task_start', None)) + list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id-x.id) + parent = list_parents[0] if len(list_parents) > 0 else None + else: + parent = None + if parent is not None: + message['parent'] = parent.id + if 'created' in message: + del(message['created']) + if message['event'] in ('playbook_on_start', 'playbook_on_play_start', + 'playbook_on_setup', 'playbook_on_task_start'): + job_parent_events[message['event']] = self.process_job_event(message) + else: + if message['event'] == 'playbook_on_stats': + job_parent_events = {} + queue_actual = worker_queues[total_messages % WORKERS] + queue_actual[0] += 1 + queue_actual[1].send_json(message) + if queue_actual[0] >= MAX_REQUESTS: + queue_actual[0] = 0 + print("Recycling worker process") + queue_actual[2].join() + w = Process(target=self.callback_worker, args=(queue_port + "-%s" % str(total_messages % WORKERS),)) + w.daemon = True + w.start() + queue_actual[2] = w + last_parent_events[message['job_id']] = job_parent_events consumer_subscriber.send("1") - # NOTE: This cache doesn't work too terribly well but it can help prevent database queries - # we may want to use something like memcached here instead - def process_parent_cache(self, job_id, event_object): - if event_object.event not in ('playbook_on_start', 'playbook_on_play_start', 'playbook_on_setup', 'playbook_on_task_start'): - return - if job_id not in self.parent_mappings: - self.parent_mappings[job_id] = {} - if event_object.event not in self.parent_mappings[job_id]: - self.parent_mappings[job_id][event_object.event] = {event_object.id: event_object} - else: - self.parent_mappings[job_id][event_object.event][event_object.id] = event_object - - def find_parent(self, job_id, event_object): - if job_id not in self.parent_mappings: - return None - job_parent_mappings = self.parent_mappings[job_id] - search_events = set() - if event_object.event in ('playbook_on_play_start', 'playbook_on_stats', - 'playbook_on_vars_prompt'): - search_events.add('playbook_on_start') - elif event_object.event in ('playbook_on_notify', 'playbook_on_setup', - 'playbook_on_task_start', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host'): - search_events.add('playbook_on_play_start') - elif event_object.event.startswith('runner_on_'): - search_events.add('playbook_on_setup') - search_events.add('playbook_on_task_start') - potential_events = [] - for event_type in search_events: - potential_events.extend([e for e in self.parent_mappings[job_id][event_type].values()] if \ - event_type in self.parent_mappings[job_id] else []) - potential_events = sorted(potential_events, cmp=lambda x,y: y.id-x.id) - if len(potential_events) > 0: - return potential_events[0] - return None - - def check_purge_parent_cache(self, job_id, event_object): - if event_object.event == 'playbook_on_stats': - del(self.parent_mappings[job_id]) - @transaction.commit_on_success def process_job_event(self, data): event = data.get('event', '') + parent_id = data.get('parent', None) if not event or 'job_id' not in data: return try: @@ -150,13 +151,10 @@ class CallbackReceiver(object): if event == 'playbook_on_stats': transaction.commit() job_event = JobEvent(**data) - job_event.parent = self.find_parent(data['job_id'], job_event) + if parent_id is not None: + job_event.parent = JobEvent.objects.get(id=parent_id) job_event.save(post_process=True) - self.process_parent_cache(data['job_id'], job_event) - self.check_purge_parent_cache(data['job_id'], job_event) - if not event.startswith('runner_'): - transaction.commit() - break + return job_event except DatabaseError as e: transaction.rollback() print('Database error saving job event, retrying in ' @@ -165,6 +163,7 @@ class CallbackReceiver(object): else: print('Failed to save job event after %d retries.', retry_count) + return None def callback_worker(self, port): messages_processed = 0