diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index b5e9a90fe0..8847097fbc 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,6 +3,7 @@ # Python import os +import sys import datetime import logging import json @@ -26,6 +27,8 @@ from awx.main.models import * # ZeroMQ import zmq +MAX_REQUESTS = 20000 + class CallbackReceiver(object): def __init__(self): @@ -54,27 +57,36 @@ class CallbackReceiver(object): queue_publisher.bind(queue_port) if use_workers: - workers = [] - for idx in range(4): - w = Process(target=self.callback_worker, args=(queue_port,)) - w.daemon = True - w.start() - workers.append(w) - signal.signal(signal.SIGINT, shutdown_handler(workers)) - signal.signal(signal.SIGTERM, shutdown_handler(workers)) + w = Process(target=self.callback_worker, args=(queue_port,)) + w.daemon = True + w.start() + + signal.signal(signal.SIGINT, shutdown_handler([w])) + signal.signal(signal.SIGTERM, shutdown_handler([w])) if settings.DEBUG: - print 'Started callback receiver (4 workers)' + print 'Started worker' elif settings.DEBUG: print 'Started callback receiver (no workers)' + message_number = 0 while True: # Handle signal message = consumer_subscriber.recv_json() - if True: # check_pre_handle(message) or not use_workers: + message_number += 1 + if not use_workers: self.process_job_event(message) else: queue_publisher.send_json(message) + if message_number >= MAX_REQUESTS: + message_number = 0 + print("Recycling worker process") + w.join() + w = Process(target=self.callback_worker, args=(queue_port,)) + w.daemon = True + w.start() consumer_subscriber.send("1") + # NOTE: This cache doesn't work too terribly well but it can help prevent database queries + # we may want to use something like memcached here instead def process_parent_cache(self, job_id, event_object): if event_object.event not in ('playbook_on_start', 'playbook_on_play_start', 'playbook_on_setup', 'playbook_on_task_start'): return @@ -155,12 +167,18 @@ class CallbackReceiver(object): retry_count) def callback_worker(self, port): + messages_processed = 0 pool_context = zmq.Context() pool_subscriber = pool_context.socket(zmq.PULL) pool_subscriber.connect(port) while True: message = pool_subscriber.recv_json() self.process_job_event(message) + messages_processed += 1 + if messages_processed >= MAX_REQUESTS: + print("Shutting down message receiver") + pool_subscriber.close() + sys.exit(0) class Command(NoArgsCommand): '''