From 28a1f64d54d17fcf8df8d03a62b4a95f80689597 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 18 Feb 2014 23:01:55 -0500 Subject: [PATCH] Implement a signal handler on the callback receiver. Use the full receiver when running unit tests --- .../management/commands/run_callback_receiver.py | 13 ++++++++++++- awx/main/tests/base.py | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index f25d1e8d84..f841eda5ef 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -2,9 +2,11 @@ # All Rights Reserved. # Python +import os import datetime import logging import json +import signal from optparse import make_option from multiprocessing import Process @@ -24,7 +26,14 @@ from awx.main.models import * import zmq def run_subscriber(consumer_port, queue_port, use_workers=True): - + def shutdown_handler(active_workers): + def _handler(signum, frame): + for active_worker in active_workers: + active_worker.terminate() + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it + return _handler + consumer_context = zmq.Context() consumer_subscriber = consumer_context.socket(zmq.PULL) consumer_subscriber.bind(consumer_port) @@ -37,8 +46,10 @@ def run_subscriber(consumer_port, queue_port, use_workers=True): workers = [] for idx in range(4): w = Process(target=callback_worker, args=(queue_port,)) + w.daemon = True w.start() workers.append(w) + signal.signal(signal.SIGTERM, shutdown_handler(workers)) while True: # Handle signal message = consumer_subscriber.recv_json() diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index e4ad1b5ade..be01c1c306 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -368,7 +368,7 @@ class BaseTestMixin(object): def start_queue(self, consumer_port, queue_port): self.queue_process = Process(target=run_subscriber, - args=(consumer_port, queue_port, False,)) + args=(consumer_port, queue_port,)) self.queue_process.start() def terminate_queue(self):