From f98b2e24553ea30f04871d58c6e584305f41e30f Mon Sep 17 00:00:00 2001 From: Elijah DeLee Date: Fri, 9 May 2025 14:10:45 -0400 Subject: [PATCH] introduce age for workers and mandatory retirement Retire workers after a certain age, allowing them to finish their current task if they are not idle. This mitigates any issues like memory leaks in long running workers, especially if systems stay busy for months at a time. Introduce new optional setting WORKER_MAX_LIFETIME_SECONDS, defaulting to 4 hours --- awx/main/dispatch/pool.py | 43 +++++++++++++++++++++++++++++++++++++++ awx/settings/defaults.py | 5 +++++ 2 files changed, 48 insertions(+) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 16366c3e0c..061e35e8ab 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -31,6 +31,9 @@ else: logger = logging.getLogger('awx.main.dispatch') +RETIRED_SENTINEL_TASK = "[retired]" + + class NoOpResultQueue(object): def put(self, item): pass @@ -75,11 +78,17 @@ class PoolWorker(object): self.queue = MPQueue(queue_size) self.process = Process(target=target, args=(self.queue, self.finished) + args) self.process.daemon = True + self.creation_time = time.monotonic() + self.retiring = False def start(self): self.process.start() def put(self, body): + if self.retiring: + uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A' + logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.") + raise QueueFull("Worker is retiring and not accepting new tasks") # AutoscalePool.write handles QueueFull uuid = '?' if isinstance(body, dict): if not body.get('uuid'): @@ -98,6 +107,11 @@ class PoolWorker(object): """ self.queue.put('QUIT') + @property + def age(self): + """Returns the current age of the worker in seconds.""" + return time.monotonic() - self.creation_time + @property def pid(self): return self.process.pid @@ -144,6 +158,8 @@ class PoolWorker(object): # the purpose of self.managed_tasks is to just track internal # state of which events are *currently* being processed. logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid)) + if self.retiring: + self.managed_tasks[RETIRED_SENTINEL_TASK] = {'task': RETIRED_SENTINEL_TASK} @property def current_task(self): @@ -259,6 +275,8 @@ class WorkerPool(object): '{% for w in workers %}' '. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}' ' sent={{ w.messages_sent }}' + ' age={{ "%.0f"|format(w.age) }}s' + ' retiring={{ w.retiring }}' '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}' ' qsize={{ w.managed_tasks|length }}' ' rss={{ w.mb }}MB' @@ -315,6 +333,9 @@ class AutoscalePool(WorkerPool): def __init__(self, *args, **kwargs): self.max_workers = kwargs.pop('max_workers', None) + self.max_worker_lifetime_seconds = kwargs.pop( + 'max_worker_lifetime_seconds', getattr(settings, 'WORKER_MAX_LIFETIME_SECONDS', 14400) + ) # Default to 4 hours super(AutoscalePool, self).__init__(*args, **kwargs) if self.max_workers is None: @@ -383,6 +404,7 @@ class AutoscalePool(WorkerPool): """ orphaned = [] for w in self.workers[::]: + is_retirement_age = self.max_worker_lifetime_seconds is not None and w.age > self.max_worker_lifetime_seconds if not w.alive: # the worker process has exited # 1. take the task it was running and enqueue the error @@ -391,6 +413,10 @@ class AutoscalePool(WorkerPool): # send them to another worker logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode)) if w.current_task: + if w.current_task == {'task': RETIRED_SENTINEL_TASK}: + logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age)) + self.workers.remove(w) + continue if w.current_task != 'QUIT': try: for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']): @@ -401,6 +427,7 @@ class AutoscalePool(WorkerPool): logger.warning(f'Worker was told to quit but has not, pid={w.pid}') orphaned.extend(w.orphaned_tasks) self.workers.remove(w) + elif w.idle and len(self.workers) > self.min_workers: # the process has an empty queue (it's idle) and we have # more processes in the pool than we need (> min) @@ -409,6 +436,22 @@ class AutoscalePool(WorkerPool): logger.debug('scaling down worker pid:{}'.format(w.pid)) w.quit() self.workers.remove(w) + + elif w.idle and is_retirement_age: + logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age)) + w.quit() + self.workers.remove(w) + + elif is_retirement_age and not w.retiring and not w.idle: + logger.info( + f"Worker pid:{w.pid} (age: {w.age:.0f}s) exceeded max lifetime ({self.max_worker_lifetime_seconds:.0f}s). " + "Signaling for graceful retirement." + ) + # Send QUIT signal; worker will finish current task then exit. + w.quit() + # mark as retiring to reject any future tasks that might be assigned in meantime + w.retiring = True + if w.alive: # if we discover a task manager invocation that's been running # too long, reap it (because otherwise it'll just hold the postgres diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index f3b570cb8d..191447d416 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1258,3 +1258,8 @@ FLAGS = { 'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}], 'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}], } + +# Dispatcher worker lifetime. If set to None, workers will never be retired +# based on age. Note workers will finish their last task before retiring if +# they are busy when they reach retirement age. +WORKER_MAX_LIFETIME_SECONDS = 14400 # seconds