mirror of
https://github.com/ansible/awx.git
synced 2026-01-18 05:01:19 -03:30
don't attempt to recover special QUIT messages in the worker pool
when `--reload` is sent to the dispatcher, it sends a special QUIT
message to each worker in the pool so that it will exit gracefully at
the next opportunity
when a worker process exits unexpectedly, the dispatcher attempts to
recover its queued messages and sends them to another worker in the
pool; in this scenario, we should _never_ re-enqueue these special
QUIT messages (because the process doesn't need to quit, it's already
gone)
To reproduce this race condition:
1. Launch an adhoc that does `sleep 60`
2. Run `awx-manage run_dispatcher --reload` to enqueue a `QUIT` message
into the worker's queue
3. Find the pid of the worker running the `sleep 60` and `SIGKILL` it.
4. Observe that dispatcher attempts to requeue the `QUIT` message and
logs a confusing error.
This commit is contained in:
parent
c722e50595
commit
720a634702
@ -145,7 +145,9 @@ class PoolWorker(object):
|
||||
# if this process has any pending messages requeue them
|
||||
for _ in range(self.qsize):
|
||||
try:
|
||||
orphaned.append(self.queue.get(block=False))
|
||||
message = self.queue.get(block=False)
|
||||
if message != 'QUIT':
|
||||
orphaned.append(message)
|
||||
except QueueEmpty:
|
||||
break # qsize is not always _totally_ up to date
|
||||
if len(orphaned):
|
||||
@ -328,11 +330,12 @@ class AutoscalePool(WorkerPool):
|
||||
# send them to another worker
|
||||
logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode))
|
||||
if w.current_task:
|
||||
try:
|
||||
for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
|
||||
reaper.reap_job(j, 'failed')
|
||||
except Exception:
|
||||
logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
|
||||
if w.current_task != 'QUIT':
|
||||
try:
|
||||
for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
|
||||
reaper.reap_job(j, 'failed')
|
||||
except Exception:
|
||||
logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
|
||||
orphaned.extend(w.orphaned_tasks)
|
||||
self.workers.remove(w)
|
||||
elif w.idle and len(self.workers) > self.min_workers:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user