From e99466efc22e1acd982f507a735f2d6f2239e37a Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 17 Nov 2014 13:27:48 -0500 Subject: [PATCH] Fix a bug in the callback receiver where we weren't properly initializing the parent signal handler --- .../commands/run_callback_receiver.py | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 304f3eefd7..6f9f9ed837 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -41,10 +41,13 @@ class CallbackReceiver(object): def run_subscriber(self, use_workers=True): def shutdown_handler(active_workers): def _handler(signum, frame): - for active_worker in active_workers: - active_worker.terminate() - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it + try: + for active_worker in active_workers: + active_worker.terminate() + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it + except Exception, e: + os.system('echo "exception %s" > /tmp/foob' % str(e)) return _handler def check_pre_handle(data): event = data.get('event', '') @@ -60,9 +63,6 @@ class CallbackReceiver(object): queue_actual = Queue() w = Process(target=self.callback_worker, args=(queue_actual,)) w.start() - - signal.signal(signal.SIGINT, shutdown_handler([w])) - signal.signal(signal.SIGTERM, shutdown_handler([w])) if settings.DEBUG: self.print_log('Started worker %s' % str(idx)) worker_queues.append([0, queue_actual, w]) @@ -76,12 +76,13 @@ class CallbackReceiver(object): main_process.daemon = True main_process.start() - signal.signal(signal.SIGINT, shutdown_handler([main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([main_process])) - + signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) + signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) while True: + workers_changed = False for queue_worker in worker_queues: if not queue_worker[2].is_alive(): + workers_changed = True queue_worker[2].join() w = Process(target=self.callback_worker, args=(queue_worker[1],)) w.daemon = True @@ -89,6 +90,9 @@ class CallbackReceiver(object): signal.signal(signal.SIGINT, shutdown_handler([w])) signal.signal(signal.SIGTERM, shutdown_handler([w])) queue_worker[2] = w + if workers_changed: + signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) + signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) if not main_process.is_alive(): sys.exit(1) time.sleep(0.1) @@ -134,7 +138,11 @@ class CallbackReceiver(object): job_parent_events = {} queue_actual_worker = worker_queues[total_messages % WORKERS] queue_actual_worker[0] += 1 - queue_actual_worker[1].put(message) + try: + queue_actual_worker[1].put(message, block=True, timeout=2) + except Exception: + print("Queue workers went away, continuing...") + continue if queue_actual_worker[0] >= MAX_REQUESTS: queue_actual_worker[0] = 0 # print("Recycling worker process") @@ -221,7 +229,10 @@ class CallbackReceiver(object): def callback_worker(self, queue_actual): messages_processed = 0 while True: - message = queue_actual.get() + try: + message = queue_actual.get(block=True, timeout=1) + except Exception: + continue self.process_job_event(message) messages_processed += 1 if messages_processed >= MAX_REQUESTS: