From 3c51cb130f77c6f7e2d813d9870ed2841d4fca85 Mon Sep 17 00:00:00 2001
From: Shane McDonald
Date: Wed, 13 Jul 2022 16:40:28 -0400
Subject: [PATCH] Add grace period settings for task manager timeout, and pod / job waiting reapers

Co-authored-by: Alan Rominger
---
 awx/main/dispatch/pool.py   | 18 +++++++++++-------
 awx/main/dispatch/reaper.py | 20 +++++++++++++++-----
 awx/main/tasks/jobs.py      |  5 ++++-
 awx/main/tasks/system.py    |  5 +++--
 awx/settings/defaults.py    | 11 +++++++++++
 5 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index a102ff0b03..c9a60f2265 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -344,6 +344,10 @@ class AutoscalePool(WorkerPool):
         # max workers can't be less than min_workers
         self.max_workers = max(self.min_workers, self.max_workers)
 
+        # the task manager enforces settings.TASK_MANAGER_TIMEOUT on its own
+        # but if the task takes longer than the time defined here, we will force it to stop here
+        self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD
+
     @property
     def should_grow(self):
         if len(self.workers) < self.min_workers:
@@ -377,7 +381,6 @@ class AutoscalePool(WorkerPool):
         if there's an outage, this method _can_ throw various
         django.db.utils.Error exceptions. Act accordingly.
         """
-        start_time = time.time()
         orphaned = []
         for w in self.workers[::]:
             if not w.alive:
@@ -419,8 +422,8 @@ class AutoscalePool(WorkerPool):
                         w.managed_tasks[current_task['uuid']]['started'] = time.time()
                     age = time.time() - current_task['started']
                     w.managed_tasks[current_task['uuid']]['age'] = age
-                    if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD):
-                        logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}')  # noqa
+                    if age > self.task_manager_timeout:
+                        logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}')
                         os.kill(w.pid, signal.SIGTERM)
 
         for m in orphaned:
@@ -436,10 +439,11 @@ class AutoscalePool(WorkerPool):
         for worker in self.workers:
             worker.calculate_managed_tasks()
             running_uuids.extend(list(worker.managed_tasks.keys()))
-        delta = time.time() - start_time
-        if delta > 1.0:
-            logger.warning(f'Took {delta} for internal part of cleanup')
-        start_time = time.time()
+
+        # if we are not in the dangerous situation of queue backup then clear old waiting jobs
+        if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
+            reaper.reap_waiting(excluded_uuids=running_uuids)
+
         reaper.reap(excluded_uuids=running_uuids)
 
     def up(self):
diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py
index fc67b5762f..243ae3b715 100644
--- a/awx/main/dispatch/reaper.py
+++ b/awx/main/dispatch/reaper.py
@@ -2,6 +2,7 @@ from datetime import timedelta
 import logging
 
 from django.db.models import Q
+from django.conf import settings
 from django.utils.timezone import now as tz_now
 from django.contrib.contenttypes.models import ContentType
 
@@ -34,7 +35,9 @@ def startup_reaping():
 
 
 def reap_job(j, status):
-    if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'):
+    j.refresh_from_db(fields=['status'])
+    status_before = j.status
+    if status_before not in ('running', 'waiting'):
         # just in case, don't reap jobs that aren't running
         return
     j.status = status
@@ -49,13 +52,16 @@ def reap_job(j, status):
     if hasattr(j, 'send_notification_templates'):
         j.send_notification_templates('failed')
     j.websocket_emit_status(status)
-    logger.error('{} is no longer running; reaping'.format(j.log_format))
+    logger.error(f'{j.log_format} is no longer {status_before}; reaping')
 
 
-def reap_waiting(instance=None, status='failed', grace_period=60):
+def reap_waiting(instance=None, status='failed', grace_period=None, excluded_uuids=None):
     """
     Reap all jobs in waiting for this instance.
     """
+    if grace_period is None:
+        grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
+
     me = instance
     if me is None:
         try:
@@ -65,11 +71,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60):
             return
     now = tz_now()
     jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
+    if excluded_uuids:
+        jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
         reap_job(j, status)
 
 
-def reap(instance=None, status='failed', excluded_uuids=[]):
+def reap(instance=None, status='failed', excluded_uuids=None):
     """
     Reap all jobs in running for this instance.
     """
@@ -83,6 +91,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]):
     workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
     jobs = UnifiedJob.objects.filter(
         Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
-    ).exclude(celery_task_id__in=excluded_uuids)
+    )
+    if excluded_uuids:
+        jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
         reap_job(j, status)
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 954ced13fd..94ce454b44 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -609,7 +609,10 @@ class BaseTask(object):
         elif status == 'canceled':
             self.instance = self.update_model(pk)
             if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback():
-                self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
+                # MERGE: prefer devel over this with runner_callback.delay_update()
+                job_explanation = "Task was canceled due to receiving a shutdown signal."
+                self.instance.job_explanation = self.instance.job_explanation or job_explanation
+                extra_update_fields['job_explanation'] = self.instance.job_explanation
                 status = 'failed'
         except ReceptorNodeNotFound as exc:
             self.runner_callback.delay_update(job_explanation=str(exc))
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 7f9e157d8c..fc50938ece 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -15,7 +15,7 @@ from distutils.version import LooseVersion as Version
 from django.conf import settings
 from django.db import transaction, DatabaseError, IntegrityError
 from django.db.models.fields.related import ForeignKey
-from django.utils.timezone import now
+from django.utils.timezone import now, timedelta
 from django.utils.encoding import smart_str
 from django.contrib.auth.models import User
 from django.utils.translation import gettext_lazy as _
@@ -608,7 +608,8 @@ def awx_k8s_reaper():
     for group in InstanceGroup.objects.filter(is_container_group=True).iterator():
         logger.debug("Checking for orphaned k8s pods for {}.".format(group))
         pods = PodManager.list_active_jobs(group)
-        for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES):
+        time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD)
+        for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES):
             logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
             try:
                 pm = PodManager(job)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index bf20f8b9f8..499225a17d 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -1028,3 +1028,14 @@ DEFAULT_CONTAINER_RUN_OPTIONS = ['--network', 'slirp4netns:enable_ipv6=true']
 
 # Mount exposed paths as hostPath resource in k8s/ocp
 AWX_MOUNT_ISOLATED_PATHS_ON_K8S = False
+
+# Time out task managers if they take longer than this many seconds
+TASK_MANAGER_TIMEOUT = 300
+
+# Number of seconds _in addition to_ the task manager timeout a job can stay
+# in waiting without being reaped
+JOB_WAITING_GRACE_PERIOD = 60
+
+# Number of seconds after a container group job finished time to wait
+# before the awx_k8s_reaper task will tear down the pods
+K8S_POD_REAPER_GRACE_PERIOD = 60
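
Reviewer note: below is a minimal sketch (not part of the diff) of how the new grace-period settings are intended to combine, based on the hunks above. TASK_MANAGER_TIMEOUT_GRACE_PERIOD is referenced in pool.py but not defined in this patch, so the value used here is an assumption for illustration only.

# Illustration only: mirrors the arithmetic introduced in this patch, not AWX code.
from datetime import datetime, timedelta

TASK_MANAGER_TIMEOUT = 300              # awx/settings/defaults.py (added in this patch)
JOB_WAITING_GRACE_PERIOD = 60           # awx/settings/defaults.py (added in this patch)
K8S_POD_REAPER_GRACE_PERIOD = 60        # awx/settings/defaults.py (added in this patch)
TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60  # assumed value; not defined in this diff

# AutoscalePool.__init__: a task manager worker gets SIGTERM once it has held
# the advisory lock longer than the timeout plus its grace period.
task_manager_kill_after = TASK_MANAGER_TIMEOUT + TASK_MANAGER_TIMEOUT_GRACE_PERIOD  # 360s

# reap_waiting(): default grace period when the caller passes grace_period=None,
# so a 'waiting' job outlives a full task manager cycle before being reaped.
waiting_reap_after = JOB_WAITING_GRACE_PERIOD + TASK_MANAGER_TIMEOUT  # 360s

# awx_k8s_reaper(): only pods whose job finished before this cutoff are torn down.
pod_reap_cutoff = datetime.now() - timedelta(seconds=K8S_POD_REAPER_GRACE_PERIOD)

print(task_manager_kill_after, waiting_reap_after, pod_reap_cutoff)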