From 720a6347025bea88be6d5de54e781b011e5253bd Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Mon, 15 Oct 2018 12:17:52 -0400
Subject: [PATCH] don't attempt to recover special QUIT messages in the worker
 pool

when `--reload` is sent to the dispatcher, it sends a special QUIT
message to each worker in the pool so that it will exit gracefully at the
next opportunity

when a worker process exits unexpectedly, the dispatcher attempts to
recover its queued messages and sends them to another worker in the pool;
in this scenario, we should _never_ re-enqueue these special QUIT
messages (because the process doesn't need to quit, it's already gone)

To reproduce this race condition:

1. Launch an adhoc that does `sleep 60`
2. Run `awx-manage run_dispatcher --reload` to enqueue a `QUIT` message
   into the worker's queue
3. Find the pid of the worker running the `sleep 60` and `SIGKILL` it.
4. Observe that dispatcher attempts to requeue the `QUIT` message and
   logs a confusing error.
---
 awx/main/dispatch/pool.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 9c4f5b219f..b03bbf4c92 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -145,7 +145,9 @@ class PoolWorker(object):
         # if this process has any pending messages requeue them
         for _ in range(self.qsize):
             try:
-                orphaned.append(self.queue.get(block=False))
+                message = self.queue.get(block=False)
+                if message != 'QUIT':
+                    orphaned.append(message)
             except QueueEmpty:
                 break  # qsize is not always _totally_ up to date
         if len(orphaned):
@@ -328,11 +330,12 @@ class AutoscalePool(WorkerPool):
                 # send them to another worker
                 logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode))
                 if w.current_task:
-                    try:
-                        for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
-                            reaper.reap_job(j, 'failed')
-                    except Exception:
-                        logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
+                    if w.current_task != 'QUIT':
+                        try:
+                            for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
+                                reaper.reap_job(j, 'failed')
+                        except Exception:
+                            logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
                 orphaned.extend(w.orphaned_tasks)
                 self.workers.remove(w)
             elif w.idle and len(self.workers) > self.min_workers: