Implement a signal handler on the callback receiver. Use the full

receiver when running unit tests
This commit is contained in:
Matthew Jones
2014-02-18 23:01:55 -05:00
parent 398d7633d3
commit 28a1f64d54
2 changed files with 13 additions and 2 deletions

View File

@@ -2,9 +2,11 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
import os
import datetime import datetime
import logging import logging
import json import json
import signal
from optparse import make_option from optparse import make_option
from multiprocessing import Process from multiprocessing import Process
@@ -24,6 +26,13 @@ from awx.main.models import *
import zmq import zmq
def run_subscriber(consumer_port, queue_port, use_workers=True): def run_subscriber(consumer_port, queue_port, use_workers=True):
def shutdown_handler(active_workers):
def _handler(signum, frame):
for active_worker in active_workers:
active_worker.terminate()
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it
return _handler
consumer_context = zmq.Context() consumer_context = zmq.Context()
consumer_subscriber = consumer_context.socket(zmq.PULL) consumer_subscriber = consumer_context.socket(zmq.PULL)
@@ -37,8 +46,10 @@ def run_subscriber(consumer_port, queue_port, use_workers=True):
workers = [] workers = []
for idx in range(4): for idx in range(4):
w = Process(target=callback_worker, args=(queue_port,)) w = Process(target=callback_worker, args=(queue_port,))
w.daemon = True
w.start() w.start()
workers.append(w) workers.append(w)
signal.signal(signal.SIGTERM, shutdown_handler(workers))
while True: # Handle signal while True: # Handle signal
message = consumer_subscriber.recv_json() message = consumer_subscriber.recv_json()

View File

@@ -368,7 +368,7 @@ class BaseTestMixin(object):
def start_queue(self, consumer_port, queue_port): def start_queue(self, consumer_port, queue_port):
self.queue_process = Process(target=run_subscriber, self.queue_process = Process(target=run_subscriber,
args=(consumer_port, queue_port, False,)) args=(consumer_port, queue_port,))
self.queue_process.start() self.queue_process.start()
def terminate_queue(self): def terminate_queue(self):