From b3eb9e01934e7cd1e02a81fd526020771b447c5c Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Wed, 6 Jul 2022 00:44:30 -0400 Subject: [PATCH] pid kill each of the 3 task managers on timeout --- awx/main/dispatch/pool.py | 6 ++++-- awx/main/scheduler/task_manager.py | 12 +++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index a302325993..64a08c3e7f 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -401,13 +401,15 @@ class AutoscalePool(WorkerPool): # the task manager to never do more work current_task = w.current_task if current_task and isinstance(current_task, dict): - if current_task.get('task', '').endswith('tasks.run_task_manager'): + endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager'] + current_task_name = current_task.get('task', '') + if any([current_task_name.endswith(e) for e in endings]): if 'started' not in current_task: w.managed_tasks[current_task['uuid']]['started'] = time.time() age = time.time() - current_task['started'] w.managed_tasks[current_task['uuid']]['age'] = age if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD): - logger.error(f'run_task_manager has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa + logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa os.kill(w.pid, signal.SIGTERM) for m in orphaned: diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 394577544c..044f095be5 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -59,7 +59,8 @@ def timeit(func): class TaskBase: - def __init__(self): + def __init__(self, prefix=""): + self.prefix = prefix # initialize each metric to 0 and force metric_has_changed to true. This # ensures each task manager metric will be overridden when pipe_execute # is called later. @@ -127,8 +128,7 @@ class TaskBase: class WorkflowManager(TaskBase): def __init__(self): - self.prefix = "workflow_manager" - super().__init__() + super().__init__(prefix="workflow_manager") @timeit def spawn_workflow_graph_jobs(self, workflow_jobs): @@ -298,8 +298,7 @@ class WorkflowManager(TaskBase): class DependencyManager(TaskBase): def __init__(self): - self.prefix = "dependency_manager" - super().__init__() + super().__init__(prefix="dependency_manager") def create_project_update(self, task, project_id=None): if project_id is None: @@ -497,8 +496,7 @@ class TaskManager(TaskBase): # 5 minutes to start pending jobs. If this limit is reached, pending jobs # will no longer be started and will be started on the next task manager cycle. self.time_delta_job_explanation = timedelta(seconds=30) - self.prefix = "task_manager" - super().__init__() + super().__init__(prefix="task_manager") def after_lock_init(self, all_sorted_tasks): """