pid kill each of the 3 task managers on timeout

This commit is contained in:
Seth Foster 2022-07-06 00:44:30 -04:00
parent b26d2ab0e9
commit b3eb9e0193
No known key found for this signature in database
GPG Key ID: 86E90D96F7184028
2 changed files with 9 additions and 9 deletions

View File

@ -401,13 +401,15 @@ class AutoscalePool(WorkerPool):
# the task manager to never do more work
current_task = w.current_task
if current_task and isinstance(current_task, dict):
if current_task.get('task', '').endswith('tasks.run_task_manager'):
endings = ['tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager']
current_task_name = current_task.get('task', '')
if any([current_task_name.endswith(e) for e in endings]):
if 'started' not in current_task:
w.managed_tasks[current_task['uuid']]['started'] = time.time()
age = time.time() - current_task['started']
w.managed_tasks[current_task['uuid']]['age'] = age
if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD):
logger.error(f'run_task_manager has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa
logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGTERM to {w.pid}') # noqa
os.kill(w.pid, signal.SIGTERM)
for m in orphaned:

View File

@ -59,7 +59,8 @@ def timeit(func):
class TaskBase:
def __init__(self):
def __init__(self, prefix=""):
self.prefix = prefix
# initialize each metric to 0 and force metric_has_changed to true. This
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
@ -127,8 +128,7 @@ class TaskBase:
class WorkflowManager(TaskBase):
def __init__(self):
self.prefix = "workflow_manager"
super().__init__()
super().__init__(prefix="workflow_manager")
@timeit
def spawn_workflow_graph_jobs(self, workflow_jobs):
@ -298,8 +298,7 @@ class WorkflowManager(TaskBase):
class DependencyManager(TaskBase):
def __init__(self):
self.prefix = "dependency_manager"
super().__init__()
super().__init__(prefix="dependency_manager")
def create_project_update(self, task, project_id=None):
if project_id is None:
@ -497,8 +496,7 @@ class TaskManager(TaskBase):
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.time_delta_job_explanation = timedelta(seconds=30)
self.prefix = "task_manager"
super().__init__()
super().__init__(prefix="task_manager")
def after_lock_init(self, all_sorted_tasks):
"""