From 0f5beca9ae190af5ed66cca9634c645261f3be41 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 13 Mar 2015 11:11:49 -0400 Subject: [PATCH] Major fixes for job callback receiver processing * Add logic to ansible callback plugin to prevent it from waiting forever to submit events to Tower * Lower process recycle threshold for tower callback receiver * Make recycle threshold configurable * Properly exit the main callback receiver management process if the event receiver process is dead so we don't leave dead worker processes * Set a configurable maximum number of messages that can be waiting in a worker process queue before it is skipped instead of filling up memory on a dead worker process * Skip over a dead worker process if it's queue is full * Force restart callback receiver if all queues are dead * Roll back transaction.atomic with the thought that it is causing deadlocks in the worker process. Use the old commit_on_success mechanism with retry logic * Seperate queue nonblocking expected exception from any other type of exception that could be encountered on the queue fetch operation --- .../commands/run_callback_receiver.py | 95 ++++++++++--------- awx/plugins/callback/job_event_callback.py | 9 +- awx/settings/defaults.py | 8 ++ 3 files changed, 65 insertions(+), 47 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 376d3d7764..ac1ba04af7 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -9,6 +9,7 @@ import logging import signal import time from multiprocessing import Process, Queue +from Queue import Empty as QueueEmpty # Django from django.conf import settings @@ -24,7 +25,6 @@ from awx.main.socket import Socket logger = logging.getLogger('awx.main.commands.run_callback_receiver') -MAX_REQUESTS = 10000 WORKERS = 4 class CallbackReceiver(object): @@ -55,8 +55,8 @@ class CallbackReceiver(object): if use_workers: connection.close() for idx in range(WORKERS): - queue_actual = Queue() - w = Process(target=self.callback_worker, args=(queue_actual,)) + queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE) + w = Process(target=self.callback_worker, args=(queue_actual, idx,)) w.start() if settings.DEBUG: logger.info('Started worker %s' % str(idx)) @@ -75,27 +75,45 @@ class CallbackReceiver(object): signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) while True: workers_changed = False + idx = 0 for queue_worker in worker_queues: if not queue_worker[2].is_alive(): + logger.warn("Worker %s was not alive, restarting" % str(queue_worker)) workers_changed = True queue_worker[2].join() - w = Process(target=self.callback_worker, args=(queue_worker[1],)) + w = Process(target=self.callback_worker, args=(queue_worker[1], idx,)) w.daemon = True w.start() signal.signal(signal.SIGINT, shutdown_handler([w])) signal.signal(signal.SIGTERM, shutdown_handler([w])) queue_worker[2] = w + idx += 1 if workers_changed: signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) if not main_process.is_alive(): - sys.exit(1) + logger.error("Main process is not alive") + for queue_worker in worker_queues: + queue_worker[2].terminate() + break time.sleep(0.1) + def write_queue_worker(self, preferred_queue, worker_queues, message): + queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) + for queue_actual in queue_order: + try: + worker_actual = worker_queues[queue_actual] + worker_queues[queue_actual][1].put(message, block=True, timeout=2) + worker_queues[queue_actual][0] += 1 + return queue_actual + except Exception, e: + logger.warn("Could not write to queue %s" % preferred_queue) + continue + return None + def callback_handler(self, use_workers, worker_queues): total_messages = 0 last_parent_events = {} - with Socket('callbacks', 'r') as callbacks: for message in callbacks.listen(): total_messages += 1 @@ -131,28 +149,16 @@ class CallbackReceiver(object): else: if message['event'] == 'playbook_on_stats': job_parent_events = {} - queue_actual_worker = worker_queues[total_messages % WORKERS] - queue_actual_worker[0] += 1 - try: - queue_actual_worker[1].put(message, block=True, timeout=2) - except Exception: - print("Queue workers went away, continuing...") - continue - if queue_actual_worker[0] >= MAX_REQUESTS: - queue_actual_worker[0] = 0 - # print("Recycling worker process") - # queue_actual_worker[2].join() - # connection.close() - # w = Process(target=self.callback_worker, args=(queue_actual_worker[1],)) - # w.daemon = True - # w.start() - # signal.signal(signal.SIGINT, shutdown_handler([w])) - # signal.signal(signal.SIGTERM, shutdown_handler([w])) - - # queue_actual_worker[2] = w + actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message) + # NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full + # the drawback is that if we under extremely high load we may be legitimately taking a while to process messages + if actual_queue is None: + logger.error("All queues full!") + sys.exit(1) last_parent_events[message['job_id']] = job_parent_events + @transaction.commit_on_success def process_job_event(self, data): # Sanity check: Do we need to do anything at all? event = data.get('event', '') @@ -194,24 +200,24 @@ class CallbackReceiver(object): # If we get a database error of some kind, try again. for retry_count in xrange(11): try: - with transaction.atomic(): - # If we're not in verbose mode, wipe out any module - # arguments. - res = data['event_data'].get('res', {}) - if isinstance(res, dict): - i = res.get('invocation', {}) - if verbose == 0 and 'module_args' in i: - i['module_args'] = '' + # If we're not in verbose mode, wipe out any module + # arguments. + res = data['event_data'].get('res', {}) + if isinstance(res, dict): + i = res.get('invocation', {}) + if verbose == 0 and 'module_args' in i: + i['module_args'] = '' - # Create a new JobEvent object. - job_event = JobEvent(**data) - if parent_id is not None: - job_event.parent = JobEvent.objects.get(id=parent_id) - job_event.save(post_process=True) + # Create a new JobEvent object. + job_event = JobEvent(**data) + if parent_id is not None: + job_event.parent = JobEvent.objects.get(id=parent_id) + job_event.save(post_process=True) - # Retrun the job event object. - return job_event + # Retrun the job event object. + return job_event except DatabaseError as e: + transaction.rollback() # Log the error and try again. logger.error('Database error saving job event, retrying in ' '1 second (retry #%d): %s', retry_count + 1, e) @@ -221,16 +227,19 @@ class CallbackReceiver(object): logger.error('Failed to save job event after %d retries.', retry_count) return None - def callback_worker(self, queue_actual): + def callback_worker(self, queue_actual, idx): messages_processed = 0 while True: try: message = queue_actual.get(block=True, timeout=1) - except Exception: + except QueueEmpty: continue + except Exception, e: + logger.error("Exception on listen socket, restarting: " + str(e)) + break self.process_job_event(message) messages_processed += 1 - if messages_processed >= MAX_REQUESTS: + if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD: logger.info("Shutting down message receiver") break diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 1a613eb787..5a3c25ffe9 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -121,6 +121,8 @@ class CallbackModule(object): def _start_connection(self): self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) + self.socket.setsockopt(zmq.RCVTIMEO, 4000) + self.socket.setsockopt(zmq.LINGER, 2000) self.socket.connect(self.callback_consumer_port) def _post_job_event_queue_msg(self, event, event_data): @@ -146,16 +148,15 @@ class CallbackModule(object): self._init_connection() if self.context is None: self._start_connection() - self.socket.send_json(msg) self.socket.recv() return except Exception, e: - self.logger.info('Publish Exception: %r, retry=%d', e, + self.logger.info('Publish Job Event Exception: %r, retry=%d', e, retry_count, exc_info=True) - # TODO: Maybe recycle connection here? + retry_count += 1 if retry_count >= 3: - raise + break def _post_rest_api_event(self, event, event_data): data = json.dumps({ diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 973c684207..ba988295fd 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -317,6 +317,14 @@ ANSIBLE_FORCE_COLOR = True # the celery task. AWX_TASK_ENV = {} +# Maximum number of job events processed by the callback receiver worker process +# before it recycles +JOB_EVENT_RECYCLE_THRESHOLD = 3000 + +# Maximum number of job events that can be waiting on a single worker queue before +# it can be skipped as too busy +JOB_EVENT_MAX_QUEUE_SIZE = 100 + # Flag to enable/disable updating hosts M2M when saving job events. CAPTURE_JOB_EVENT_HOSTS = False