Merge pull request #8403 from chrismeyersfsu/fix-same_jt_abuse_devel

Improve general performance for a variety of high-load job launch use cases

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
softwarefactory-project-zuul[bot] 2020-10-19 19:38:32 +00:00 committed by GitHub
commit d7864c58c1
4 changed files with 68 additions and 38 deletions

View File

@@ -261,18 +261,20 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
         app_label = 'main'

-    def fit_task_to_most_remaining_capacity_instance(self, task):
+    @staticmethod
+    def fit_task_to_most_remaining_capacity_instance(task, instances):
         instance_most_capacity = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.remaining_capacity >= task.task_impact and \
                     (instance_most_capacity is None or
                      i.remaining_capacity > instance_most_capacity.remaining_capacity):
                 instance_most_capacity = i
         return instance_most_capacity

-    def find_largest_idle_instance(self):
+    @staticmethod
+    def find_largest_idle_instance(instances):
         largest_instance = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.jobs_running == 0:
                 if largest_instance is None:
                     largest_instance = i
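
Making both helpers static means they no longer issue a queryset per call; they simply scan whatever instance-like objects they are handed, which is what lets the task manager pass in the lightweight snapshots built later in this diff. Below is a self-contained sketch of the same selection logic against SimpleNamespace stand-ins; it mirrors the methods rather than importing the real awx models, so the function and host names are illustrative only.

from types import SimpleNamespace

def fit_task_to_most_remaining_capacity_instance(task, instances):
    # Mirror of the InstanceGroup static method: pick the instance with the
    # most remaining capacity that can still absorb task.task_impact.
    best = None
    for i in instances:
        if i.remaining_capacity >= task.task_impact and \
                (best is None or i.remaining_capacity > best.remaining_capacity):
            best = i
    return best

def find_largest_idle_instance(instances):
    # Mirror of the InstanceGroup static method: among idle instances
    # (no jobs running), keep the one with the largest total capacity.
    largest = None
    for i in instances:
        if i.jobs_running == 0:
            if largest is None or i.capacity > largest.capacity:
                largest = i
    return largest

task = SimpleNamespace(task_impact=40)
instances = [
    SimpleNamespace(hostname='node1', capacity=100, remaining_capacity=30, jobs_running=2),
    SimpleNamespace(hostname='node2', capacity=100, remaining_capacity=60, jobs_running=1),
    SimpleNamespace(hostname='node3', capacity=50, remaining_capacity=50, jobs_running=0),
]

print(fit_task_to_most_remaining_capacity_instance(task, instances).hostname)  # node2
print(find_largest_idle_instance(instances).hostname)                          # node3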

View File

@@ -873,7 +873,13 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         # If status changed, update the parent instance.
         if self.status != status_before:
-            self._update_parent_instance()
+            # Update parent outside of the transaction for Job w/ allow_simultaneous=True
+            # This dodges lock contention at the expense of the foreign key not being
+            # completely correct.
+            if getattr(self, 'allow_simultaneous', False):
+                connection.on_commit(self._update_parent_instance)
+            else:
+                self._update_parent_instance()

         # Done.
         return result
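
For jobs that allow simultaneous runs, the parent update is now queued with connection.on_commit(), so the parent row is only touched after the job's own transaction commits instead of while its locks are still held; as the comment notes, the parent's pointer may briefly lag behind. A minimal Django sketch of the same deferral follows, using a hypothetical finish_job helper rather than the awx save path:

from django.db import transaction

def finish_job(job, status):
    """Mark a job finished; defer the parent update until after COMMIT when
    the job allows simultaneous runs (hypothetical helper, not awx code)."""
    with transaction.atomic():
        job.status = status
        job.save(update_fields=['status'])
        if getattr(job, 'allow_simultaneous', False):
            # The callback runs only if the transaction commits and is dropped
            # on rollback -- the "foreign key not completely correct" trade-off.
            transaction.on_commit(job._update_parent_instance)
        else:
            job._update_parent_instance()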

View File

@@ -7,6 +7,7 @@ import logging
 import uuid
 import json
 import random
+from types import SimpleNamespace

 # Django
 from django.db import transaction, connection
@@ -45,6 +46,15 @@ logger = logging.getLogger('awx.main.scheduler')


 class TaskManager():

     def __init__(self):
+        '''
+        Do NOT put database queries or other potentially expensive operations
+        in the task manager init. The task manager object is created every time a
+        job is created, transitions state, and every 30 seconds on each tower node.
+        More often than not, the object is destroyed quickly because the NOOP case is hit.
+
+        The NOOP case is short-circuit logic. If the task manager realizes that another instance
+        of the task manager is already running, then it short-circuits and decides not to run.
+        '''
         self.graph = dict()
         # start task limit indicates how many pending jobs can be started on this
         # .schedule() run. Starting jobs is expensive, and there is code in place to reap
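
The docstring above is the heart of the change: the task manager is instantiated constantly, but only the instance that wins a shared lock actually does any work, so anything expensive has to wait until after the lock is acquired. A generic sketch of that acquire-or-NOOP pattern is shown below, using a raw PostgreSQL advisory lock rather than whatever helper the real scheduler uses; the lock id and function names are illustrative.

from contextlib import contextmanager
from django.db import connection

TASK_MANAGER_LOCK_ID = 101  # arbitrary lock key chosen for this illustration

@contextmanager
def try_advisory_lock(lock_id):
    # Best-effort PostgreSQL advisory lock: yields False instead of blocking
    # when another session already holds it (generic helper, not the awx one).
    with connection.cursor() as cursor:
        cursor.execute("SELECT pg_try_advisory_lock(%s)", [lock_id])
        acquired = cursor.fetchone()[0]
    try:
        yield acquired
    finally:
        if acquired:
            with connection.cursor() as cursor:
                cursor.execute("SELECT pg_advisory_unlock(%s)", [lock_id])

def schedule(task_manager):
    # The NOOP case described in the docstring: if another node already holds
    # the lock, return immediately -- which is why __init__ has to stay cheap.
    with try_advisory_lock(TASK_MANAGER_LOCK_ID) as acquired:
        if not acquired:
            return
        task_manager._schedule()  # _schedule() calls after_lock_init() first (see below)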
@@ -52,10 +62,30 @@
         # 5 minutes to start pending jobs. If this limit is reached, pending jobs
         # will no longer be started and will be started on the next task manager cycle.
         self.start_task_limit = settings.START_TASK_LIMIT
+
+    def after_lock_init(self):
+        '''
+        Init AFTER we know this instance of the task manager will run because the lock is acquired.
+        '''
+        instances = Instance.objects.filter(capacity__gt=0, enabled=True)
+        self.real_instances = {i.hostname: i for i in instances}
+
+        instances_partial = [SimpleNamespace(obj=instance,
+                                             remaining_capacity=instance.remaining_capacity,
+                                             capacity=instance.capacity,
+                                             jobs_running=instance.jobs_running,
+                                             hostname=instance.hostname) for instance in instances]
+
+        instances_by_hostname = {i.hostname: i for i in instances_partial}
+
         for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
-                                                  consumed_capacity=0)
+                                                  consumed_capacity=0,
+                                                  instances=[])
+
+            for instance in rampart_group.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+                if instance.hostname in instances_by_hostname:
+                    self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])

     def is_job_blocked(self, task):
         # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
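
The start-task-limit comment in the hunk above describes a simple per-pass throttle: each job that is actually started decrements a counter, and once it reaches zero the remaining pending jobs are left for the next scheduling cycle. A rough, self-contained sketch of that behavior follows; the class and names are illustrative, not the awx implementation.

START_TASK_LIMIT = 100  # stand-in for settings.START_TASK_LIMIT

class SchedulerPass:
    def __init__(self):
        # Reset on every pass; starting a job is the expensive step being throttled.
        self.start_task_limit = START_TASK_LIMIT

    def start_task(self, task):
        self.start_task_limit -= 1
        print(f"started {task}")

    def process_pending(self, pending_tasks):
        for task in pending_tasks:
            if self.start_task_limit <= 0:
                # Leave the remainder for the next task manager cycle.
                break
            self.start_task(task)

SchedulerPass().process_pending([f"job-{n}" for n in range(3)])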
@@ -466,7 +496,6 @@
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
-            idle_instance_that_fits = None
             if isinstance(task, WorkflowJob):
                 if task.unified_job_template_id in running_workflow_templates:
                     if not task.allow_simultaneous:
@@ -483,24 +512,23 @@
                     found_acceptable_queue = True
                     break

-                if idle_instance_that_fits is None:
-                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 remaining_capacity = self.get_remaining_capacity(rampart_group.name)
                 if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
                     logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(
                                  rampart_group.name, remaining_capacity))
                     continue

-                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
-                if execution_instance:
-                    logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
-                                 task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
-                elif not execution_instance and idle_instance_that_fits:
-                    execution_instance = idle_instance_that_fits
-                if execution_instance or rampart_group.is_containerized:
+                execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, self.graph[rampart_group.name]['instances']) or \
+                    InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'])
+
+                if execution_instance or rampart_group.is_containerized:
+                    if not rampart_group.is_containerized:
+                        execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
+                        execution_instance.jobs_running += 1
+                        logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
+                                     task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+                        execution_instance = self.real_instances[execution_instance.hostname]
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
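
The rewritten block above picks the best-fitting instance, falls back to the largest idle one, and then claims the capacity on the in-memory snapshot so that later tasks in the same pass see the reduced headroom; only then is the snapshot swapped for the real Instance via self.real_instances before start_task(). Below is a compact illustration of that claim-as-you-go accounting; pick_instance is a stand-in for the two InstanceGroup helpers, and all names are illustrative.

from types import SimpleNamespace

def pick_instance(task, instances):
    """Best fit by remaining capacity, else largest idle instance (illustrative)."""
    fits = [i for i in instances if i.remaining_capacity >= task.task_impact]
    if fits:
        return max(fits, key=lambda i: i.remaining_capacity)
    idle = [i for i in instances if i.jobs_running == 0]
    return max(idle, key=lambda i: i.capacity) if idle else None

instances = [
    SimpleNamespace(hostname='node1', capacity=100, remaining_capacity=70, jobs_running=1),
    SimpleNamespace(hostname='node2', capacity=100, remaining_capacity=50, jobs_running=1),
]
tasks = [SimpleNamespace(name=f'job{n}', task_impact=40) for n in range(3)]

for task in tasks:
    chosen = pick_instance(task, instances)
    if chosen is None:
        print(f"{task.name}: no capacity this pass")
        continue
    # Claim the capacity on the snapshot so later tasks in the same pass
    # see the reduced headroom without re-querying the database.
    chosen.remaining_capacity = max(0, chosen.remaining_capacity - task.task_impact)
    chosen.jobs_running += 1
    print(f"{task.name} -> {chosen.hostname} (remaining {chosen.remaining_capacity})")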
@@ -572,6 +600,9 @@
     def _schedule(self):
         finished_wfjs = []
         all_sorted_tasks = self.get_tasks()
+
+        self.after_lock_init()
+
         if len(all_sorted_tasks) > 0:
             # TODO: Deal with
             # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)

View File

@@ -45,19 +45,14 @@ class TestInstanceGroup(object):
         (T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
     ])
     def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=lambda x: instances))):
-            ig = InstanceGroup(id=10)
-
-            if instance_fit_index is None:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
-            else:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
-                    instances[instance_fit_index], reason
+        ig = InstanceGroup(id=10)
+
+        instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances)
+
+        if instance_fit_index is None:
+            assert instance_picked is None, reason
+        else:
+            assert instance_picked == instances[instance_fit_index], reason

     @pytest.mark.parametrize('instances,instance_fit_index,reason', [
         (Is([(0, 100)]), 0, "One idle instance, pick it"),
@@ -70,16 +65,12 @@
         def filter_offline_instances(*args):
             return filter(lambda i: i.capacity > 0, instances)

-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=filter_offline_instances))):
-            ig = InstanceGroup(id=10)
-
-            if instance_fit_index is None:
-                assert ig.find_largest_idle_instance() is None, reason
-            else:
-                assert ig.find_largest_idle_instance() == \
-                    instances[instance_fit_index], reason
+        ig = InstanceGroup(id=10)
+        instances_online_only = filter_offline_instances(instances)
+
+        if instance_fit_index is None:
+            assert ig.find_largest_idle_instance(instances_online_only) is None, reason
+        else:
+            assert ig.find_largest_idle_instance(instances_online_only) == \
+                instances[instance_fit_index], reason