exit task manager loops early if we are timed out

add settings to define task manager timeout and grace period

This still leaves TASK_MANAGER_TIMEOUT_GRACE_PERIOD seconds to get out of
the task manager before the dispatcher sends SIGTERM.

Also, apply the start task limit in WorkflowManager when starting pending
workflows.
Elijah DeLee 2022-07-06 23:11:49 -04:00 committed by Seth Foster
parent 236c1df676
commit 7eb0c7dd28
3 changed files with 42 additions and 5 deletions
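
To make the timing concrete: with the defaults introduced below, the task manager starts bailing out of its loops once 300 seconds have elapsed, while the dispatcher only sends SIGTERM after 360 seconds, leaving a 60-second window to finish the current item and release the advisory lock. A rough sketch of the two checks (illustrative only; the real ones are in the diffs below):

    TASK_MANAGER_TIMEOUT = 300               # settings.TASK_MANAGER_TIMEOUT
    TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60   # settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD

    def manager_times_out(elapsed_seconds):
        # the task manager's own check (TaskBase.timed_out): stop starting new work
        return elapsed_seconds >= TASK_MANAGER_TIMEOUT

    def dispatcher_reaps(lock_age_seconds):
        # the dispatcher's check (AutoscalePool): SIGTERM only after the grace period
        return lock_age_seconds > TASK_MANAGER_TIMEOUT + TASK_MANAGER_TIMEOUT_GRACE_PERIOD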


@@ -406,8 +406,8 @@ class AutoscalePool(WorkerPool):
                         w.managed_tasks[current_task['uuid']]['started'] = time.time()
                     age = time.time() - current_task['started']
                     w.managed_tasks[current_task['uuid']]['age'] = age
-                    if age > (60 * 5):
-                        logger.error(f'run_task_manager has held the advisory lock for >5m, sending SIGTERM to {w.pid}')  # noqa
+                    if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD):
+                        logger.error(f'run_task_manager has held the advisory lock for {age}, sending SIGTERM to {w.pid}')  # noqa
                         os.kill(w.pid, signal.SIGTERM)
         for m in orphaned:


@@ -64,10 +64,20 @@ class TaskBase:
         # ensures each task manager metric will be overridden when pipe_execute
         # is called later.
         self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
         self.start_time = time.time()
+        self.start_task_limit = settings.START_TASK_LIMIT
         for m in self.subsystem_metrics.METRICS:
             if m.startswith(self.prefix):
                 self.subsystem_metrics.set(m, 0)
+
+    def timed_out(self):
+        """Return True if we have met or exceeded the timeout for the task manager."""
+        elapsed = time.time() - self.start_time
+        if elapsed >= settings.TASK_MANAGER_TIMEOUT:
+            logger.warning(f"{self.prefix} manager has run for {elapsed} seconds, which exceeds TASK_MANAGER_TIMEOUT of {settings.TASK_MANAGER_TIMEOUT}.")
+            return True
+        return False

     def get_running_workflow_jobs(self):
         graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
         return graph_workflow_jobs
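
Worth noting: start_time is captured in TaskBase.__init__, so TASK_MANAGER_TIMEOUT is a budget for the whole scheduler run that every loop below draws from, not a per-loop limit. A standalone sketch of that behavior (class and names here are illustrative, not from this commit):

    import time

    TASK_MANAGER_TIMEOUT = 300  # stand-in for settings.TASK_MANAGER_TIMEOUT

    class Manager:
        def __init__(self):
            self.start_time = time.time()  # the budget starts at construction

        def timed_out(self):
            return (time.time() - self.start_time) >= TASK_MANAGER_TIMEOUT

        def run(self, phases):
            # every phase checks the same shared clock, so a slow early phase
            # leaves less time for the later ones
            for phase in phases:
                for work in phase:
                    if self.timed_out():
                        return  # bail out early, as the real loops do with break
                    work()  # hypothetical per-item work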
@@ -124,6 +134,11 @@ class WorkflowManager(TaskBase):
     def spawn_workflow_graph_jobs(self, workflow_jobs):
         result = []
         for workflow_job in workflow_jobs:
+            if self.timed_out():
+                logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early")
+                # Do not process any more workflow jobs. Stop here.
+                # Maybe we should schedule another WorkflowManager run
+                break
             dag = WorkflowDAG(workflow_job)
             status_changed = False
             if workflow_job.cancel_flag:
@@ -228,6 +243,11 @@ class WorkflowManager(TaskBase):
         workflow_approvals = WorkflowApproval.objects.filter(status='pending')
         now = tz_now()
         for task in workflow_approvals:
+            if self.timed_out():
+                logger.warning("Workflow manager has reached time out while processing approval nodes, exiting loop early")
+                # Do not process any more workflow approval nodes. Stop here.
+                # Maybe we should schedule another WorkflowManager run
+                break
             approval_timeout_seconds = timedelta(seconds=task.timeout)
             if task.timeout == 0:
                 continue
@@ -248,11 +268,21 @@ class WorkflowManager(TaskBase):
         workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')]
         workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')]
         workflow_to_start = []
-        running_workflow_pk = {wf.pk for wf in workflow_jobs_running}
+        running_wfjt_ids = {wf.unified_job_template_id for wf in workflow_jobs_running}
         for wf in workflow_jobs_pending:
-            if wf.allow_simultaneous or wf.pk not in running_workflow_pk:
+            if wf.allow_simultaneous or wf.unified_job_template_id not in running_wfjt_ids:
                 wf.status = 'running'
                 workflow_to_start.append(wf)
+                running_wfjt_ids.add(wf.unified_job_template_id)
                 logger.debug('Transitioning %s to running status.', wf.log_format)
+                self.start_task_limit -= 1
+                if self.start_task_limit == 0:
+                    break
+                if self.timed_out():
+                    logger.warning("Workflow manager has reached time out while processing pending workflows, exiting loop early")
+                    break
             else:
                 logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format)
         WorkflowJob.objects.bulk_update(workflow_to_start, ['status'])
         workflow_jobs_running.extend(workflow_to_start)
@@ -466,7 +496,6 @@ class TaskManager(TaskBase):
         # the task manager after 5 minutes. At scale, the task manager can easily take more than
         # 5 minutes to start pending jobs. If this limit is reached, pending jobs
         # will no longer be started and will be started on the next task manager cycle.
-        self.start_task_limit = settings.START_TASK_LIMIT
         self.time_delta_job_explanation = timedelta(seconds=30)
         self.prefix = "task_manager"
         super().__init__()
@@ -573,6 +602,9 @@ class TaskManager(TaskBase):
         for task in pending_tasks:
             if self.start_task_limit <= 0:
                 break
+            if self.timed_out():
+                logger.warning("Task manager has reached time out while processing pending jobs, exiting loop early")
+                break
             blocked_by = self.job_blocked_by(task)
             if blocked_by:
                 self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1)


@@ -248,6 +248,11 @@ SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL = 15

 # The maximum allowed jobs to start on a given task manager cycle
 START_TASK_LIMIT = 100

+# Time out task managers if they take longer than this many seconds, plus TASK_MANAGER_TIMEOUT_GRACE_PERIOD.
+# The grace period gives the task manager time to bail out on its own before the dispatcher sends SIGTERM.
+TASK_MANAGER_TIMEOUT = 300
+TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60

 # Disallow sending session cookies over insecure connections
 SESSION_COOKIE_SECURE = True
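
If the defaults are too tight for a large install, both values can be overridden like any other AWX setting; a hedged example, assuming the usual custom settings drop-in location:

    # /etc/tower/conf.d/task_manager.py  (path assumed; any settings override mechanism works)
    # Give each task manager cycle more time before it bails out, and a longer
    # window before the dispatcher reaps it with SIGTERM.
    TASK_MANAGER_TIMEOUT = 600
    TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 120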