From 7eb0c7dd282a30a0a508518b9e9fbe7c3c49c20f Mon Sep 17 00:00:00 2001
From: Elijah DeLee
Date: Wed, 6 Jul 2022 23:11:49 -0400
Subject: [PATCH] exit task manager loops early if we are timed out

add settings to define task manager timeout and grace period

This still gives us TASK_MANAGER_TIMEOUT_GRACE_PERIOD seconds to get out
of the task manager before the dispatcher sends SIGTERM.

Also, apply the start task limit in WorkflowManager when starting pending
workflows.
---
 awx/main/dispatch/pool.py          |  4 ++--
 awx/main/scheduler/task_manager.py | 38 +++++++++++++++++++++++++++---
 awx/settings/defaults.py           |  5 ++++
 3 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 576f6bf799..a302325993 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -406,8 +406,8 @@ class AutoscalePool(WorkerPool):
                     w.managed_tasks[current_task['uuid']]['started'] = time.time()
                 age = time.time() - current_task['started']
                 w.managed_tasks[current_task['uuid']]['age'] = age
-                if age > (60 * 5):
-                    logger.error(f'run_task_manager has held the advisory lock for >5m, sending SIGTERM to {w.pid}')  # noqa
+                if age > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD):
+                    logger.error(f'run_task_manager has held the advisory lock for {age}, sending SIGTERM to {w.pid}')  # noqa
                     os.kill(w.pid, signal.SIGTERM)
 
         for m in orphaned:
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 39c8bfcb81..87d075b406 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -64,10 +64,20 @@ class TaskBase:
         # ensures each task manager metric will be overridden when pipe_execute
         # is called later.
         self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
+        self.start_time = time.time()
+        self.start_task_limit = settings.START_TASK_LIMIT
         for m in self.subsystem_metrics.METRICS:
             if m.startswith(self.prefix):
                 self.subsystem_metrics.set(m, 0)
 
+    def timed_out(self):
+        """Return True if we have met or exceeded the timeout for the task manager, otherwise False."""
+        elapsed = time.time() - self.start_time
+        if elapsed >= settings.TASK_MANAGER_TIMEOUT:
+            logger.warning(f"{self.prefix} manager has run for {elapsed}s, which is greater than TASK_MANAGER_TIMEOUT of {settings.TASK_MANAGER_TIMEOUT}s.")
+            return True
+        return False
+
     def get_running_workflow_jobs(self):
         graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')]
         return graph_workflow_jobs
@@ -124,6 +134,11 @@ class WorkflowManager(TaskBase):
     def spawn_workflow_graph_jobs(self, workflow_jobs):
         result = []
         for workflow_job in workflow_jobs:
+            if self.timed_out():
+                logger.warning("Workflow manager has reached its timeout while processing running workflows, exiting loop early")
+                # Do not process any more workflow jobs. Stop here.
+                # Maybe we should schedule another WorkflowManager run
+                break
             dag = WorkflowDAG(workflow_job)
             status_changed = False
             if workflow_job.cancel_flag:
@@ -228,6 +243,11 @@ class WorkflowManager(TaskBase):
         workflow_approvals = WorkflowApproval.objects.filter(status='pending')
         now = tz_now()
         for task in workflow_approvals:
+            if self.timed_out():
+                logger.warning("Workflow manager has reached its timeout while processing approval nodes, exiting loop early")
+                # Do not process any more workflow approval nodes. Stop here.
+                # Maybe we should schedule another WorkflowManager run
+                break
             approval_timeout_seconds = timedelta(seconds=task.timeout)
             if task.timeout == 0:
                 continue
@@ -248,11 +268,21 @@ class WorkflowManager(TaskBase):
         workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')]
         workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')]
         workflow_to_start = []
-        running_workflow_pk = {wf.pk for wf in workflow_jobs_running}
+        running_wfjt_ids = {wf.unified_job_template_id for wf in workflow_jobs_running}
         for wf in workflow_jobs_pending:
-            if wf.allow_simultaneous or wf.pk not in running_workflow_pk:
+            if wf.allow_simultaneous or wf.unified_job_template_id not in running_wfjt_ids:
                 wf.status = 'running'
                 workflow_to_start.append(wf)
+                running_wfjt_ids.add(wf.unified_job_template_id)
+                logger.debug('Transitioning %s to running status.', wf.log_format)
+                self.start_task_limit -= 1
+                if self.start_task_limit == 0:
+                    break
+                if self.timed_out():
+                    logger.warning("Workflow manager has reached its timeout while processing pending workflows, exiting loop early")
+                    break
+            else:
+                logger.debug('Workflow %s staying in pending, blocked by another running workflow from the same workflow job template', wf.log_format)
 
         WorkflowJob.objects.bulk_update(workflow_to_start, ['status'])
         workflow_jobs_running.extend(workflow_to_start)
@@ -466,7 +496,6 @@ class TaskManager(TaskBase):
         # the task manager after 5 minutes. At scale, the task manager can easily take more than
         # 5 minutes to start pending jobs. If this limit is reached, pending jobs
         # will no longer be started and will be started on the next task manager cycle.
-        self.start_task_limit = settings.START_TASK_LIMIT
         self.time_delta_job_explanation = timedelta(seconds=30)
         self.prefix = "task_manager"
         super().__init__()
@@ -573,6 +602,9 @@ class TaskManager(TaskBase):
         for task in pending_tasks:
             if self.start_task_limit <= 0:
                 break
+            if self.timed_out():
+                logger.warning("Task manager has reached its timeout while processing pending jobs, exiting loop early")
+                break
             blocked_by = self.job_blocked_by(task)
             if blocked_by:
                 self.subsystem_metrics.inc(f"{self.prefix}_tasks_blocked", 1)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 6a02d0d710..a2b19a82d8 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -248,6 +248,11 @@ SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL = 15
 # The maximum allowed jobs to start on a given task manager cycle
 START_TASK_LIMIT = 100
 
+# Time out the task manager if it has run longer than this many seconds. The dispatcher only sends
+# SIGTERM after a further TASK_MANAGER_TIMEOUT_GRACE_PERIOD seconds, so the task manager has time to bail out of its loops cleanly.
+TASK_MANAGER_TIMEOUT = 300
+TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60
+
 # Disallow sending session cookies over insecure connections
 SESSION_COOKIE_SECURE = True
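
Reviewer note: the relationship between the two new settings is easiest to see outside of AWX. The standalone sketch below is illustrative only; FakeSettings, ManagerLoop and dispatcher_should_kill are made-up names, not AWX code. The idea it demonstrates comes straight from the patch: the manager's own loops bail out once TASK_MANAGER_TIMEOUT elapses, while the dispatcher in pool.py only sends SIGTERM after TASK_MANAGER_TIMEOUT plus TASK_MANAGER_TIMEOUT_GRACE_PERIOD, so a manager that honors its timeout should exit before it is killed.

    # Standalone sketch, not AWX code: FakeSettings, ManagerLoop and dispatcher_should_kill
    # are hypothetical names used to illustrate how the two new settings interact.
    import time


    class FakeSettings:
        # Mirrors the new defaults added to awx/settings/defaults.py
        TASK_MANAGER_TIMEOUT = 300
        TASK_MANAGER_TIMEOUT_GRACE_PERIOD = 60


    settings = FakeSettings()


    class ManagerLoop:
        """Toy stand-in for TaskBase: records its start time and bails out of loops on timeout."""

        def __init__(self):
            self.start_time = time.time()

        def timed_out(self):
            # Same shape as the check this patch adds to TaskBase.timed_out()
            return (time.time() - self.start_time) >= settings.TASK_MANAGER_TIMEOUT

        def process(self, work_items):
            processed = []
            for item in work_items:
                if self.timed_out():
                    # Exit the loop early; leftover items wait for the next manager cycle.
                    break
                processed.append(item)
            return processed


    def dispatcher_should_kill(age_seconds):
        # Toy stand-in for the AutoscalePool check: SIGTERM only after timeout plus grace period.
        return age_seconds > (settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD)


    if __name__ == '__main__':
        print(ManagerLoop().process(['job-1', 'job-2']))  # finishes well inside the timeout
        print(dispatcher_should_kill(301))  # False: still within the 60s grace period after the 300s timeout
        print(dispatcher_should_kill(361))  # True: timeout plus grace period exceeded

Both values are ordinary settings in awx/settings/defaults.py, so deployments whose task manager cycles routinely run long can raise TASK_MANAGER_TIMEOUT in their settings override instead of relying on the dispatcher's SIGTERM.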