diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index cba6161d83..9f4818bd37 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -12,6 +12,7 @@ import random from django.db import transaction, connection from django.utils.translation import ugettext_lazy as _, gettext_noop from django.utils.timezone import now as tz_now +from django.conf import settings # AWX from awx.main.dispatch.reaper import reap_job @@ -45,6 +46,12 @@ class TaskManager(): def __init__(self): self.graph = dict() + # start task limit indicates how many pending jobs can be started on this + # .schedule() run. Starting jobs is expensive, and there is code in place to reap + # the task manager after 5 minutes. At scale, the task manager can easily take more than + # 5 minutes to start pending jobs. If this limit is reached, pending jobs + # will no longer be started and will be started on the next task manager cycle. + self.start_task_limit = settings.START_TASK_LIMIT for rampart_group in InstanceGroup.objects.prefetch_related('instances'): self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name), capacity_total=rampart_group.capacity, @@ -189,6 +196,10 @@ class TaskManager(): return result def start_task(self, task, rampart_group, dependent_tasks=None, instance=None): + self.start_task_limit -= 1 + if self.start_task_limit == 0: + # schedule another run immediately after this task manager + schedule_task_manager() from awx.main.tasks import handle_work_error, handle_work_success dependent_tasks = dependent_tasks or [] @@ -448,6 +459,8 @@ class TaskManager(): def process_pending_tasks(self, pending_tasks): running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()]) for task in pending_tasks: + if self.start_task_limit <= 0: + break if self.is_job_blocked(task): logger.debug("{} is blocked from running".format(task.log_format)) continue diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 8d42af1ae4..05e7ed77e3 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -203,6 +203,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000 # The number of job events to migrate per-transaction when moving from int -> bigint JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000 +# The maximum allowed jobs to start on a given task manager cycle +START_TASK_LIMIT = 100 + # Disallow sending session cookies over insecure connections SESSION_COOKIE_SECURE = True