diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 0378d27d02..2e5e93766b 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -10,7 +10,7 @@ import json import signal import time from optparse import make_option -from multiprocessing import Process +from multiprocessing import Process, Queue # Django from django.conf import settings @@ -59,12 +59,8 @@ class CallbackReceiver(object): if use_workers: connection.close() for idx in range(WORKERS): - queue_port_actual = queue_port + "-%s" % str(idx) - queue_context = zmq.Context() - queue_publisher = queue_context.socket(zmq.PUSH) - queue_publisher.bind(queue_port_actual) - - w = Process(target=self.callback_worker, args=(queue_port_actual,)) + queue_actual = Queue() + w = Process(target=self.callback_worker, args=(queue_actual,)) w.daemon = True w.start() @@ -72,7 +68,7 @@ class CallbackReceiver(object): signal.signal(signal.SIGTERM, shutdown_handler([w])) if settings.DEBUG: print 'Started worker %s' % str(idx) - worker_queues.append([0, queue_publisher, w]) + worker_queues.append([0, queue_actual, w]) elif settings.DEBUG: print 'Started callback receiver (no workers)' @@ -106,7 +102,7 @@ class CallbackReceiver(object): else: parent = None if parent is not None: - message['parent'] = parent.id + message['parent'] = parent if 'created' in message: del(message['created']) if message['event'] in ('playbook_on_start', 'playbook_on_play_start', @@ -115,18 +111,18 @@ class CallbackReceiver(object): else: if message['event'] == 'playbook_on_stats': job_parent_events = {} - queue_actual = worker_queues[total_messages % WORKERS] - queue_actual[0] += 1 - queue_actual[1].send_json(message) - if queue_actual[0] >= MAX_REQUESTS: - queue_actual[0] = 0 + queue_actual_worker = worker_queues[total_messages % WORKERS] + queue_actual_worker[0] += 1 + queue_actual_worker[1].put(message) + if queue_actual_worker[0] >= MAX_REQUESTS: + queue_actual_worker[0] = 0 print("Recycling worker process") - queue_actual[2].join() + queue_actual_worker[2].join() connection.close() - w = Process(target=self.callback_worker, args=(queue_port + "-%s" % str(total_messages % WORKERS),)) + w = Process(target=self.callback_worker, args=(queue_actual_worker[1],)) w.daemon = True w.start() - queue_actual[2] = w + queue_actual_worker[2] = w last_parent_events[message['job_id']] = job_parent_events consumer_subscriber.send("1") @@ -162,9 +158,9 @@ class CallbackReceiver(object): data['event_data']['res']['invocation']['module_args'] = "" job_event = JobEvent(**data) if parent_id is not None: - job_event.parent = JobEvent.objects.get(id=parent_id) + job_event.parent_id = parent_id job_event.save(post_process=True) - return job_event + return job_event.id except DatabaseError as e: transaction.rollback() print('Database error saving job event, retrying in ' @@ -175,20 +171,14 @@ class CallbackReceiver(object): retry_count) return None - def callback_worker(self, port): + def callback_worker(self, queue_actual): messages_processed = 0 - pool_context = zmq.Context() - pool_subscriber = pool_context.socket(zmq.PULL) - pool_subscriber.connect(port) while True: - message = pool_subscriber.recv_json() + message = queue_actual.get() self.process_job_event(message) messages_processed += 1 if messages_processed >= MAX_REQUESTS: print("Shutting down message receiver") - pool_subscriber.close() - pool_context.term() - time.sleep(0.1) sys.exit(0) class Command(NoArgsCommand):