Attempt to fix a mysterious zmq crash related to sockets lingering after

a fork
This commit is contained in:
Matthew Jones
2014-09-29 14:31:14 -04:00
parent eabfbd572f
commit cc6d673e4f

View File

@@ -50,9 +50,9 @@ class CallbackReceiver(object):
return True return True
return False return False
consumer_context = zmq.Context() self.consumer_context = zmq.Context()
consumer_subscriber = consumer_context.socket(zmq.REP) self.consumer_subscriber = self.consumer_context.socket(zmq.REP)
consumer_subscriber.bind(consumer_port) self.consumer_subscriber.bind(consumer_port)
worker_queues = [] worker_queues = []
@@ -78,7 +78,7 @@ class CallbackReceiver(object):
last_parent_events = {} last_parent_events = {}
while True: # Handle signal while True: # Handle signal
message = consumer_subscriber.recv_json() message = self.consumer_subscriber.recv_json()
total_messages += 1 total_messages += 1
if not use_workers: if not use_workers:
self.process_job_event(message) self.process_job_event(message)
@@ -128,7 +128,7 @@ class CallbackReceiver(object):
queue_actual_worker[2] = w queue_actual_worker[2] = w
last_parent_events[message['job_id']] = job_parent_events last_parent_events[message['job_id']] = job_parent_events
consumer_subscriber.send("1") self.consumer_subscriber.send("1")
def process_job_event(self, data): def process_job_event(self, data):
# Sanity check: Do we need to do anything at all? # Sanity check: Do we need to do anything at all?
@@ -199,6 +199,7 @@ class CallbackReceiver(object):
return None return None
def callback_worker(self, queue_actual): def callback_worker(self, queue_actual):
self.consumer_context.destroy()
messages_processed = 0 messages_processed = 0
while True: while True:
message = queue_actual.get() message = queue_actual.get()