From cc6d673e4fce6a2e8c426ca31c7ded83bd1d0f96 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 29 Sep 2014 14:31:14 -0400 Subject: [PATCH] Attempt to fix a mysterious zmq crash related to sockets lingering after a fork --- awx/main/management/commands/run_callback_receiver.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index ff8851bee8..0eee2e4719 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -50,9 +50,9 @@ class CallbackReceiver(object): return True return False - consumer_context = zmq.Context() - consumer_subscriber = consumer_context.socket(zmq.REP) - consumer_subscriber.bind(consumer_port) + self.consumer_context = zmq.Context() + self.consumer_subscriber = self.consumer_context.socket(zmq.REP) + self.consumer_subscriber.bind(consumer_port) worker_queues = [] @@ -78,7 +78,7 @@ class CallbackReceiver(object): last_parent_events = {} while True: # Handle signal - message = consumer_subscriber.recv_json() + message = self.consumer_subscriber.recv_json() total_messages += 1 if not use_workers: self.process_job_event(message) @@ -128,7 +128,7 @@ class CallbackReceiver(object): queue_actual_worker[2] = w last_parent_events[message['job_id']] = job_parent_events - consumer_subscriber.send("1") + self.consumer_subscriber.send("1") def process_job_event(self, data): # Sanity check: Do we need to do anything at all? @@ -199,6 +199,7 @@ class CallbackReceiver(object): return None def callback_worker(self, queue_actual): + self.consumer_context.destroy() messages_processed = 0 while True: message = queue_actual.get()