mirror of
https://github.com/ansible/awx.git
synced 2026-02-19 12:10:06 -03:30
PR #8074 - limit how many jobs the task manager can start on a given run
This commit is contained in:
@@ -12,6 +12,7 @@ import random
|
|||||||
from django.db import transaction, connection
|
from django.db import transaction, connection
|
||||||
from django.utils.translation import ugettext_lazy as _, gettext_noop
|
from django.utils.translation import ugettext_lazy as _, gettext_noop
|
||||||
from django.utils.timezone import now as tz_now
|
from django.utils.timezone import now as tz_now
|
||||||
|
from django.conf import settings
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.dispatch.reaper import reap_job
|
from awx.main.dispatch.reaper import reap_job
|
||||||
@@ -45,6 +46,12 @@ class TaskManager():
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.graph = dict()
|
self.graph = dict()
|
||||||
|
# start task limit indicates how many pending jobs can be started on this
|
||||||
|
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
|
||||||
|
# the task manager after 5 minutes. At scale, the task manager can easily take more than
|
||||||
|
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
|
||||||
|
# will no longer be started and will be started on the next task manager cycle.
|
||||||
|
self.start_task_limit = settings.START_TASK_LIMIT
|
||||||
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
|
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
|
||||||
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
|
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
|
||||||
capacity_total=rampart_group.capacity,
|
capacity_total=rampart_group.capacity,
|
||||||
@@ -189,6 +196,10 @@ class TaskManager():
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
|
def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
|
||||||
|
self.start_task_limit -= 1
|
||||||
|
if self.start_task_limit == 0:
|
||||||
|
# schedule another run immediately after this task manager
|
||||||
|
schedule_task_manager()
|
||||||
from awx.main.tasks import handle_work_error, handle_work_success
|
from awx.main.tasks import handle_work_error, handle_work_success
|
||||||
|
|
||||||
dependent_tasks = dependent_tasks or []
|
dependent_tasks = dependent_tasks or []
|
||||||
@@ -448,6 +459,8 @@ class TaskManager():
|
|||||||
def process_pending_tasks(self, pending_tasks):
|
def process_pending_tasks(self, pending_tasks):
|
||||||
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
|
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
|
||||||
for task in pending_tasks:
|
for task in pending_tasks:
|
||||||
|
if self.start_task_limit <= 0:
|
||||||
|
break
|
||||||
if self.is_job_blocked(task):
|
if self.is_job_blocked(task):
|
||||||
logger.debug("{} is blocked from running".format(task.log_format))
|
logger.debug("{} is blocked from running".format(task.log_format))
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -203,6 +203,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000
|
|||||||
# The number of job events to migrate per-transaction when moving from int -> bigint
|
# The number of job events to migrate per-transaction when moving from int -> bigint
|
||||||
JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
|
JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
|
||||||
|
|
||||||
|
# The maximum allowed jobs to start on a given task manager cycle
|
||||||
|
START_TASK_LIMIT = 100
|
||||||
|
|
||||||
# Disallow sending session cookies over insecure connections
|
# Disallow sending session cookies over insecure connections
|
||||||
SESSION_COOKIE_SECURE = True
|
SESSION_COOKIE_SECURE = True
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user