Merge pull request #262 from AlanCoding/graceful

Graceful killing of receiver workers
This commit is contained in:
Alan Rominger
2017-08-15 13:11:07 -04:00
committed by GitHub

View File

@@ -9,6 +9,7 @@ from multiprocessing import Process
from multiprocessing import Queue as MPQueue from multiprocessing import Queue as MPQueue
from Queue import Empty as QueueEmpty from Queue import Empty as QueueEmpty
from Queue import Full as QueueFull from Queue import Full as QueueFull
import os
from kombu import Connection, Exchange, Queue from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin from kombu.mixins import ConsumerMixin
@@ -26,6 +27,17 @@ from awx.main.models import * # noqa
logger = logging.getLogger('awx.main.commands.run_callback_receiver') logger = logging.getLogger('awx.main.commands.run_callback_receiver')
class WorkerSignalHandler:
def __init__(self):
self.kill_now = False
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args, **kwargs):
self.kill_now = True
class CallbackBrokerWorker(ConsumerMixin): class CallbackBrokerWorker(ConsumerMixin):
def __init__(self, connection, use_workers=True): def __init__(self, connection, use_workers=True):
self.connection = connection self.connection = connection
@@ -42,8 +54,7 @@ class CallbackBrokerWorker(ConsumerMixin):
signal.signal(signum, signal.SIG_DFL) signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it
except Exception: except Exception:
# TODO: LOG logger.exception('Error in shutdown_handler')
pass
return _handler return _handler
if use_workers: if use_workers:
@@ -102,7 +113,8 @@ class CallbackBrokerWorker(ConsumerMixin):
return None return None
def callback_worker(self, queue_actual, idx): def callback_worker(self, queue_actual, idx):
while True: signal_handler = WorkerSignalHandler()
while not signal_handler.kill_now:
try: try:
body = queue_actual.get(block=True, timeout=1) body = queue_actual.get(block=True, timeout=1)
except QueueEmpty: except QueueEmpty: