Revert "Merge pull request #12584 from AlanCoding/lazy_workers"

This reverts commit 64157f7207, reversing
changes made to 9e8ba6ca09.
This commit is contained in:
Alan Rominger
2022-08-18 20:40:08 -04:00
parent b4ef687b60
commit 974f845059
2 changed files with 3 additions and 20 deletions

View File

@@ -72,11 +72,9 @@ class PoolWorker(object):
self.messages_finished = 0
self.managed_tasks = collections.OrderedDict()
self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue()
self.last_finished = None
self.queue = MPQueue(queue_size)
self.process = Process(target=target, args=(self.queue, self.finished) + args)
self.process.daemon = True
self.scale_down_in = settings.DISPATCHER_SCALE_DOWN_WAIT_TIME
def start(self):
self.process.start()
@@ -147,9 +145,6 @@ class PoolWorker(object):
# state of which events are *currently* being processed.
logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid))
if finished:
self.last_finished = time.time()
@property
def current_task(self):
if not self.track_managed_tasks:
@@ -195,14 +190,6 @@ class PoolWorker(object):
def idle(self):
return not self.busy
@property
def ready_to_scale_down(self):
if self.busy:
return False
if self.last_finished is None:
return True
return time.time() - self.last_finished > self.scale_down_in
class StatefulPoolWorker(PoolWorker):
@@ -263,7 +250,7 @@ class WorkerPool(object):
except Exception:
logger.exception('could not fork')
else:
logger.info(f'scaling up worker pid:{worker.pid} total:{len(self.workers)}')
logger.debug('scaling up worker pid:{}'.format(worker.pid))
return idx, worker
def debug(self, *args, **kwargs):
@@ -402,12 +389,12 @@ class AutoscalePool(WorkerPool):
logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
orphaned.extend(w.orphaned_tasks)
self.workers.remove(w)
elif (len(self.workers) > self.min_workers) and w.ready_to_scale_down:
elif w.idle and len(self.workers) > self.min_workers:
# the process has an empty queue (it's idle) and we have
# more processes in the pool than we need (> min)
# send this process a message so it will exit gracefully
# at the next opportunity
logger.info(f'scaling down worker pid:{w.pid} prior total:{len(self.workers)}')
logger.debug('scaling down worker pid:{}'.format(w.pid))
w.quit()
self.workers.remove(w)
if w.alive: