diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 527b4b52fb..7c156bfbeb 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -72,11 +72,9 @@ class PoolWorker(object):
         self.messages_finished = 0
         self.managed_tasks = collections.OrderedDict()
         self.finished = MPQueue(queue_size) if self.track_managed_tasks else NoOpResultQueue()
-        self.last_finished = None
         self.queue = MPQueue(queue_size)
         self.process = Process(target=target, args=(self.queue, self.finished) + args)
         self.process.daemon = True
-        self.scale_down_in = settings.DISPATCHER_SCALE_DOWN_WAIT_TIME
 
     def start(self):
         self.process.start()
@@ -147,9 +145,6 @@ class PoolWorker(object):
                 # state of which events are *currently* being processed.
                 logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid))
 
-        if finished:
-            self.last_finished = time.time()
-
     @property
     def current_task(self):
         if not self.track_managed_tasks:
@@ -195,14 +190,6 @@ class PoolWorker(object):
     def idle(self):
         return not self.busy
 
-    @property
-    def ready_to_scale_down(self):
-        if self.busy:
-            return False
-        if self.last_finished is None:
-            return True
-        return time.time() - self.last_finished > self.scale_down_in
-
 
 class StatefulPoolWorker(PoolWorker):
@@ -263,7 +250,7 @@ class WorkerPool(object):
         except Exception:
             logger.exception('could not fork')
         else:
-            logger.info(f'scaling up worker pid:{worker.pid} total:{len(self.workers)}')
+            logger.debug('scaling up worker pid:{}'.format(worker.pid))
         return idx, worker
 
     def debug(self, *args, **kwargs):
@@ -402,12 +389,12 @@ class AutoscalePool(WorkerPool):
                             logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
                 orphaned.extend(w.orphaned_tasks)
                 self.workers.remove(w)
-            elif (len(self.workers) > self.min_workers) and w.ready_to_scale_down:
+            elif w.idle and len(self.workers) > self.min_workers:
                 # the process has an empty queue (it's idle) and we have
                 # more processes in the pool than we need (> min)
                 # send this process a message so it will exit gracefully
                 # at the next opportunity
-                logger.info(f'scaling down worker pid:{w.pid} prior total:{len(self.workers)}')
+                logger.debug('scaling down worker pid:{}'.format(w.pid))
                 w.quit()
                 self.workers.remove(w)
                 if w.alive:
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 0be21d6020..964d13fbf8 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -444,10 +444,6 @@ EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30  # once every 30 minutes check if an
 # Amount of time dispatcher will try to reconnect to database for jobs and consuming new work
 DISPATCHER_DB_DOWNTOWN_TOLLERANCE = 40
 
-# Minimum time to wait after last job finished before scaling down a worker
-# A higher value will free up memory more agressively, but a lower value will require less forking
-DISPATCHER_SCALE_DOWN_WAIT_TIME = 60
-
 BROKER_URL = 'unix:///var/run/redis/redis.sock'
 
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},