diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 7c203eebce..001274baa5 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -37,6 +37,9 @@ else: logger = logging.getLogger('awx.main.dispatch') +RETIRED_SENTINEL_TASK = "[retired]" + + class NoOpResultQueue(object): def put(self, item): pass @@ -81,11 +84,17 @@ class PoolWorker(object): self.queue = MPQueue(queue_size) self.process = Process(target=target, args=(self.queue, self.finished) + args) self.process.daemon = True + self.creation_time = time.monotonic() + self.retiring = False def start(self): self.process.start() def put(self, body): + if self.retiring: + uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A' + logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.") + raise QueueFull("Worker is retiring and not accepting new tasks") # AutoscalePool.write handles QueueFull uuid = '?' if isinstance(body, dict): if not body.get('uuid'): @@ -104,6 +113,11 @@ class PoolWorker(object): """ self.queue.put('QUIT') + @property + def age(self): + """Returns the current age of the worker in seconds.""" + return time.monotonic() - self.creation_time + @property def pid(self): return self.process.pid @@ -150,6 +164,8 @@ class PoolWorker(object): # the purpose of self.managed_tasks is to just track internal # state of which events are *currently* being processed. logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid)) + if self.retiring: + self.managed_tasks[RETIRED_SENTINEL_TASK] = {'task': RETIRED_SENTINEL_TASK} @property def current_task(self): @@ -265,6 +281,8 @@ class WorkerPool(object): '{% for w in workers %}' '. 
worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}' ' sent={{ w.messages_sent }}' + ' age={{ "%.0f"|format(w.age) }}s' + ' retiring={{ w.retiring }}' '{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}' ' qsize={{ w.managed_tasks|length }}' ' rss={{ w.mb }}MB' @@ -356,6 +374,9 @@ class AutoscalePool(WorkerPool): def __init__(self, *args, **kwargs): self.max_workers = kwargs.pop('max_workers', None) + self.max_worker_lifetime_seconds = kwargs.pop( + 'max_worker_lifetime_seconds', getattr(settings, 'WORKER_MAX_LIFETIME_SECONDS', 14400) + ) # Default to 4 hours super(AutoscalePool, self).__init__(*args, **kwargs) if self.max_workers is None: @@ -415,6 +436,7 @@ class AutoscalePool(WorkerPool): """ orphaned = [] for w in self.workers[::]: + is_retirement_age = self.max_worker_lifetime_seconds is not None and w.age > self.max_worker_lifetime_seconds if not w.alive: # the worker process has exited # 1. take the task it was running and enqueue the error @@ -423,6 +445,10 @@ class AutoscalePool(WorkerPool): # send them to another worker logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode)) if w.current_task: + if w.current_task == {'task': RETIRED_SENTINEL_TASK}: + logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age)) + self.workers.remove(w) + continue if w.current_task != 'QUIT': try: for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']): @@ -433,6 +459,7 @@ class AutoscalePool(WorkerPool): logger.warning(f'Worker was told to quit but has not, pid={w.pid}') orphaned.extend(w.orphaned_tasks) self.workers.remove(w) + elif w.idle and len(self.workers) > self.min_workers: # the process has an empty queue (it's idle) and we have # more processes in the pool than we need (> min) @@ -441,6 +468,22 @@ class AutoscalePool(WorkerPool): logger.debug('scaling down worker pid:{}'.format(w.pid)) w.quit() self.workers.remove(w) + + elif w.idle and 
is_retirement_age: + logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age)) + w.quit() + self.workers.remove(w) + + elif is_retirement_age and not w.retiring and not w.idle: + logger.info( + f"Worker pid:{w.pid} (age: {w.age:.0f}s) exceeded max lifetime ({self.max_worker_lifetime_seconds:.0f}s). " + "Signaling for graceful retirement." + ) + # Send QUIT signal; worker will finish current task then exit. + w.quit() + # Mark as retiring to reject any future tasks that might be assigned in the meantime. + w.retiring = True + if w.alive: # if we discover a task manager invocation that's been running # too long, reap it (because otherwise it'll just hold the postgres diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index f20df26c6b..81ef13cc4d 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1100,3 +1100,8 @@ FLAGS = { 'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}], 'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}], } + +# Dispatcher worker lifetime. If set to None, workers will never be retired +# based on age. Note that workers will finish their last task before retiring if +# they are busy when they reach retirement age. +WORKER_MAX_LIFETIME_SECONDS = 14400 # seconds