diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index c19238a51e..3eeadaa8b0 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -50,10 +50,6 @@ class CallbackReceiver(object): return True return False - self.consumer_context = zmq.Context() - self.consumer_subscriber = self.consumer_context.socket(zmq.REP) - self.consumer_subscriber.bind(consumer_port) - worker_queues = [] if use_workers: @@ -61,7 +57,6 @@ class CallbackReceiver(object): for idx in range(WORKERS): queue_actual = Queue() w = Process(target=self.callback_worker, args=(queue_actual,)) - w.daemon = True w.start() signal.signal(signal.SIGINT, shutdown_handler([w])) @@ -72,10 +67,34 @@ class CallbackReceiver(object): elif settings.DEBUG: print 'Started callback receiver (no workers)' + main_process = Process(target=self.callback_handler, args=(use_workers, consumer_port, worker_queues,)) + main_process.daemon = True + main_process.start() + + signal.signal(signal.SIGINT, shutdown_handler([main_process])) + signal.signal(signal.SIGTERM, shutdown_handler([main_process])) + + while True: + for queue_worker in worker_queues: + if not queue_worker[2].is_alive(): + queue_worker[2].join() + w = Process(target=self.callback_worker, args=(queue_worker[1],)) + w.daemon = True + w.start() + signal.signal(signal.SIGINT, shutdown_handler([w])) + signal.signal(signal.SIGTERM, shutdown_handler([w])) + queue_worker[2] = w + if not main_process.is_alive(): + sys.exit(1) + time.sleep(0.1) + + def callback_handler(self, use_workers, consumer_port, worker_queues): message_number = 0 total_messages = 0 - last_parent_events = {} + self.consumer_context = zmq.Context() + self.consumer_subscriber = self.consumer_context.socket(zmq.REP) + self.consumer_subscriber.bind(consumer_port) while True: # Handle signal message = self.consumer_subscriber.recv_json() @@ -116,17 +135,17 @@ class CallbackReceiver(object): queue_actual_worker[1].put(message) if queue_actual_worker[0] >= MAX_REQUESTS: queue_actual_worker[0] = 0 - print("Recycling worker process") - queue_actual_worker[2].join() - connection.close() - w = Process(target=self.callback_worker, args=(queue_actual_worker[1],)) - w.daemon = True - w.start() + # print("Recycling worker process") + # queue_actual_worker[2].join() + # connection.close() + # w = Process(target=self.callback_worker, args=(queue_actual_worker[1],)) + # w.daemon = True + # w.start() - signal.signal(signal.SIGINT, shutdown_handler([w])) - signal.signal(signal.SIGTERM, shutdown_handler([w])) + # signal.signal(signal.SIGINT, shutdown_handler([w])) + # signal.signal(signal.SIGTERM, shutdown_handler([w])) - queue_actual_worker[2] = w + # queue_actual_worker[2] = w last_parent_events[message['job_id']] = job_parent_events self.consumer_subscriber.send("1") @@ -206,7 +225,7 @@ class CallbackReceiver(object): messages_processed += 1 if messages_processed >= MAX_REQUESTS: print("Shutting down message receiver") - sys.exit(0) + break class Command(NoArgsCommand): '''