diff --git a/awx/main/managers.py b/awx/main/managers.py
index d1f351bb6c..9d032799b5 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -79,3 +79,33 @@ class InstanceManager(models.Manager):
     def my_role(self):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
         return "tower"
+
+    def capacity_mapping(self):
+        """
+        Returns a tuple of two dictionaries that show mutual connections by name
+        for global accounting of capacity
+
+        instance_ig_mapping: {'instance_name': <set of group names instance is member of>}
+        ig_ig_mapping: {'group_name': <set of group names that share instances with group_name>}
+        """
+        qs = self.all().prefetch_related('rampart_groups')
+        instance_ig_mapping = {}
+        ig_instance_mapping = {}
+        # Create simple dictionary of instance IG memberships
+        for instance in qs.all():
+            if instance.capacity == 0:
+                continue
+            instance_ig_mapping[instance.hostname] = set()
+            for group in instance.rampart_groups.all():
+                ig_instance_mapping.setdefault(group.name, set())
+                ig_instance_mapping[group.name].add(instance.hostname)
+                instance_ig_mapping[instance.hostname].add(group.name)
+        # Create IG mapping by union of all groups their instances are members of
+        ig_ig_mapping = {}
+        for group_name in ig_instance_mapping.keys():
+            ig_ig_set = set()
+            for instance_hostname in ig_instance_mapping[group_name]:
+                ig_ig_set |= instance_ig_mapping[instance_hostname]
+            ig_ig_mapping[group_name] = ig_ig_set
+
+        return instance_ig_mapping, ig_ig_mapping
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 2337170091..eb4d77cf44 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -403,15 +403,21 @@ class TaskManager():
         preferred_instance_groups = task.preferred_instance_groups
         found_acceptable_queue = False
         for rampart_group in preferred_instance_groups:
-            if self.get_remaining_capacity(rampart_group.name) <= 0:
-                logger.debug("Skipping group %s capacity <= 0", rampart_group.name)
+            remaining_capacity = self.get_remaining_capacity(rampart_group.name)
+            if remaining_capacity <= 0:
+                logger.debug("Skipping group %s, remaining_capacity %s <= 0",
+                             rampart_group.name, remaining_capacity)
                 continue
             if not self.would_exceed_capacity(task, rampart_group.name):
-                logger.debug("Starting %s in group %s", task.log_format, rampart_group.name)
+                logger.debug("Starting %s in group %s (remaining_capacity=%s)",
+                             task.log_format, rampart_group.name, remaining_capacity)
                 self.graph[rampart_group.name]['graph'].add_job(task)
                 self.start_task(task, rampart_group, task.get_jobs_fail_chain())
                 found_acceptable_queue = True
                 break
+            else:
+                logger.debug("Not enough capacity to run %s on %s (remaining_capacity=%s)",
+                             task.log_format, rampart_group.name, remaining_capacity)
 
         if not found_acceptable_queue:
             logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
@@ -489,11 +495,19 @@ class TaskManager():
     def calculate_capacity_used(self, tasks):
         for rampart_group in self.graph:
             self.graph[rampart_group]['capacity_used'] = 0
+        instance_ig_mapping, ig_ig_mapping = Instance.objects.capacity_mapping()
         for t in tasks:
             # TODO: dock capacity for isolated job management tasks running in queue
-            for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'):
-                if group_actual[0] in self.graph:
-                    self.graph[group_actual[0]]['capacity_used'] += t.task_impact
+            if t.status == 'waiting':
+                # Subtract capacity from any peer groups that share instances
+                for instance_group_name in ig_ig_mapping[t.instance_group.name]:
+                    self.graph[instance_group_name]['capacity_used'] += t.task_impact
+            elif t.status == 'running':
+                # Subtract capacity from all groups that contain the instance
+                for instance_group_name in instance_ig_mapping[t.execution_node]:
+                    self.graph[instance_group_name]['capacity_used'] += t.task_impact
+            else:
+                logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
 
     def would_exceed_capacity(self, task, instance_group):
         current_capacity = self.graph[instance_group]['capacity_used']
@@ -503,6 +517,9 @@ class TaskManager():
         return (task.task_impact + current_capacity > capacity_total)
 
     def consume_capacity(self, task, instance_group):
+        logger.debug('%s consumed %s capacity units from %s with prior total of %s',
+                     task.log_format, task.task_impact, instance_group,
+                     self.graph[instance_group]['capacity_used'])
         self.graph[instance_group]['capacity_used'] += task.task_impact
 
     def get_remaining_capacity(self, instance_group):
@@ -540,12 +557,13 @@ class TaskManager():
         return finished_wfjs
 
     def schedule(self):
-        logger.debug("Starting Schedule")
         with transaction.atomic():
             # Lock
             with advisory_lock('task_manager_lock', wait=False) as acquired:
                 if acquired is False:
+                    logger.debug("Not running scheduler, another task holds lock")
                     return
+                logger.debug("Starting Scheduler")
 
                 self.cleanup_inconsistent_celery_tasks()
                 finished_wfjs = self._schedule()
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index 05de8b2a81..fa44eed201 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -4,15 +4,43 @@ from datetime import timedelta, datetime
 
 from django.core.cache import cache
 from django.utils.timezone import now as tz_now
+from django.test import TransactionTestCase
 
 from awx.main.scheduler import TaskManager
 from awx.main.models import (
     Job,
     Instance,
+    InstanceGroup,
     WorkflowJob,
 )
 
 
+
+@pytest.mark.django_db
+class TestCapacityMapping(TransactionTestCase):
+
+    def sample_cluster(self):
+        ig_small = InstanceGroup.objects.create(name='ig_small')
+        ig_large = InstanceGroup.objects.create(name='ig_large')
+        tower = InstanceGroup.objects.create(name='tower')
+        i1 = Instance.objects.create(hostname='i1', capacity=200)
+        i2 = Instance.objects.create(hostname='i2', capacity=200)
+        i3 = Instance.objects.create(hostname='i3', capacity=200)
+        ig_small.instances.add(i1)
+        ig_large.instances.add(i2, i3)
+        tower.instances.add(i2)
+
+    def test_mapping(self):
+        self.sample_cluster()
+        with self.assertNumQueries(2):
+            inst_map, ig_map = Instance.objects.capacity_mapping()
+        assert inst_map['i1'] == set(['ig_small'])
+        assert inst_map['i2'] == set(['ig_large', 'tower'])
+        assert ig_map['ig_small'] == set(['ig_small'])
+        assert ig_map['ig_large'] == set(['ig_large', 'tower'])
+        assert ig_map['tower'] == set(['ig_large', 'tower'])
+
+
 @pytest.mark.django_db
 def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
     objects = job_template_factory('jt', organization='org1', project='proj',
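
Reviewer note: below is a minimal, Django-free sketch of the union logic that capacity_mapping() implements, using the same cluster shape as the sample_cluster() fixture in the test diff. The memberships dict is an illustrative stand-in for the Instance rows and their rampart_groups, not part of the patch.

# Sketch (assumption-laden, not from the patch): capacity_mapping() in plain Python.
memberships = {
    'i1': {'ig_small'},
    'i2': {'ig_large', 'tower'},
    'i3': {'ig_large'},
}

instance_ig_mapping = {}
ig_instance_mapping = {}
for hostname, groups in memberships.items():
    instance_ig_mapping[hostname] = set(groups)
    for group_name in groups:
        ig_instance_mapping.setdefault(group_name, set()).add(hostname)

# Each group maps to the union of every group its instances belong to, so a
# waiting task charged to one group also docks any group sharing an instance.
ig_ig_mapping = {
    group_name: set().union(*(instance_ig_mapping[h] for h in hostnames))
    for group_name, hostnames in ig_instance_mapping.items()
}

assert ig_ig_mapping['ig_large'] == {'ig_large', 'tower'}  # linked via i2
assert ig_ig_mapping['ig_small'] == {'ig_small'}           # shares no instances
assert ig_ig_mapping['tower'] == {'ig_large', 'tower'}

The payoff of precomputing both mappings is visible in calculate_capacity_used(): each task's impact is charged to every affected group via dictionary lookups, replacing the removed per-task InstanceGroup.objects.filter(...) query.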