Add grace period settings for task manager timeout, and pod / job waiting reapers

Co-authored-by: Alan Rominger <arominge@redhat.com>
This commit is contained in:
Shane McDonald
2022-07-13 16:40:28 -04:00
committed by Alan Rominger
parent c649809eb2
commit 3c51cb130f
5 changed files with 44 additions and 15 deletions

View File

@@ -344,6 +344,10 @@ class AutoscalePool(WorkerPool):
# max workers can't be less than min_workers
self.max_workers = max(self.min_workers, self.max_workers)
# the task manager enforces settings.TASK_MANAGER_TIMEOUT on its own
# but if the task takes longer than the time defined here, we will force it to stop here
self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD
@property
def should_grow(self):
if len(self.workers) < self.min_workers:
@@ -377,7 +381,6 @@ class AutoscalePool(WorkerPool):
if there's an outage, this method _can_ throw various
django.db.utils.Error exceptions. Act accordingly.
"""
start_time = time.time()
orphaned = []
for w in self.workers[::]:
if not w.alive:
@@ -419,8 +422,8 @@ class AutoscalePool(WorkerPool):
w.managed_tasks[current_task['uuid']]['started'] = time.time()
age = time.time() - current_task['started']
w.managed_tasks[current_task['uuid']]['age'] = age
if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD):
logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa
if age > self.task_manager_timeout:
logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}')
os.kill(w.pid, signal.SIGTERM)
for m in orphaned:
@@ -436,10 +439,11 @@ class AutoscalePool(WorkerPool):
for worker in self.workers:
worker.calculate_managed_tasks()
running_uuids.extend(list(worker.managed_tasks.keys()))
delta = time.time() - start_time
if delta > 1.0:
logger.warning(f'Took {delta} for internal part of cleanup')
start_time = time.time()
# if we are not in the dangerous situation of queue backup then clear old waiting jobs
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
reaper.reap_waiting(excluded_uuids=running_uuids)
reaper.reap(excluded_uuids=running_uuids)
def up(self):

View File

@@ -2,6 +2,7 @@ from datetime import timedelta
import logging
from django.db.models import Q
from django.conf import settings
from django.utils.timezone import now as tz_now
from django.contrib.contenttypes.models import ContentType
@@ -34,7 +35,9 @@ def startup_reaping():
def reap_job(j, status):
if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'):
j.refresh_from_db(fields=['status'])
status_before = j.status
if status_before not in ('running', 'waiting'):
# just in case, don't reap jobs that aren't running
return
j.status = status
@@ -49,13 +52,16 @@ def reap_job(j, status):
if hasattr(j, 'send_notification_templates'):
j.send_notification_templates('failed')
j.websocket_emit_status(status)
logger.error('{} is no longer running; reaping'.format(j.log_format))
logger.error(f'{j.log_format} is no longer {status_before}; reaping')
def reap_waiting(instance=None, status='failed', grace_period=60):
def reap_waiting(instance=None, status='failed', grace_period=None, excluded_uuids=None):
"""
Reap all jobs in waiting for this instance.
"""
if grace_period is None:
grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
me = instance
if me is None:
try:
@@ -65,11 +71,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60):
return
now = tz_now()
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs:
reap_job(j, status)
def reap(instance=None, status='failed', excluded_uuids=[]):
def reap(instance=None, status='failed', excluded_uuids=None):
"""
Reap all jobs in running for this instance.
"""
@@ -83,6 +91,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]):
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter(
Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
).exclude(celery_task_id__in=excluded_uuids)
)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs:
reap_job(j, status)