mirror of
https://github.com/ansible/awx.git
synced 2026-01-09 15:02:07 -03:30
introduce age for workers and mandatory retirement
Retire workers after a certain age, allowing them to finish their current task if they are not idle. This mitigates any issues like memory leaks in long running workers, especially if systems stay busy for months at a time. Introduce new optional setting WORKER_MAX_LIFETIME_SECONDS, defaulting to 4 hours.
This commit is contained in:
parent
01eb162378
commit
6accd1e5e6
@ -37,6 +37,9 @@ else:
|
||||
logger = logging.getLogger('awx.main.dispatch')
|
||||
|
||||
|
||||
RETIRED_SENTINEL_TASK = "[retired]"
|
||||
|
||||
|
||||
class NoOpResultQueue(object):
|
||||
def put(self, item):
|
||||
pass
|
||||
@ -81,11 +84,17 @@ class PoolWorker(object):
|
||||
self.queue = MPQueue(queue_size)
|
||||
self.process = Process(target=target, args=(self.queue, self.finished) + args)
|
||||
self.process.daemon = True
|
||||
self.creation_time = time.monotonic()
|
||||
self.retiring = False
|
||||
|
||||
def start(self):
|
||||
self.process.start()
|
||||
|
||||
def put(self, body):
|
||||
if self.retiring:
|
||||
uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A'
|
||||
logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.")
|
||||
raise QueueFull("Worker is retiring and not accepting new tasks") # AutoscalePool.write handles QueueFull
|
||||
uuid = '?'
|
||||
if isinstance(body, dict):
|
||||
if not body.get('uuid'):
|
||||
@ -104,6 +113,11 @@ class PoolWorker(object):
|
||||
"""
|
||||
self.queue.put('QUIT')
|
||||
|
||||
@property
|
||||
def age(self):
|
||||
"""Returns the current age of the worker in seconds."""
|
||||
return time.monotonic() - self.creation_time
|
||||
|
||||
@property
|
||||
def pid(self):
|
||||
return self.process.pid
|
||||
@ -150,6 +164,8 @@ class PoolWorker(object):
|
||||
# the purpose of self.managed_tasks is to just track internal
|
||||
# state of which events are *currently* being processed.
|
||||
logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid))
|
||||
if self.retiring:
|
||||
self.managed_tasks[RETIRED_SENTINEL_TASK] = {'task': RETIRED_SENTINEL_TASK}
|
||||
|
||||
@property
|
||||
def current_task(self):
|
||||
@ -265,6 +281,8 @@ class WorkerPool(object):
|
||||
'{% for w in workers %}'
|
||||
'. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}'
|
||||
' sent={{ w.messages_sent }}'
|
||||
' age={{ "%.0f"|format(w.age) }}s'
|
||||
' retiring={{ w.retiring }}'
|
||||
'{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}'
|
||||
' qsize={{ w.managed_tasks|length }}'
|
||||
' rss={{ w.mb }}MB'
|
||||
@ -356,6 +374,9 @@ class AutoscalePool(WorkerPool):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.max_workers = kwargs.pop('max_workers', None)
|
||||
self.max_worker_lifetime_seconds = kwargs.pop(
|
||||
'max_worker_lifetime_seconds', getattr(settings, 'WORKER_MAX_LIFETIME_SECONDS', 14400)
|
||||
) # Default to 4 hours
|
||||
super(AutoscalePool, self).__init__(*args, **kwargs)
|
||||
|
||||
if self.max_workers is None:
|
||||
@ -415,6 +436,7 @@ class AutoscalePool(WorkerPool):
|
||||
"""
|
||||
orphaned = []
|
||||
for w in self.workers[::]:
|
||||
is_retirement_age = self.max_worker_lifetime_seconds is not None and w.age > self.max_worker_lifetime_seconds
|
||||
if not w.alive:
|
||||
# the worker process has exited
|
||||
# 1. take the task it was running and enqueue the error
|
||||
@ -423,6 +445,10 @@ class AutoscalePool(WorkerPool):
|
||||
# send them to another worker
|
||||
logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode))
|
||||
if w.current_task:
|
||||
if w.current_task == {'task': RETIRED_SENTINEL_TASK}:
|
||||
logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
|
||||
self.workers.remove(w)
|
||||
continue
|
||||
if w.current_task != 'QUIT':
|
||||
try:
|
||||
for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
|
||||
@ -433,6 +459,7 @@ class AutoscalePool(WorkerPool):
|
||||
logger.warning(f'Worker was told to quit but has not, pid={w.pid}')
|
||||
orphaned.extend(w.orphaned_tasks)
|
||||
self.workers.remove(w)
|
||||
|
||||
elif w.idle and len(self.workers) > self.min_workers:
|
||||
# the process has an empty queue (it's idle) and we have
|
||||
# more processes in the pool than we need (> min)
|
||||
@ -441,6 +468,22 @@ class AutoscalePool(WorkerPool):
|
||||
logger.debug('scaling down worker pid:{}'.format(w.pid))
|
||||
w.quit()
|
||||
self.workers.remove(w)
|
||||
|
||||
elif w.idle and is_retirement_age:
|
||||
logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
|
||||
w.quit()
|
||||
self.workers.remove(w)
|
||||
|
||||
elif is_retirement_age and not w.retiring and not w.idle:
|
||||
logger.info(
|
||||
f"Worker pid:{w.pid} (age: {w.age:.0f}s) exceeded max lifetime ({self.max_worker_lifetime_seconds:.0f}s). "
|
||||
"Signaling for graceful retirement."
|
||||
)
|
||||
# Send QUIT signal; worker will finish current task then exit.
|
||||
w.quit()
|
||||
# mark as retiring to reject any future tasks that might be assigned in meantime
|
||||
w.retiring = True
|
||||
|
||||
if w.alive:
|
||||
# if we discover a task manager invocation that's been running
|
||||
# too long, reap it (because otherwise it'll just hold the postgres
|
||||
|
||||
@ -1100,3 +1100,8 @@ FLAGS = {
|
||||
'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}],
|
||||
'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}],
|
||||
}
|
||||
|
||||
# Dispatcher worker lifetime. If set to None, workers will never be retired
|
||||
# based on age. Note workers will finish their last task before retiring if
|
||||
# they are busy when they reach retirement age.
|
||||
WORKER_MAX_LIFETIME_SECONDS = 14400 # seconds
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user