From bdabe3602931acfcd61f441da823205377c23868 Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Thu, 15 Oct 2020 15:25:47 -0400
Subject: [PATCH 1/3] reduce parent->child lock contention

* We update the parent unified job template to point at new jobs created.
  We also update a similar foreign key when the job finishes running. This
  causes lock contention when the job template is allow_simultaneous and
  there are a lot of jobs from that job template running in parallel. I've
  seen waits as bad as 5 minutes for the lock when a job finishes.
* This change moves the parent->child update to OUTSIDE of the transaction
  if the job is allow_simultaneous (inherited from the parent unified job
  template). We sacrifice a bit of correctness for performance. The logic
  is: if you are launching 1,000 parallel jobs, do you really care that
  the job template contains a pointer to the last one you launched?
  Probably not. If you do, you can always query jobs related to the job
  template, sorted by created time.
---
 awx/main/models/unified_jobs.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 1abbb29fcb..c50c8668d5 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -873,7 +873,13 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique

         # If status changed, update the parent instance.
         if self.status != status_before:
-            self._update_parent_instance()
+            # Update the parent outside of the transaction for a Job with allow_simultaneous=True.
+            # This dodges lock contention at the expense of the foreign key not being
+            # completely correct.
+            if getattr(self, 'allow_simultaneous', False):
+                connection.on_commit(self._update_parent_instance)
+            else:
+                self._update_parent_instance()

         # Done.
         return result
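The mechanism PATCH 1/3 leans on is Django's commit hook: a callable handed to connection.on_commit() runs only after the enclosing transaction commits, so the UPDATE on the parent template row happens in its own short-lived transaction instead of holding the row lock for the remainder of the job's save. A minimal sketch of the pattern (not AWX code; the model fields and the update_parent() helper are illustrative):

    from django.db import connection, transaction

    def update_parent(job):
        # A short standalone UPDATE: the parent row lock is held only for the
        # duration of this save, not for the caller's whole transaction.
        job.unified_job_template.last_job = job
        job.unified_job_template.save(update_fields=['last_job'])

    def finish_job(job):
        with transaction.atomic():
            job.status = 'successful'
            job.save(update_fields=['status'])
            if getattr(job, 'allow_simultaneous', False):
                # Queued until COMMIT: N parallel jobs finishing no longer
                # serialize on the parent row inside their own transactions.
                connection.on_commit(lambda: update_parent(job))
            else:
                update_parent(job)

The correctness trade-off named in the commit message falls out of this: once the update runs post-commit, there is no transactional ordering between concurrent children, so whichever callback runs last wins the pointer.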
From 11cc6362b5ce63137adeb4b653ec359f3adfc781 Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Fri, 2 Oct 2020 13:16:21 -0400
Subject: [PATCH 2/3] reduce per-job database query count

* Do not query the database, once per job, for the set of Instances that
  belong to the group we are trying to fit the job onto.
* Instead, cache the set of instances per instance group.
---
 awx/main/models/ha.py                 | 10 +++--
 awx/main/scheduler/task_manager.py    | 53 +++++++++++++++++++++------
 awx/main/tests/unit/models/test_ha.py | 35 +++++++-----------
 3 files changed, 61 insertions(+), 37 deletions(-)

diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index fc4e9c022e..5071786653 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -261,18 +261,20 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
         app_label = 'main'

-    def fit_task_to_most_remaining_capacity_instance(self, task):
+    @staticmethod
+    def fit_task_to_most_remaining_capacity_instance(task, instances):
         instance_most_capacity = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.remaining_capacity >= task.task_impact and \
                     (instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity):
                 instance_most_capacity = i
         return instance_most_capacity

-    def find_largest_idle_instance(self):
+    @staticmethod
+    def find_largest_idle_instance(instances):
         largest_instance = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.jobs_running == 0:
                 if largest_instance is None:
                     largest_instance = i
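Because the two fit_* helpers are now staticmethods over an arbitrary iterable, they can be exercised with plain objects and no ORM access or mock scaffolding. For example (capacity numbers invented; assumes an AWX checkout on the path):

    from types import SimpleNamespace

    from awx.main.models.ha import InstanceGroup

    # Plain in-memory stand-ins; no database rows or mock.patch needed.
    instances = [
        SimpleNamespace(hostname='node1', capacity=100, remaining_capacity=30, jobs_running=2),
        SimpleNamespace(hostname='node2', capacity=100, remaining_capacity=80, jobs_running=1),
    ]
    task = SimpleNamespace(task_impact=40)

    # Picks node2, the only instance whose remaining capacity fits the task.
    best = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, instances)
    assert best.hostname == 'node2'

This is also why the mock.patch scaffolding disappears from the unit tests later in this patch: it existed only to fake self.instances.filter(...).order_by(...).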
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 9f4818bd37..861aa0b63f 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -7,6 +7,7 @@ import logging
 import uuid
 import json
 import random
+from types import SimpleNamespace

 # Django
 from django.db import transaction, connection
@@ -45,6 +46,15 @@ logger = logging.getLogger('awx.main.scheduler')

 class TaskManager():

     def __init__(self):
+        '''
+        Do NOT put database queries or other potentially expensive operations
+        in the task manager init. The task manager object is created every time a
+        job is created or transitions state, and every 30 seconds on each tower node.
+        More often than not, the object is destroyed quickly because the NOOP case is hit.
+
+        The NOOP case is short-circuit logic. If the task manager realizes that another instance
+        of the task manager is already running, then it short-circuits and decides not to run.
+        '''
         self.graph = dict()
         # start task limit indicates how many pending jobs can be started on this
         # .schedule() run. Starting jobs is expensive, and there is code in place to reap
         # jobs that take longer than 5 minutes to start. If this limit is reached, pending jobs
         # will no longer be started and will be started on the next task manager cycle.
         self.start_task_limit = settings.START_TASK_LIMIT
+
+    def after_lock_init(self):
+        '''
+        Init AFTER we know this instance of the task manager will run because
+        the lock is acquired.
+        '''
+        instances = Instance.objects.filter(capacity__gt=0, enabled=True)
+        self.real_instances = {i.hostname: i for i in instances}
+
+        instances_partial = [SimpleNamespace(obj=instance,
+                                             remaining_capacity=instance.remaining_capacity,
+                                             capacity=instance.capacity,
+                                             jobs_running=instance.jobs_running,
+                                             hostname=instance.hostname) for instance in instances]
+
+        instances_by_hostname = {i.hostname: i for i in instances_partial}
+
         for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
-                                                  consumed_capacity=0)
+                                                  consumed_capacity=0,
+                                                  instances=[])
+            for instance in rampart_group.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+                if instance.hostname in instances_by_hostname:
+                    self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])

     def is_job_blocked(self, task):
         # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -466,7 +496,6 @@
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
-            idle_instance_that_fits = None
             if isinstance(task, WorkflowJob):
                 if task.unified_job_template_id in running_workflow_templates:
                     if not task.allow_simultaneous:
                     found_acceptable_queue = True
                     break

-                if idle_instance_that_fits is None:
-                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 remaining_capacity = self.get_remaining_capacity(rampart_group.name)
                 if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
                     logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(
                         rampart_group.name, remaining_capacity))
                     continue

-                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
-                if execution_instance:
-                    logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
-                        task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
-                elif not execution_instance and idle_instance_that_fits:
+                execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, self.graph[rampart_group.name]['instances']) or \
+                    InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'])
+
+                if execution_instance or rampart_group.is_containerized:
                     if not rampart_group.is_containerized:
-                        execution_instance = idle_instance_that_fits
+                        execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
+                        execution_instance.jobs_running += 1
                     logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
                         task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
-                if execution_instance or rampart_group.is_containerized:
+
+                    execution_instance = self.real_instances[execution_instance.hostname]
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
@@ -572,6 +600,9 @@
     def _schedule(self):
         finished_wfjs = []
         all_sorted_tasks = self.get_tasks()
+
+        self.after_lock_init()
+
         if len(all_sorted_tasks) > 0:
             # TODO: Deal with
             # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
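The query math behind the commit message: previously every task evaluated against every preferred group re-ran instances.filter(...) inside the fit_* helpers, roughly one query per task per group; after this patch there is one Instance query plus one prefetched InstanceGroup pass per scheduler run. The SimpleNamespace copies additionally give the placement loop scratch fields (remaining_capacity, jobs_running) it can mutate freely without dirtying real model instances. A standalone sketch of the snapshot step, assuming models shaped like AWX's:

    from types import SimpleNamespace

    from awx.main.models import Instance, InstanceGroup

    def snapshot_instances():
        # One query for every usable instance, kept around for job start time.
        real_instances = {i.hostname: i for i in
                          Instance.objects.filter(capacity__gt=0, enabled=True)}

        # Detached, mutation-safe copies of the scheduling-relevant fields.
        partial = {h: SimpleNamespace(hostname=h,
                                      capacity=i.capacity,
                                      remaining_capacity=i.remaining_capacity,
                                      jobs_running=i.jobs_running)
                   for h, i in real_instances.items()}

        # Group membership resolved once, against the prefetch cache.
        by_group = {}
        for group in InstanceGroup.objects.prefetch_related('instances'):
            by_group[group.name] = [partial[i.hostname]
                                    for i in group.instances.all()
                                    if i.hostname in partial]
        return real_instances, by_group

One subtlety: this sketch filters membership in Python via .all() so the prefetch cache is actually used; calling .filter(...) on a prefetched manager, as the diff above does, still issues one fresh query per group, which remains far cheaper than one per task.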
diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py
index 0e29caf8aa..2534acfd15 100644
--- a/awx/main/tests/unit/models/test_ha.py
+++ b/awx/main/tests/unit/models/test_ha.py
@@ -45,19 +45,14 @@ class TestInstanceGroup(object):
         (T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
     ])
     def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=lambda x: instances))):
-            ig = InstanceGroup(id=10)
+        ig = InstanceGroup(id=10)

-            if instance_fit_index is None:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
-            else:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
-                    instances[instance_fit_index], reason
+        instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances)
+
+        if instance_fit_index is None:
+            assert instance_picked is None, reason
+        else:
+            assert instance_picked == instances[instance_fit_index], reason

     @pytest.mark.parametrize('instances,instance_fit_index,reason', [
         (Is([(0, 100)]), 0, "One idle instance, pick it"),
@@ -70,16 +65,12 @@
         def filter_offline_instances(*args):
             return filter(lambda i: i.capacity > 0, instances)

-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=filter_offline_instances))):
-            ig = InstanceGroup(id=10)
+        ig = InstanceGroup(id=10)
+        instances_online_only = filter_offline_instances(instances)

-            if instance_fit_index is None:
-                assert ig.find_largest_idle_instance() is None, reason
-            else:
-                assert ig.find_largest_idle_instance() == \
-                    instances[instance_fit_index], reason
+        if instance_fit_index is None:
+            assert ig.find_largest_idle_instance(instances_online_only) is None, reason
+        else:
+            assert ig.find_largest_idle_instance(instances_online_only) == \
+                instances[instance_fit_index], reason

From 79d7c6d9b3c80aba3f9e35a8dc076355223f9c2c Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Wed, 21 Oct 2020 14:17:04 -0400
Subject: [PATCH 3/3] make optimization code work with container groups

* The task manager's fit_* optimization code caused problems with the
  container group code.
* Note that we don't actually get the benefit of the optimization for
  container groups; we just make it so that the code doesn't blow up. It
  will take another pass to apply the optimization to the container group
  path in the task manager.
---
 awx/main/scheduler/task_manager.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
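Two small changes do the work here, sketched before the diff. First, the snapshot query gains a ~Q(hostname=None) clause, guarding against Instance rows with no hostname, which would poison the dicts keyed by hostname. Second, the hostname dereference is guarded, because for a containerized group the fit_* helpers legitimately return no instance and the task starts without an execution node. Illustrative only (the helper functions below are not AWX code):

    from django.db.models import Q

    from awx.main.models import Instance

    def usable_instances():
        # ~Q(hostname=None) drops null-hostname rows up front, so everything
        # downstream can safely be keyed by hostname.
        return Instance.objects.filter(~Q(hostname=None), capacity__gt=0, enabled=True)

    def resolve_execution_instance(real_instances, execution_instance):
        # Container groups run with no traditional execution node, so None is
        # a legitimate value here; only map a chosen snapshot copy back to its
        # real Instance row when one was actually chosen.
        if execution_instance is None:
            return None
        return real_instances[execution_instance.hostname]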
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 861aa0b63f..43d43fe64d 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -14,6 +14,7 @@ from django.db import transaction, connection
 from django.utils.translation import ugettext_lazy as _, gettext_noop
 from django.utils.timezone import now as tz_now
 from django.conf import settings
+from django.db.models import Q

 # AWX
 from awx.main.dispatch.reaper import reap_job
@@ -67,7 +68,7 @@
         Init AFTER we know this instance of the task manager will run because
         the lock is acquired.
         '''
-        instances = Instance.objects.filter(capacity__gt=0, enabled=True)
+        instances = Instance.objects.filter(~Q(hostname=None), capacity__gt=0, enabled=True)
         self.real_instances = {i.hostname: i for i in instances}

         instances_partial = [SimpleNamespace(obj=instance,
@@ -284,7 +285,7 @@
         for group in InstanceGroup.objects.all():
             if group.is_containerized or group.controller_id:
                 continue
-            match = group.fit_task_to_most_remaining_capacity_instance(task)
+            match = group.fit_task_to_most_remaining_capacity_instance(task, group.instances.all())
             if match:
                 break
         task.instance_group = rampart_group
@@ -528,7 +529,8 @@
                     logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
                         task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))

-                    execution_instance = self.real_instances[execution_instance.hostname]
+                    if execution_instance:
+                        execution_instance = self.real_instances[execution_instance.hostname]
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
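Putting the three patches together, the hot placement loop now runs entirely against the in-memory snapshot and only touches a real model object at start time. A condensed, ORM-free rendition of that loop (capacity numbers invented; the two helpers restate the InstanceGroup staticmethods from PATCH 2/3):

    from types import SimpleNamespace

    def fit_most_remaining(task, instances):
        # Greedy rule from fit_task_to_most_remaining_capacity_instance.
        best = None
        for i in instances:
            if i.remaining_capacity >= task.task_impact and \
                    (best is None or i.remaining_capacity > best.remaining_capacity):
                best = i
        return best

    def find_largest_idle(instances):
        # Fallback from find_largest_idle_instance: biggest instance running nothing.
        largest = None
        for i in instances:
            if i.jobs_running == 0 and (largest is None or i.capacity > largest.capacity):
                largest = i
        return largest

    snapshot = [SimpleNamespace(hostname='node1', capacity=100, remaining_capacity=50, jobs_running=1),
                SimpleNamespace(hostname='node2', capacity=100, remaining_capacity=100, jobs_running=0)]
    tasks = [SimpleNamespace(task_impact=70), SimpleNamespace(task_impact=70)]

    for task in tasks:
        chosen = fit_most_remaining(task, snapshot) or find_largest_idle(snapshot)
        if chosen:
            # The in-run bookkeeping from PATCH 2/3: later tasks in the same
            # scheduler pass see this instance's capacity already consumed.
            chosen.remaining_capacity = max(0, chosen.remaining_capacity - task.task_impact)
            chosen.jobs_running += 1
            print('placed on', chosen.hostname)   # first task lands on node2
        else:
            print('no capacity this cycle')       # second task: neither node fits 70

Without the in-loop decrement both tasks would have picked node2 and oversubscribed it; that is exactly what the remaining_capacity and jobs_running mutations in the diff prevent between database refreshes.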