Refactor callback receiver

* Move callback socket listener into its own method spawned as a
  seperate process
* launch and manage worker processes from a parent process and isolate
  them such that a fork doesn't inherit the zeromq socket
This commit is contained in:
Matthew Jones
2014-10-03 00:34:26 -04:00
parent d12dddebde
commit c609f7a81d

View File

@@ -50,10 +50,6 @@ class CallbackReceiver(object):
return True return True
return False return False
self.consumer_context = zmq.Context()
self.consumer_subscriber = self.consumer_context.socket(zmq.REP)
self.consumer_subscriber.bind(consumer_port)
worker_queues = [] worker_queues = []
if use_workers: if use_workers:
@@ -61,7 +57,6 @@ class CallbackReceiver(object):
for idx in range(WORKERS): for idx in range(WORKERS):
queue_actual = Queue() queue_actual = Queue()
w = Process(target=self.callback_worker, args=(queue_actual,)) w = Process(target=self.callback_worker, args=(queue_actual,))
w.daemon = True
w.start() w.start()
signal.signal(signal.SIGINT, shutdown_handler([w])) signal.signal(signal.SIGINT, shutdown_handler([w]))
@@ -72,10 +67,34 @@ class CallbackReceiver(object):
elif settings.DEBUG: elif settings.DEBUG:
print 'Started callback receiver (no workers)' print 'Started callback receiver (no workers)'
main_process = Process(target=self.callback_handler, args=(use_workers, consumer_port, worker_queues,))
main_process.daemon = True
main_process.start()
signal.signal(signal.SIGINT, shutdown_handler([main_process]))
signal.signal(signal.SIGTERM, shutdown_handler([main_process]))
while True:
for queue_worker in worker_queues:
if not queue_worker[2].is_alive():
queue_worker[2].join()
w = Process(target=self.callback_worker, args=(queue_worker[1],))
w.daemon = True
w.start()
signal.signal(signal.SIGINT, shutdown_handler([w]))
signal.signal(signal.SIGTERM, shutdown_handler([w]))
queue_worker[2] = w
if not main_process.is_alive():
sys.exit(1)
time.sleep(0.1)
def callback_handler(self, use_workers, consumer_port, worker_queues):
message_number = 0 message_number = 0
total_messages = 0 total_messages = 0
last_parent_events = {} last_parent_events = {}
self.consumer_context = zmq.Context()
self.consumer_subscriber = self.consumer_context.socket(zmq.REP)
self.consumer_subscriber.bind(consumer_port)
while True: # Handle signal while True: # Handle signal
message = self.consumer_subscriber.recv_json() message = self.consumer_subscriber.recv_json()
@@ -116,17 +135,17 @@ class CallbackReceiver(object):
queue_actual_worker[1].put(message) queue_actual_worker[1].put(message)
if queue_actual_worker[0] >= MAX_REQUESTS: if queue_actual_worker[0] >= MAX_REQUESTS:
queue_actual_worker[0] = 0 queue_actual_worker[0] = 0
print("Recycling worker process") # print("Recycling worker process")
queue_actual_worker[2].join() # queue_actual_worker[2].join()
connection.close() # connection.close()
w = Process(target=self.callback_worker, args=(queue_actual_worker[1],)) # w = Process(target=self.callback_worker, args=(queue_actual_worker[1],))
w.daemon = True # w.daemon = True
w.start() # w.start()
signal.signal(signal.SIGINT, shutdown_handler([w])) # signal.signal(signal.SIGINT, shutdown_handler([w]))
signal.signal(signal.SIGTERM, shutdown_handler([w])) # signal.signal(signal.SIGTERM, shutdown_handler([w]))
queue_actual_worker[2] = w # queue_actual_worker[2] = w
last_parent_events[message['job_id']] = job_parent_events last_parent_events[message['job_id']] = job_parent_events
self.consumer_subscriber.send("1") self.consumer_subscriber.send("1")
@@ -206,7 +225,7 @@ class CallbackReceiver(object):
messages_processed += 1 messages_processed += 1
if messages_processed >= MAX_REQUESTS: if messages_processed >= MAX_REQUESTS:
print("Shutting down message receiver") print("Shutting down message receiver")
sys.exit(0) break
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''