diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 9741f83a08..a255e168cd 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -71,9 +71,11 @@ class PoolWorker(object):
         self.messages_finished = 0
         self.managed_tasks = collections.OrderedDict()
         self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue()
+        self.last_finished = None
         self.queue = MPQueue(queue_size)
         self.process = Process(target=target, args=(self.queue, self.finished) + args)
         self.process.daemon = True
+        self.scale_down_in = settings.DISPATCHER_SCALE_DOWN_WAIT_TIME
 
     def start(self):
         self.process.start()
@@ -144,6 +146,9 @@
                 # state of which events are *currently* being processed.
                 logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid))
 
+        if finished:
+            self.last_finished = time.time()
+
     @property
     def current_task(self):
         if not self.track_managed_tasks:
@@ -189,6 +194,14 @@
     def idle(self):
         return not self.busy
 
+    @property
+    def ready_to_scale_down(self):
+        if self.busy:
+            return False
+        if self.last_finished is None:
+            return True
+        return time.time() - self.last_finished > self.scale_down_in
+
 
 class StatefulPoolWorker(PoolWorker):
 
@@ -249,7 +262,7 @@ class WorkerPool(object):
         except Exception:
             logger.exception('could not fork')
         else:
-            logger.debug('scaling up worker pid:{}'.format(worker.pid))
+            logger.info(f'scaling up worker pid:{worker.pid} total:{len(self.workers)}')
         return idx, worker
 
     def debug(self, *args, **kwargs):
@@ -385,12 +398,12 @@ class AutoscalePool(WorkerPool):
                             logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
                 orphaned.extend(w.orphaned_tasks)
                 self.workers.remove(w)
-            elif w.idle and len(self.workers) > self.min_workers:
+            elif (len(self.workers) > self.min_workers) and w.ready_to_scale_down:
                 # the process has an empty queue (it's idle) and we have
                 # more processes in the pool than we need (> min)
                 # send this process a message so it will exit gracefully
                 # at the next opportunity
-                logger.debug('scaling down worker pid:{}'.format(w.pid))
+                logger.info(f'scaling down worker pid:{w.pid} prior total:{len(self.workers)}')
                 w.quit()
                 self.workers.remove(w)
             if w.alive:
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 2751534cfb..bf20f8b9f8 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -438,6 +438,10 @@ EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30  # once every 30 minutes check if an
 # Amount of time dispatcher will try to reconnect to database for jobs and consuming new work
 DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40
 
+# Minimum time to wait after the last job finished before scaling down a worker
+# A lower value will free up memory more aggressively, but a higher value will require less forking
+DISPATCHER_SCALE_DOWN_WAIT_TIME = 60
+
 BROKER_URL = 'unix:///var/run/redis/redis.sock'
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
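
Below is a minimal standalone sketch (not part of the patch) of the timer-gated scale-down check this diff introduces. The DummyWorker class and the 5-second wait are illustrative assumptions; in the patch the wait comes from settings.DISPATCHER_SCALE_DOWN_WAIT_TIME and the state lives on PoolWorker.

import time


class DummyWorker:
    """Stand-in for PoolWorker, tracking only the fields the check needs."""

    def __init__(self, scale_down_in=5):
        self.busy = False          # True while a task is in flight
        self.last_finished = None  # time.time() when the most recent task finished
        self.scale_down_in = scale_down_in

    @property
    def ready_to_scale_down(self):
        # Mirrors PoolWorker.ready_to_scale_down from the diff: never scale
        # down a busy worker, always allow scale-down if the worker has never
        # run a task, otherwise wait out the grace period after the last task.
        if self.busy:
            return False
        if self.last_finished is None:
            return True
        return time.time() - self.last_finished > self.scale_down_in


if __name__ == '__main__':
    w = DummyWorker(scale_down_in=5)
    print(w.ready_to_scale_down)          # True: never ran a task

    w.last_finished = time.time()         # a task just finished
    print(w.ready_to_scale_down)          # False: still inside the grace period

    w.last_finished = time.time() - 10    # last task finished 10 seconds ago
    print(w.ready_to_scale_down)          # True: grace period elapsed

With this change, AutoscalePool.cleanup() only retires a worker once both conditions hold: the pool is above min_workers and the worker has been idle for at least the configured wait, which trades a little resident memory for fewer fork/teardown cycles under bursty load.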