reduce per-job database query count

* Do not query the database, once per job, for the set of Instances that
  belong to the group we are trying to fit the job onto.
* Instead, cache the set of instances for each instance group and reuse it
  for every job in the scheduling pass (see the sketch after the commit metadata).
Author: Chris Meyers
Date:   2020-10-02 13:16:21 -04:00
parent 09a0448c3e
commit 2eac5a8873
3 changed files with 61 additions and 37 deletions
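
In rough terms, the change replaces a per-job, per-group instance query with one
snapshot taken at the start of a scheduling pass. A minimal sketch of the idea,
using stand-in objects rather than the real AWX models:

    # Minimal sketch of the caching idea in this commit; the objects here are
    # simplified stand-ins, not the AWX Instance/InstanceGroup models.
    from types import SimpleNamespace

    def snapshot_groups(instance_groups):
        # One pass over the groups: each gets a cached list of lightweight
        # instance snapshots, instead of a fresh query per job per group.
        cache = {}
        for group in instance_groups:
            cache[group.name] = [SimpleNamespace(hostname=i.hostname,
                                                 remaining_capacity=i.remaining_capacity,
                                                 jobs_running=i.jobs_running)
                                 for i in group.instances
                                 if i.capacity > 0 and i.enabled]
        return cache

    def fit(task, cached_instances):
        # Same rule as fit_task_to_most_remaining_capacity_instance: pick the
        # instance with the most remaining capacity that still fits the task.
        best = None
        for i in cached_instances:
            if i.remaining_capacity >= task.task_impact and \
                    (best is None or i.remaining_capacity > best.remaining_capacity):
                best = i
        return best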

@@ -261,18 +261,20 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
         app_label = 'main'
 
-    def fit_task_to_most_remaining_capacity_instance(self, task):
+    @staticmethod
+    def fit_task_to_most_remaining_capacity_instance(task, instances):
         instance_most_capacity = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.remaining_capacity >= task.task_impact and \
                     (instance_most_capacity is None or
                      i.remaining_capacity > instance_most_capacity.remaining_capacity):
                 instance_most_capacity = i
         return instance_most_capacity
 
-    def find_largest_idle_instance(self):
+    @staticmethod
+    def find_largest_idle_instance(instances):
         largest_instance = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.jobs_running == 0:
                 if largest_instance is None:
                     largest_instance = i

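With both helpers turned into staticmethods, any caller that already holds a list of
instances (real rows or lightweight snapshots) can use them directly, with no
`self.instances` query. A small usage sketch; the objects below are stand-ins that
only carry the attributes the methods read, and the import path is assumed to be the
one AWX exposes its models under:

    from types import SimpleNamespace
    from awx.main.models import InstanceGroup  # import path assumed from the AWX tree

    # Stand-in instances: only hostname/remaining_capacity/jobs_running matter here.
    instances = [SimpleNamespace(hostname='node1', capacity=50, remaining_capacity=30, jobs_running=0),
                 SimpleNamespace(hostname='node2', capacity=100, remaining_capacity=80, jobs_running=2)]
    task = SimpleNamespace(task_impact=40)

    # node1 cannot absorb the task (30 < 40), node2 can, so node2 is picked.
    best_fit = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, instances)
    # node1 is the only instance with no running jobs, so it is the idle fallback.
    idle = InstanceGroup.find_largest_idle_instance(instances)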

@@ -7,6 +7,7 @@ import logging
 import uuid
 import json
 import random
+from types import SimpleNamespace
 
 # Django
 from django.db import transaction, connection
@@ -45,6 +46,15 @@ logger = logging.getLogger('awx.main.scheduler')
 class TaskManager():
 
     def __init__(self):
+        '''
+        Do NOT put database queries or other potentially expensive operations
+        in the task manager init. The task manager object is created every time a
+        job is created, transitions state, and every 30 seconds on each tower node.
+        More often then not, the object is destroyed quickly because the NOOP case is hit.
+
+        The NOOP case is short-circuit logic. If the task manager realizes that another instance
+        of the task manager is already running, then it short-circuits and decides not to run.
+        '''
         self.graph = dict()
         # start task limit indicates how many pending jobs can be started on this
         # .schedule() run. Starting jobs is expensive, and there is code in place to reap
@@ -52,10 +62,30 @@ class TaskManager():
         # 5 minutes to start pending jobs. If this limit is reached, pending jobs
         # will no longer be started and will be started on the next task manager cycle.
         self.start_task_limit = settings.START_TASK_LIMIT
 
+    def after_lock_init(self):
+        '''
+        Init AFTER we know this instance of the task manager will run because the lock is acquired.
+        '''
+        instances = Instance.objects.filter(capacity__gt=0, enabled=True)
+        self.real_instances = {i.hostname: i for i in instances}
+
+        instances_partial = [SimpleNamespace(obj=instance,
+                                             remaining_capacity=instance.remaining_capacity,
+                                             capacity=instance.capacity,
+                                             jobs_running=instance.jobs_running,
+                                             hostname=instance.hostname) for instance in instances]
+
+        instances_by_hostname = {i.hostname: i for i in instances_partial}
+
         for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
-                                                  consumed_capacity=0)
+                                                  consumed_capacity=0,
+                                                  instances=[])
+            for instance in rampart_group.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+                if instance.hostname in instances_by_hostname:
+                    self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])
 
     def is_job_blocked(self, task):
         # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -466,7 +496,6 @@ class TaskManager():
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
-            idle_instance_that_fits = None
             if isinstance(task, WorkflowJob):
                 if task.unified_job_template_id in running_workflow_templates:
                     if not task.allow_simultaneous:
@@ -483,24 +512,23 @@
                     found_acceptable_queue = True
                     break
 
-                if idle_instance_that_fits is None:
-                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 remaining_capacity = self.get_remaining_capacity(rampart_group.name)
                 if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
                     logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(
                         rampart_group.name, remaining_capacity))
                     continue
 
-                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
-                if execution_instance:
-                    logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
-                        task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
-                elif not execution_instance and idle_instance_that_fits:
+                execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, self.graph[rampart_group.name]['instances']) or \
+                    InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'])
+
+                if execution_instance or rampart_group.is_containerized:
                     if not rampart_group.is_containerized:
-                        execution_instance = idle_instance_that_fits
+                        execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
+                        execution_instance.jobs_running += 1
                         logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
                             task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
 
-                if execution_instance or rampart_group.is_containerized:
+                        execution_instance = self.real_instances[execution_instance.hostname]
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
@@ -572,6 +600,9 @@ class TaskManager():
     def _schedule(self):
         finished_wfjs = []
         all_sorted_tasks = self.get_tasks()
+
+        self.after_lock_init()
+
         if len(all_sorted_tasks) > 0:
             # TODO: Deal with
             # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)

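Because the cached entries are plain `SimpleNamespace` snapshots, the scheduling loop
can do its capacity bookkeeping in memory and only swap in the real `Instance` row
right before `start_task()`. A sketch of that step in isolation; `assign()` and its
arguments are illustrative names, not the TaskManager API:

    from awx.main.models import InstanceGroup  # import path assumed from the AWX tree

    def assign(task, group_name, graph, real_instances):
        # graph[group_name]['instances'] holds the SimpleNamespace snapshots
        # built in after_lock_init(); real_instances maps hostname -> Instance.
        cached = graph[group_name]['instances']
        execution_instance = (InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, cached) or
                              InstanceGroup.find_largest_idle_instance(cached))
        if execution_instance is None:
            return None
        # Bookkeeping on the snapshot so later tasks in the same pass see the
        # reduced capacity without another database query.
        execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
        execution_instance.jobs_running += 1
        # Hand the real model object to start_task(); the snapshot was only for fitting.
        return real_instances[execution_instance.hostname]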

@@ -45,19 +45,14 @@ class TestInstanceGroup(object):
         (T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
     ])
     def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=lambda x: instances))):
-            ig = InstanceGroup(id=10)
+        ig = InstanceGroup(id=10)
 
-            if instance_fit_index is None:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
-            else:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
-                    instances[instance_fit_index], reason
+        instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances)
+
+        if instance_fit_index is None:
+            assert instance_picked is None, reason
+        else:
+            assert instance_picked == instances[instance_fit_index], reason
 
     @pytest.mark.parametrize('instances,instance_fit_index,reason', [
         (Is([(0, 100)]), 0, "One idle instance, pick it"),
@@ -70,16 +65,12 @@ class TestInstanceGroup(object):
         def filter_offline_instances(*args):
             return filter(lambda i: i.capacity > 0, instances)
 
-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=filter_offline_instances))):
-            ig = InstanceGroup(id=10)
+        ig = InstanceGroup(id=10)
+        instances_online_only = filter_offline_instances(instances)
 
-            if instance_fit_index is None:
-                assert ig.find_largest_idle_instance() is None, reason
-            else:
-                assert ig.find_largest_idle_instance() == \
-                    instances[instance_fit_index], reason
+        if instance_fit_index is None:
+            assert ig.find_largest_idle_instance(instances_online_only) is None, reason
+        else:
+            assert ig.find_largest_idle_instance(instances_online_only) == \
+                instances[instance_fit_index], reason
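
With the `mock.patch` scaffolding gone, the selection helpers are pure functions of
their inputs and can be exercised with bare fakes. A hypothetical extra test in that
spirit (not part of this commit; it skips the repo's `T()`/`Is()` helpers, and the
import path is assumed to be `awx.main.models`):

    from types import SimpleNamespace
    from awx.main.models import InstanceGroup

    def test_prefers_most_remaining_capacity_over_idleness():
        busy_big = SimpleNamespace(hostname='busy', capacity=100, remaining_capacity=90, jobs_running=3)
        idle_small = SimpleNamespace(hostname='idle', capacity=20, remaining_capacity=20, jobs_running=0)
        task = SimpleNamespace(task_impact=10)
        instances = [busy_big, idle_small]

        # Fitting by remaining capacity wins even though another instance is idle.
        assert InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, instances) is busy_big
        # The idle fallback only considers instances with no running jobs.
        assert InstanceGroup.find_largest_idle_instance(instances) is idle_small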