Merge pull request #2438 from ryanpetrello/dispatcher-quit-race

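Don't attempt to recover the special 'QUIT' message in the worker pool recovery code: skip it both when requeueing a gone worker's pending messages and when reaping that worker's current task.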

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot]
2018-10-15 20:40:20 +00:00
committed by GitHub

View File

@@ -145,7 +145,9 @@ class PoolWorker(object):
# if this process has any pending messages requeue them # if this process has any pending messages requeue them
for _ in range(self.qsize): for _ in range(self.qsize):
try: try:
orphaned.append(self.queue.get(block=False)) message = self.queue.get(block=False)
if message != 'QUIT':
orphaned.append(message)
except QueueEmpty: except QueueEmpty:
break # qsize is not always _totally_ up to date break # qsize is not always _totally_ up to date
if len(orphaned): if len(orphaned):
@@ -328,11 +330,12 @@ class AutoscalePool(WorkerPool):
# send them to another worker # send them to another worker
logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode)) logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode))
if w.current_task: if w.current_task:
try: if w.current_task != 'QUIT':
for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']): try:
reaper.reap_job(j, 'failed') for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
except Exception: reaper.reap_job(j, 'failed')
logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid'])) except Exception:
logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
orphaned.extend(w.orphaned_tasks) orphaned.extend(w.orphaned_tasks)
self.workers.remove(w) self.workers.remove(w)
elif w.idle and len(self.workers) > self.min_workers: elif w.idle and len(self.workers) > self.min_workers: