From ce3c969c08f668e4fd7abed5fe8c2bbeb0eb47c8 Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Fri, 25 Aug 2017 11:59:43 -0400
Subject: [PATCH 1/3] correct capacity algorithm for task manager

---
 awx/main/managers.py                      | 30 +++++++++++++++++
 awx/main/scheduler/__init__.py            | 32 +++++++++++++++----
 .../task_management/test_scheduler.py     | 28 ++++++++++++++++
 3 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/awx/main/managers.py b/awx/main/managers.py
index d1f351bb6c..9d032799b5 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -79,3 +79,33 @@ class InstanceManager(models.Manager):
     def my_role(self):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
         return "tower"
+
+    def capacity_mapping(self):
+        """
+        Returns tuple of two dictionaries that shows mutual connections by name
+        for global accounting of capacity
+
+        instance_ig_mapping: {'instance_name': <set of group names the instance belongs to>}
+        ig_ig_mapping: {'group_name': <set of group names that share instances with the group>}
+        """
+        qs = self.all().prefetch_related('rampart_groups')
+        instance_ig_mapping = {}
+        ig_instance_mapping = {}
+        # Create simple dictionary of instance IG memberships
+        for instance in qs.all():
+            if instance.capacity == 0:
+                continue
+            instance_ig_mapping[instance.hostname] = set()
+            for group in instance.rampart_groups.all():
+                ig_instance_mapping.setdefault(group.name, set())
+                ig_instance_mapping[group.name].add(instance.hostname)
+                instance_ig_mapping[instance.hostname].add(group.name)
+        # Create IG mapping by union of all groups their instances are members of
+        ig_ig_mapping = {}
+        for group_name in ig_instance_mapping.keys():
+            ig_ig_set = set()
+            for instance_hostname in ig_instance_mapping[group_name]:
+                ig_ig_set |= instance_ig_mapping[instance_hostname]
+            ig_ig_mapping[group_name] = ig_ig_set
+
+        return instance_ig_mapping, ig_ig_mapping
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 2337170091..eb4d77cf44 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -403,15 +403,21 @@ class TaskManager():
         preferred_instance_groups = task.preferred_instance_groups
         found_acceptable_queue = False
         for rampart_group in preferred_instance_groups:
-            if self.get_remaining_capacity(rampart_group.name) <= 0:
-                logger.debug("Skipping group %s capacity <= 0", rampart_group.name)
+            remaining_capacity = self.get_remaining_capacity(rampart_group.name)
+            if remaining_capacity <= 0:
+                logger.debug("Skipping group %s, remaining_capacity %s <= 0",
+                             rampart_group.name, remaining_capacity)
                 continue
             if not self.would_exceed_capacity(task, rampart_group.name):
-                logger.debug("Starting %s in group %s", task.log_format, rampart_group.name)
+                logger.debug("Starting %s in group %s (remaining_capacity=%s)",
+                             task.log_format, rampart_group.name, remaining_capacity)
                 self.graph[rampart_group.name]['graph'].add_job(task)
                 self.start_task(task, rampart_group, task.get_jobs_fail_chain())
                 found_acceptable_queue = True
                 break
+            else:
+                logger.debug("Not enough capacity to run %s on %s (remaining_capacity=%s)",
+                             task.log_format, rampart_group.name, remaining_capacity)
         if not found_acceptable_queue:
             logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
@@ -489,11 +495,19 @@ class TaskManager():
     def calculate_capacity_used(self, tasks):
         for rampart_group in self.graph:
             self.graph[rampart_group]['capacity_used'] = 0
+        instance_ig_mapping, ig_ig_mapping = Instance.objects.capacity_mapping()
         for t in tasks:
             # TODO: dock capacity for isolated job management tasks running in queue
-            for group_actual in InstanceGroup.objects.filter(instances__hostname=t.execution_node).values_list('name'):
-                if group_actual[0] in self.graph:
-                    self.graph[group_actual[0]]['capacity_used'] += t.task_impact
+            if t.status == 'waiting':
+                # Subtract capacity from any peer groups that share instances
+                for instance_group_name in ig_ig_mapping[t.instance_group.name]:
+                    self.graph[instance_group_name]['capacity_used'] += t.task_impact
+            elif t.status == 'running':
+                # Subtract capacity from all groups that contain the instance
+                for instance_group_name in instance_ig_mapping[t.execution_node]:
+                    self.graph[instance_group_name]['capacity_used'] += t.task_impact
+            else:
+                logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)

     def would_exceed_capacity(self, task, instance_group):
         current_capacity = self.graph[instance_group]['capacity_used']
@@ -503,6 +517,9 @@ class TaskManager():
         return (task.task_impact + current_capacity > capacity_total)

     def consume_capacity(self, task, instance_group):
+        logger.debug('%s consumed %s capacity units from %s with prior total of %s',
+                     task.log_format, task.task_impact, instance_group,
+                     self.graph[instance_group]['capacity_used'])
         self.graph[instance_group]['capacity_used'] += task.task_impact

     def get_remaining_capacity(self, instance_group):
@@ -540,12 +557,13 @@ class TaskManager():
         return finished_wfjs

     def schedule(self):
-        logger.debug("Starting Schedule")
         with transaction.atomic():
             # Lock
             with advisory_lock('task_manager_lock', wait=False) as acquired:
                 if acquired is False:
+                    logger.debug("Not running scheduler, another task holds lock")
                     return
+                logger.debug("Starting Scheduler")

                 self.cleanup_inconsistent_celery_tasks()
                 finished_wfjs = self._schedule()
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index 05de8b2a81..fa44eed201 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -4,15 +4,43 @@ from datetime import timedelta, datetime

 from django.core.cache import cache
 from django.utils.timezone import now as tz_now
+from django.test import TransactionTestCase

 from awx.main.scheduler import TaskManager
 from awx.main.models import (
     Job,
     Instance,
+    InstanceGroup,
     WorkflowJob,
 )


+@pytest.mark.django_db
+class TestCapacityMapping(TransactionTestCase):
+
+    def sample_cluster(self):
+        ig_small = InstanceGroup.objects.create(name='ig_small')
+        ig_large = InstanceGroup.objects.create(name='ig_large')
+        tower = InstanceGroup.objects.create(name='tower')
+        i1 = Instance.objects.create(hostname='i1', capacity=200)
+        i2 = Instance.objects.create(hostname='i2', capacity=200)
+        i3 = Instance.objects.create(hostname='i3', capacity=200)
+        ig_small.instances.add(i1)
+        ig_large.instances.add(i2, i3)
+        tower.instances.add(i2)
+
+    def test_something(self):
+        self.sample_cluster()
+        with self.assertNumQueries(2):
+            inst_map, ig_map = Instance.objects.capacity_mapping()
+        assert inst_map['i1'] == set(['ig_small'])
+        assert inst_map['i2'] == set(['ig_large', 'tower'])
+        assert ig_map['ig_small'] == set(['ig_small'])
+        assert ig_map['ig_large'] == set(['ig_large', 'tower'])
+        assert ig_map['tower'] == set(['ig_large', 'tower'])
+
+
 @pytest.mark.django_db
 def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
     objects = job_template_factory('jt', organization='org1', project='proj',
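
For readers tracing the algorithm introduced in patch 1: the heart of capacity_mapping() is an ordinary set union over two plain dictionaries, with no ORM involvement once the memberships are collected. A minimal standalone sketch of that union, using made-up hostnames and group names that merely mirror the test fixture above:

    # Plain-dict illustration of the instance/group bookkeeping (not AWX code).
    instance_ig_mapping = {
        'i1': {'ig_small'},
        'i2': {'ig_large', 'tower'},
        'i3': {'ig_large'},
    }
    ig_instance_mapping = {
        'ig_small': {'i1'},
        'ig_large': {'i2', 'i3'},
        'tower': {'i2'},
    }

    # Union every group reachable through a shared instance, as the
    # final loop in capacity_mapping() does.
    ig_ig_mapping = {}
    for group_name, hostnames in ig_instance_mapping.items():
        ig_ig_set = set()
        for hostname in hostnames:
            ig_ig_set |= instance_ig_mapping[hostname]
        ig_ig_mapping[group_name] = ig_ig_set

    assert ig_ig_mapping['tower'] == {'ig_large', 'tower'}
    assert ig_ig_mapping['ig_small'] == {'ig_small'}

Because 'tower' and 'ig_large' share instance 'i2', they land in each other's overlap sets, which is why a job charged to one must also be charged against the other.
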
From 5327a4c62234ee3f3b6fccf1b18b118ae38b5417 Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Sun, 27 Aug 2017 13:14:35 -0400
Subject: [PATCH 2/3] Use global capacity algorithm in serializer

The task manager was doing work to compute currently consumed capacity;
this is moved into the manager and applied in the same form to the
instance group list.
---
 awx/api/serializers.py                    |  29 ++++-
 awx/main/access.py                        |  12 +-
 awx/main/managers.py                      | 108 +++++++++++++++---
 awx/main/models/ha.py                     |  11 +-
 awx/main/scheduler/__init__.py            |  30 ++---
 awx/main/scheduler/tasks.py               |  10 +-
 .../task_management/test_capacity.py      |  69 +++++++++++
 .../task_management/test_scheduler.py     |  28 -----
 8 files changed, 216 insertions(+), 81 deletions(-)
 create mode 100644 awx/main/tests/functional/task_management/test_capacity.py

diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index b580f57ec3..f759a85054 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -3564,6 +3564,7 @@ class InstanceSerializer(BaseSerializer):

 class InstanceGroupSerializer(BaseSerializer):

+    consumed_capacity = serializers.SerializerMethodField()
     percent_capacity_remaining = serializers.SerializerMethodField()
     jobs_running = serializers.SerializerMethodField()
     instances = serializers.SerializerMethodField()
@@ -3581,17 +3582,37 @@ class InstanceGroupSerializer(BaseSerializer):
             res['controller'] = self.reverse('api:instance_group_detail', kwargs={'pk': obj.controller_id})
         return res

+    def get_jobs_qs(self):
+        # Store running jobs queryset in context, so it will be shared in ListView
+        if 'running_jobs' not in self.context:
+            self.context['running_jobs'] = UnifiedJob.objects.filter(
+                status__in=('running', 'waiting'))
+        return self.context['running_jobs']
+
+    def get_capacity_dict(self):
+        # Store capacity values (globally computed) in the context
+        if 'capacity_map' not in self.context:
+            ig_qs = None
+            if self.parent:  # Is ListView:
+                ig_qs = self.parent.instance
+            self.context['capacity_map'] = InstanceGroup.objects.capacity_values(
+                qs=ig_qs, tasks=self.get_jobs_qs(), breakdown=True)
+        return self.context['capacity_map']
+
     def get_consumed_capacity(self, obj):
-        return obj.consumed_capacity
+        return self.get_capacity_dict()[obj.name]['consumed_capacity']

     def get_percent_capacity_remaining(self, obj):
-        if not obj.capacity or obj.consumed_capacity == obj.capacity:
+        if not obj.capacity:
             return 0.0
         else:
-            return float("{0:.2f}".format(((float(obj.capacity) - float(obj.consumed_capacity)) / (float(obj.capacity))) * 100))
+            return float("{0:.2f}".format(
+                ((float(obj.capacity) - float(self.get_consumed_capacity(obj))) / (float(obj.capacity))) * 100)
+            )

     def get_jobs_running(self, obj):
-        return UnifiedJob.objects.filter(instance_group=obj, status__in=('running', 'waiting',)).count()
+        jobs_qs = self.get_jobs_qs()
+        return sum(1 for job in jobs_qs if job.instance_group_id == obj.id)

     def get_instances(self, obj):
         return obj.instances.count()
diff --git a/awx/main/access.py b/awx/main/access.py
index 8842336ffb..4f760897f4 100644
--- a/awx/main/access.py
+++ b/awx/main/access.py
@@ -377,9 +377,11 @@ class InstanceAccess(BaseAccess):

     def get_queryset(self):
         if self.user.is_superuser or self.user.is_system_auditor:
-            return Instance.objects.all().distinct()
+            qs = Instance.objects.all().distinct()
         else:
-            return Instance.objects.filter(rampart_groups__in=self.user.get_queryset(InstanceGroup)).distinct()
+            qs = Instance.objects.filter(
+                rampart_groups__in=self.user.get_queryset(InstanceGroup)).distinct()
+        return qs.prefetch_related('rampart_groups')

     def can_add(self, data):
         return False
@@ -397,9 +399,11 @@ class InstanceGroupAccess(BaseAccess):

     def get_queryset(self):
         if self.user.is_superuser or self.user.is_system_auditor:
-            return InstanceGroup.objects.all()
+            qs = InstanceGroup.objects.all()
         else:
-            return InstanceGroup.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role'))
+            qs = InstanceGroup.objects.filter(
+                organization__in=Organization.accessible_pk_qs(self.user, 'admin_role'))
+        return qs.prefetch_related('instances')

     def can_add(self, data):
         return False
diff --git a/awx/main/managers.py b/awx/main/managers.py
index 9d032799b5..4f53da65ae 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -3,6 +3,7 @@
 import sys
 from datetime import timedelta
+import logging

 from django.db import models
 from django.utils.timezone import now
@@ -11,7 +12,9 @@ from django.conf import settings
 from awx.main.utils.filters import SmartFilter

-___all__ = ['HostManager', 'InstanceManager']
+___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager']
+
+logger = logging.getLogger('awx.main.managers')

 class HostManager(models.Manager):
@@ -48,6 +51,17 @@ class HostManager(models.Manager):
         return qs


+def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
+    # Create IG mapping by union of all groups their instances are members of
+    ig_ig_mapping = {}
+    for group_name in ig_instance_mapping.keys():
+        ig_ig_set = set()
+        for instance_hostname in ig_instance_mapping[group_name]:
+            ig_ig_set |= instance_ig_mapping[instance_hostname]
+        ig_ig_mapping[group_name] = ig_ig_set
+    return ig_ig_mapping
+
+
 class InstanceManager(models.Manager):
     """A custom manager class for the Instance model.
@@ -80,7 +94,7 @@ class InstanceManager(models.Manager):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
         return "tower"

-    def capacity_mapping(self):
+    def capacity_mapping(self, qs=None):
         """
         Returns tuple of two dictionaries that shows mutual connections by name
         for global accounting of capacity
@@ -88,24 +102,90 @@ class InstanceManager(models.Manager):
         instance_ig_mapping: {'instance_name': <set of group names the instance belongs to>}
         ig_ig_mapping: {'group_name': <set of group names that share instances with the group>}
         """
-        qs = self.all().prefetch_related('rampart_groups')
+        if qs is None:
+            qs = self.all().prefetch_related('rampart_groups')
         instance_ig_mapping = {}
         ig_instance_mapping = {}
-        # Create simple dictionary of instance IG memberships
-        for instance in qs.all():
+        # Create dictionaries that represent basic m2m memberships
+        for instance in qs:
             if instance.capacity == 0:
                 continue
-            instance_ig_mapping[instance.hostname] = set()
+            instance_ig_mapping[instance.hostname] = set(
+                group.name for group in instance.rampart_groups.all()
+            )
             for group in instance.rampart_groups.all():
                 ig_instance_mapping.setdefault(group.name, set())
                 ig_instance_mapping[group.name].add(instance.hostname)
-                instance_ig_mapping[instance.hostname].add(group.name)
-        # Create IG mapping by union of all groups their instances are members of
-        ig_ig_mapping = {}
-        for group_name in ig_instance_mapping.keys():
-            ig_ig_set = set()
-            for instance_hostname in ig_instance_mapping[group_name]:
-                ig_ig_set |= instance_ig_mapping[instance_hostname]
-            ig_ig_mapping[group_name] = ig_ig_set
+        # Get IG capacity overlap mapping
+        ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)

         return instance_ig_mapping, ig_ig_mapping
+
+
+class InstanceGroupManager(models.Manager):
+    """A custom manager class for the InstanceGroup model.
+
+    Used for global capacity calculations
+    """
+
+    def capacity_mapping(self, qs=None):
+        """
+        Another entry-point to Instance manager method by same name
+        """
+        if qs is None:
+            qs = self.all().prefetch_related('instances')
+        instance_ig_mapping = {}
+        ig_instance_mapping = {}
+        # Create dictionaries that represent basic m2m memberships
+        for group in qs:
+            ig_instance_mapping[group.name] = set(
+                instance.hostname for instance in group.instances.all() if
+                instance.capacity != 0
+            )
+            for inst in group.instances.all():
+                if inst.capacity == 0:
+                    continue
+                instance_ig_mapping.setdefault(inst.hostname, set())
+                instance_ig_mapping[inst.hostname].add(group.name)
+        # Get IG capacity overlap mapping
+        ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
+
+        return instance_ig_mapping, ig_ig_mapping
+
+    def capacity_values(self, qs=None, tasks=None, breakdown=False, graph=None):
+        """
+        Returns a dictionary of capacity values for all IGs
+        """
+        if qs is None:
+            qs = self.all().prefetch_related('instances')
+        instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs)
+
+        if tasks is None:
+            tasks = self.model.unifiedjob_set.related.related_model.objects.filter(
+                status__in=('running', 'waiting'))
+
+        if graph is None:
+            graph = {group.name: {} for group in qs}
+        for group_name in graph:
+            graph[group_name]['consumed_capacity'] = 0
+            if breakdown:
+                graph[group_name]['committed_capacity'] = 0
+                graph[group_name]['running_capacity'] = 0
+        for t in tasks:
+            # TODO: dock capacity for isolated job management tasks running in queue
+            impact = t.task_impact
+            if t.status == 'waiting' or not t.execution_node:
+                # Subtract capacity from any peer groups that share instances
+                for group_name in ig_ig_mapping[t.instance_group.name]:
+                    graph[group_name]['consumed_capacity'] += impact
+                    if breakdown:
+                        graph[group_name]['committed_capacity'] += impact
+            elif t.status == 'running':
+                # Subtract capacity from all groups that contain the instance
+                for group_name in instance_ig_mapping[t.execution_node]:
+                    graph[group_name]['consumed_capacity'] += impact
+                    if breakdown:
+                        graph[group_name]['running_capacity'] += impact
+            else:
+                logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
+        return graph
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 509e37b4ac..f2e57f7a07 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -11,7 +11,7 @@ from django.utils.timezone import now, timedelta
 from solo.models import SingletonModel

 from awx.api.versioning import reverse
-from awx.main.managers import InstanceManager
+from awx.main.managers import InstanceManager, InstanceGroupManager
 from awx.main.models.inventory import InventoryUpdate
 from awx.main.models.jobs import Job
 from awx.main.models.projects import ProjectUpdate
@@ -66,6 +66,8 @@ class Instance(models.Model):
 class InstanceGroup(models.Model):
     """A model representing a Queue/Group of AWX Instances."""

+    objects = InstanceGroupManager()
+
     name = models.CharField(max_length=250, unique=True)
     created = models.DateTimeField(auto_now_add=True)
     modified = models.DateTimeField(auto_now=True)
@@ -89,12 +91,7 @@ class InstanceGroup(models.Model):

     @property
     def capacity(self):
-        return sum([x[0] for x in self.instances.values_list('capacity')])
-
-    @property
-    def consumed_capacity(self):
-        return sum(x.task_impact for x in UnifiedJob.objects.filter(instance_group=self,
-                                                                    status__in=('running', 'waiting')))
+        return sum([inst.capacity for inst in self.instances.all()])

     class Meta:
         app_label = 'main'
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index eb4d77cf44..6430d007a4 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -41,7 +41,7 @@ class TaskManager():
         for rampart_group in InstanceGroup.objects.all():
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
-                                                  capacity_used=0)
+                                                  consumed_capacity=0)

     def is_job_blocked(self, task):
         # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -492,25 +492,11 @@ class TaskManager():

         self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)

-    def calculate_capacity_used(self, tasks):
-        for rampart_group in self.graph:
-            self.graph[rampart_group]['capacity_used'] = 0
-        instance_ig_mapping, ig_ig_mapping = Instance.objects.capacity_mapping()
-        for t in tasks:
-            # TODO: dock capacity for isolated job management tasks running in queue
-            if t.status == 'waiting':
-                # Subtract capacity from any peer groups that share instances
-                for instance_group_name in ig_ig_mapping[t.instance_group.name]:
-                    self.graph[instance_group_name]['capacity_used'] += t.task_impact
-            elif t.status == 'running':
-                # Subtract capacity from all groups that contain the instance
-                for instance_group_name in instance_ig_mapping[t.execution_node]:
-                    self.graph[instance_group_name]['capacity_used'] += t.task_impact
-            else:
-                logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format)
+    def calculate_capacity_consumed(self, tasks):
+        self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)

     def would_exceed_capacity(self, task, instance_group):
-        current_capacity = self.graph[instance_group]['capacity_used']
+        current_capacity = self.graph[instance_group]['consumed_capacity']
         capacity_total = self.graph[instance_group]['capacity_total']
         if current_capacity == 0:
             return False
@@ -519,16 +505,16 @@ class TaskManager():
     def consume_capacity(self, task, instance_group):
         logger.debug('%s consumed %s capacity units from %s with prior total of %s',
                      task.log_format, task.task_impact, instance_group,
-                     self.graph[instance_group]['capacity_used'])
-        self.graph[instance_group]['capacity_used'] += task.task_impact
+                     self.graph[instance_group]['consumed_capacity'])
+        self.graph[instance_group]['consumed_capacity'] += task.task_impact

     def get_remaining_capacity(self, instance_group):
-        return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['capacity_used'])
+        return (self.graph[instance_group]['capacity_total'] - self.graph[instance_group]['consumed_capacity'])

     def process_tasks(self, all_sorted_tasks):
         running_tasks = filter(lambda t: t.status in ['waiting', 'running'], all_sorted_tasks)

-        self.calculate_capacity_used(running_tasks)
+        self.calculate_capacity_consumed(running_tasks)

         self.process_running_tasks(running_tasks)
diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py
index 48b80cdbb6..ae97f367be 100644
--- a/awx/main/scheduler/tasks.py
+++ b/awx/main/scheduler/tasks.py
@@ -3,7 +3,7 @@
 import logging

 # Celery
-from celery import task
+from celery import Task, task

 # AWX
 from awx.main.scheduler import TaskManager
@@ -15,6 +15,12 @@ logger = logging.getLogger('awx.main.scheduler')
 # updated model, the call to schedule() may get stale data.

+class LogErrorsTask(Task):
+    def on_failure(self, exc, task_id, args, kwargs, einfo):
+        logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
+        super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)
+
+
 @task
 def run_job_launch(job_id):
     TaskManager().schedule()
@@ -25,7 +31,7 @@ def run_job_complete(job_id):
     TaskManager().schedule()


-@task
+@task(base=LogErrorsTask)
 def run_task_manager():
     logger.debug("Running Tower task manager.")
     TaskManager().schedule()
diff --git a/awx/main/tests/functional/task_management/test_capacity.py b/awx/main/tests/functional/task_management/test_capacity.py
new file mode 100644
index 0000000000..66d2eab7f2
--- /dev/null
+++ b/awx/main/tests/functional/task_management/test_capacity.py
@@ -0,0 +1,69 @@
+import pytest
+import mock
+
+from django.test import TransactionTestCase
+
+from awx.main.models import (
+    Job,
+    Instance,
+    InstanceGroup,
+)
+
+
+@pytest.mark.django_db
+class TestCapacityMapping(TransactionTestCase):
+
+    def sample_cluster(self):
+        ig_small = InstanceGroup.objects.create(name='ig_small')
+        ig_large = InstanceGroup.objects.create(name='ig_large')
+        tower = InstanceGroup.objects.create(name='tower')
+        i1 = Instance.objects.create(hostname='i1', capacity=200)
+        i2 = Instance.objects.create(hostname='i2', capacity=200)
+        i3 = Instance.objects.create(hostname='i3', capacity=200)
+        ig_small.instances.add(i1)
+        ig_large.instances.add(i2, i3)
+        tower.instances.add(i2)
+        return [tower, ig_large, ig_small]
+
+    def test_mapping(self):
+        self.sample_cluster()
+        with self.assertNumQueries(2):
+            inst_map, ig_map = Instance.objects.capacity_mapping()
+        assert inst_map['i1'] == set(['ig_small'])
+        assert inst_map['i2'] == set(['ig_large', 'tower'])
+        assert ig_map['ig_small'] == set(['ig_small'])
+        assert ig_map['ig_large'] == set(['ig_large', 'tower'])
+        assert ig_map['tower'] == set(['ig_large', 'tower'])
+
+    def test_committed_capacity(self):
+        tower, ig_large, ig_small = self.sample_cluster()
+        tasks = [
+            Job(status='waiting', instance_group=tower),
+            Job(status='waiting', instance_group=ig_large),
+            Job(status='waiting', instance_group=ig_small)
+        ]
+        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
+            capacities = InstanceGroup.objects.capacity_values(
+                tasks=tasks, breakdown=True
+            )
+        # Jobs submitted to either tower or ig_large must count toward both
+        assert capacities['tower']['committed_capacity'] == 43 * 2
+        assert capacities['ig_large']['committed_capacity'] == 43 * 2
+        assert capacities['ig_small']['committed_capacity'] == 43
+
+    def test_running_capacity(self):
+        tower, ig_large, ig_small = self.sample_cluster()
+        tasks = [
+            Job(status='running', execution_node='i1'),
+            Job(status='running', execution_node='i2'),
+            Job(status='running', execution_node='i3')
+        ]
+        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
+            capacities = InstanceGroup.objects.capacity_values(
+                tasks=tasks, breakdown=True
+            )
+        # Tower is only given 1 instance
+        assert capacities['tower']['running_capacity'] == 43
+        # Large IG has 2 instances
+        assert capacities['ig_large']['running_capacity'] == 43 * 2
+        assert capacities['ig_small']['running_capacity'] == 43
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index fa44eed201..05de8b2a81 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -4,43 +4,15 @@ from datetime import timedelta, datetime

 from django.core.cache import cache
 from django.utils.timezone import now as tz_now
-from django.test import TransactionTestCase

 from awx.main.scheduler import TaskManager
 from awx.main.models import (
     Job,
     Instance,
-    InstanceGroup,
     WorkflowJob,
 )


-@pytest.mark.django_db
-class TestCapacityMapping(TransactionTestCase):
-
-    def sample_cluster(self):
-        ig_small = InstanceGroup.objects.create(name='ig_small')
-        ig_large = InstanceGroup.objects.create(name='ig_large')
-        tower = InstanceGroup.objects.create(name='tower')
-        i1 = Instance.objects.create(hostname='i1', capacity=200)
-        i2 = Instance.objects.create(hostname='i2', capacity=200)
-        i3 = Instance.objects.create(hostname='i3', capacity=200)
-        ig_small.instances.add(i1)
-        ig_large.instances.add(i2, i3)
-        tower.instances.add(i2)
-
-    def test_something(self):
-        self.sample_cluster()
-        with self.assertNumQueries(2):
-            inst_map, ig_map = Instance.objects.capacity_mapping()
-        assert inst_map['i1'] == set(['ig_small'])
-        assert inst_map['i2'] == set(['ig_large', 'tower'])
-        assert ig_map['ig_small'] == set(['ig_small'])
-        assert ig_map['ig_large'] == set(['ig_large', 'tower'])
-        assert ig_map['tower'] == set(['ig_large', 'tower'])
-
-
 @pytest.mark.django_db
 def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
     objects = job_template_factory('jt', organization='org1', project='proj',
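
Patch 2 centralizes the accounting that consumes those mappings: a waiting job is charged against every group that overlaps its submitted instance group, while a running job is charged against every group containing its execution node. A simplified sketch of that walk under the same assumptions as the earlier example (the task tuples and the 43-unit impact are illustrative only, not the real UnifiedJob interface):

    # Simplified version of the loop in InstanceGroupManager.capacity_values.
    ig_ig_mapping = {'tower': {'tower', 'ig_large'},
                     'ig_large': {'tower', 'ig_large'},
                     'ig_small': {'ig_small'}}
    instance_ig_mapping = {'i1': {'ig_small'},
                           'i2': {'ig_large', 'tower'},
                           'i3': {'ig_large'}}

    tasks = [
        ('waiting', 'tower', 43),  # charged to tower and ig_large (they share i2)
        ('running', 'i3', 43),     # charged to every group that contains i3
    ]

    graph = {name: {'consumed_capacity': 0} for name in ig_ig_mapping}
    for status, where, impact in tasks:
        groups = ig_ig_mapping[where] if status == 'waiting' else instance_ig_mapping[where]
        for name in groups:
            graph[name]['consumed_capacity'] += impact

    assert graph['tower']['consumed_capacity'] == 43
    assert graph['ig_large']['consumed_capacity'] == 86
    assert graph['ig_small']['consumed_capacity'] == 0

The serializer change reuses the same dictionary for the whole instance group list, so the per-object SerializerMethodFields read from one cached computation in the serializer context instead of issuing a query per group.
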
From d54eb93f26f03529b3a55458eecafa1892a07da0 Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Mon, 28 Aug 2017 15:50:13 -0400
Subject: [PATCH 3/3] Handle capacity algorithm corner cases

An instance has gone lost, and jobs are still either running or waiting
inside of its instance group.

RBAC - the user does not have permission to see some of the groups that
would be used in the capacity calculation.

For some cases, a naive capacity dictionary is returned; the main goal is
to not throw errors and avoid unpredicted behavior.

Detailed capacity tests are moved into new unit test file.
---
 awx/main/managers.py                      |  71 ++++-----
 awx/main/scheduler/__init__.py            |   2 +-
 .../task_management/test_capacity.py      |  37 +----
 awx/main/tests/unit/test_capacity.py      | 135 ++++++++++++++++++
 awx/main/tests/unit/test_task_manager.py  |   4 +-
 5 files changed, 176 insertions(+), 73 deletions(-)
 create mode 100644 awx/main/tests/unit/test_capacity.py

diff --git a/awx/main/managers.py b/awx/main/managers.py
index 4f53da65ae..155a46fa25 100644
--- a/awx/main/managers.py
+++ b/awx/main/managers.py
@@ -58,6 +58,8 @@ def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
         ig_ig_set = set()
         for instance_hostname in ig_instance_mapping[group_name]:
             ig_ig_set |= instance_ig_mapping[instance_hostname]
+        else:
+            ig_ig_set.add(group_name)  # Group contains no instances, return self
         ig_ig_mapping[group_name] = ig_ig_set
     return ig_ig_mapping
@@ -94,33 +96,6 @@ class InstanceManager(models.Manager):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
         return "tower"

-    def capacity_mapping(self, qs=None):
-        """
-        Returns tuple of two dictionaries that shows mutual connections by name
-        for global accounting of capacity
-
-        instance_ig_mapping: {'instance_name': <set of group names the instance belongs to>}
-        ig_ig_mapping: {'group_name': <set of group names that share instances with the group>}
-        """
-        if qs is None:
-            qs = self.all().prefetch_related('rampart_groups')
-        instance_ig_mapping = {}
-        ig_instance_mapping = {}
-        # Create dictionaries that represent basic m2m memberships
-        for instance in qs:
-            if instance.capacity == 0:
-                continue
-            instance_ig_mapping[instance.hostname] = set(
-                group.name for group in instance.rampart_groups.all()
-            )
-            for group in instance.rampart_groups.all():
-                ig_instance_mapping.setdefault(group.name, set())
-                ig_instance_mapping[group.name].add(instance.hostname)
-        # Get IG capacity overlap mapping
-        ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping)
-
-        return instance_ig_mapping, ig_ig_mapping
-

 class InstanceGroupManager(models.Manager):
     """A custom manager class for the InstanceGroup model.
@@ -152,11 +127,20 @@ class InstanceGroupManager(models.Manager):

         return instance_ig_mapping, ig_ig_mapping

+    @staticmethod
+    def zero_out_group(graph, name, breakdown):
+        if name not in graph:
+            graph[name] = {}
+        graph[name]['consumed_capacity'] = 0
+        if breakdown:
+            graph[name]['committed_capacity'] = 0
+            graph[name]['running_capacity'] = 0
+
     def capacity_values(self, qs=None, tasks=None, breakdown=False, graph=None):
         """
         Returns a dictionary of capacity values for all IGs
         """
-        if qs is None:
+        if qs is None:  # Optionally BYOQS - bring your own queryset
             qs = self.all().prefetch_related('instances')
         instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs)

@@ -167,22 +151,41 @@ class InstanceGroupManager(models.Manager):
         if graph is None:
             graph = {group.name: {} for group in qs}
         for group_name in graph:
-            graph[group_name]['consumed_capacity'] = 0
-            if breakdown:
-                graph[group_name]['committed_capacity'] = 0
-                graph[group_name]['running_capacity'] = 0
+            self.zero_out_group(graph, group_name, breakdown)
         for t in tasks:
             # TODO: dock capacity for isolated job management tasks running in queue
             impact = t.task_impact
             if t.status == 'waiting' or not t.execution_node:
                 # Subtract capacity from any peer groups that share instances
-                for group_name in ig_ig_mapping[t.instance_group.name]:
+                if not t.instance_group:
+                    logger.warning('Excluded %s from capacity algorithm '
+                                   '(missing instance_group).', t.log_format)
+                    impacted_groups = []
+                elif t.instance_group.name not in ig_ig_mapping:
+                    # Waiting job in group with 0 capacity has no collateral impact
+                    impacted_groups = [t.instance_group.name]
+                else:
+                    impacted_groups = ig_ig_mapping[t.instance_group.name]
+                for group_name in impacted_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
                     graph[group_name]['consumed_capacity'] += impact
                     if breakdown:
                         graph[group_name]['committed_capacity'] += impact
             elif t.status == 'running':
                 # Subtract capacity from all groups that contain the instance
-                for group_name in instance_ig_mapping[t.execution_node]:
+                if t.execution_node not in instance_ig_mapping:
+                    logger.warning('Detected %s running inside lost instance, '
+                                   'may still be waiting for reaper.', t.log_format)
+                    if t.instance_group:
+                        impacted_groups = [t.instance_group.name]
+                    else:
+                        impacted_groups = []
+                else:
+                    impacted_groups = instance_ig_mapping[t.execution_node]
+                for group_name in impacted_groups:
+                    if group_name not in graph:
+                        self.zero_out_group(graph, group_name, breakdown)
                     graph[group_name]['consumed_capacity'] += impact
                     if breakdown:
                         graph[group_name]['running_capacity'] += impact
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index 6430d007a4..a6f477539c 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -38,7 +38,7 @@ class TaskManager():

     def __init__(self):
         self.graph = dict()
-        for rampart_group in InstanceGroup.objects.all():
+        for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
                                                   consumed_capacity=0)
diff --git a/awx/main/tests/functional/task_management/test_capacity.py b/awx/main/tests/functional/task_management/test_capacity.py
index 66d2eab7f2..7b7b7d7dc0 100644
--- a/awx/main/tests/functional/task_management/test_capacity.py
+++ b/awx/main/tests/functional/task_management/test_capacity.py
@@ -1,10 +1,8 @@
 import pytest
-import mock

 from django.test import TransactionTestCase

 from awx.main.models import (
-    Job,
     Instance,
     InstanceGroup,
 )
@@ -28,42 +26,9 @@ class TestCapacityMapping(TransactionTestCase):
     def test_mapping(self):
         self.sample_cluster()
         with self.assertNumQueries(2):
-            inst_map, ig_map = Instance.objects.capacity_mapping()
+            inst_map, ig_map = InstanceGroup.objects.capacity_mapping()
         assert inst_map['i1'] == set(['ig_small'])
         assert inst_map['i2'] == set(['ig_large', 'tower'])
         assert ig_map['ig_small'] == set(['ig_small'])
         assert ig_map['ig_large'] == set(['ig_large', 'tower'])
         assert ig_map['tower'] == set(['ig_large', 'tower'])
-
-    def test_committed_capacity(self):
-        tower, ig_large, ig_small = self.sample_cluster()
-        tasks = [
-            Job(status='waiting', instance_group=tower),
-            Job(status='waiting', instance_group=ig_large),
-            Job(status='waiting', instance_group=ig_small)
-        ]
-        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
-            capacities = InstanceGroup.objects.capacity_values(
-                tasks=tasks, breakdown=True
-            )
-        # Jobs submitted to either tower or ig_large must count toward both
-        assert capacities['tower']['committed_capacity'] == 43 * 2
-        assert capacities['ig_large']['committed_capacity'] == 43 * 2
-        assert capacities['ig_small']['committed_capacity'] == 43
-
-    def test_running_capacity(self):
-        tower, ig_large, ig_small = self.sample_cluster()
-        tasks = [
-            Job(status='running', execution_node='i1'),
-            Job(status='running', execution_node='i2'),
-            Job(status='running', execution_node='i3')
-        ]
-        with mock.patch.object(Job, 'task_impact', new=mock.PropertyMock(return_value=43)):
-            capacities = InstanceGroup.objects.capacity_values(
-                tasks=tasks, breakdown=True
-            )
-        # Tower is only given 1 instance
-        assert capacities['tower']['running_capacity'] == 43
-        # Large IG has 2 instances
-        assert capacities['ig_large']['running_capacity'] == 43 * 2
-        assert capacities['ig_small']['running_capacity'] == 43
diff --git a/awx/main/tests/unit/test_capacity.py b/awx/main/tests/unit/test_capacity.py
new file mode 100644
index 0000000000..7817521f2e
--- /dev/null
+++ b/awx/main/tests/unit/test_capacity.py
@@ -0,0 +1,135 @@
+import pytest
+
+from awx.main.models import InstanceGroup
+
+
+class FakeObject(object):
+    def __init__(self, **kwargs):
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+
+
+class Job(FakeObject):
+    task_impact = 43
+
+    def log_format(self):
+        return 'job 382 (fake)'
+
+
+@pytest.fixture
+def sample_cluster():
+    def stand_up_cluster():
+
+        class Instances(FakeObject):
+            def add(self, *args):
+                for instance in args:
+                    self.obj.instance_list.append(instance)
+
+            def all(self):
+                return self.obj.instance_list
+
+        class InstanceGroup(FakeObject):
+
+            def __init__(self, **kwargs):
+                super(InstanceGroup, self).__init__(**kwargs)
+                self.instance_list = []
+
+            @property
+            def instances(self):
+                mgr = Instances(obj=self)
+                return mgr
+
+        class Instance(FakeObject):
+            pass
+
+        ig_small = InstanceGroup(name='ig_small')
+        ig_large = InstanceGroup(name='ig_large')
+        tower = InstanceGroup(name='tower')
+        i1 = Instance(hostname='i1', capacity=200)
+        i2 = Instance(hostname='i2', capacity=200)
+        i3 = Instance(hostname='i3', capacity=200)
+        ig_small.instances.add(i1)
+        ig_large.instances.add(i2, i3)
+        tower.instances.add(i2)
+        return [tower, ig_large, ig_small]
+    return stand_up_cluster
+
+
+def test_committed_capacity(sample_cluster):
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='waiting', instance_group=tower),
+        Job(status='waiting', instance_group=ig_large),
+        Job(status='waiting', instance_group=ig_small)
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
+    )
+    # Jobs submitted to either tower or ig_large must count toward both
+    assert capacities['tower']['committed_capacity'] == 43 * 2
+    assert capacities['ig_large']['committed_capacity'] == 43 * 2
+    assert capacities['ig_small']['committed_capacity'] == 43
+
+
+def test_running_capacity(sample_cluster):
+    tower, ig_large, ig_small = sample_cluster()
+    tasks = [
+        Job(status='running', execution_node='i1'),
+        Job(status='running', execution_node='i2'),
+        Job(status='running', execution_node='i3')
+    ]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
+    )
+    # Tower is only given 1 instance
+    assert capacities['tower']['running_capacity'] == 43
+    # Large IG has 2 instances
+    assert capacities['ig_large']['running_capacity'] == 43 * 2
+    assert capacities['ig_small']['running_capacity'] == 43
+
+
+def test_offline_node_running(sample_cluster):
+    """
+    Assure that algorithm doesn't explode if a job is marked running
+    in an offline node
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    ig_small.instance_list[0].capacity = 0
+    tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks)
+    assert capacities['ig_small']['consumed_capacity'] == 43
+
+
+def test_offline_node_waiting(sample_cluster):
+    """
+    Same but for a waiting job
+    """
+    tower, ig_large, ig_small = sample_cluster()
+    ig_small.instance_list[0].capacity = 0
+    tasks = [Job(status='waiting', instance_group=ig_small)]
+    capacities = InstanceGroup.objects.capacity_values(
+        qs=[tower, ig_large, ig_small], tasks=tasks)
+    assert capacities['ig_small']['consumed_capacity'] == 43
+
+
+def test_RBAC_reduced_filter(sample_cluster):
+    """
+    User can see jobs that are running in `ig_small` and `ig_large` IGs,
+    but user does not have permission to see those actual instance groups.
+    Verify that this does not blow everything up.
+ """ + tower, ig_large, ig_small = sample_cluster() + tasks = [ + Job(status='waiting', instance_group=tower), + Job(status='waiting', instance_group=ig_large), + Job(status='waiting', instance_group=ig_small) + ] + capacities = InstanceGroup.objects.capacity_values( + qs=[tower], tasks=tasks, breakdown=True + ) + # Cross-links between groups not visible to current user, + # so a naieve accounting of capacities is returned instead + assert capacities['tower']['committed_capacity'] == 43 diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py index f76e77862b..65b7607bb4 100644 --- a/awx/main/tests/unit/test_task_manager.py +++ b/awx/main/tests/unit/test_task_manager.py @@ -20,7 +20,7 @@ class TestCleanupInconsistentCeleryTasks(): @mock.patch.object(cache, 'get', return_value=None) @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {})) @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, [])) - @mock.patch.object(InstanceGroup.objects, 'all', return_value=[]) + @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) @mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist) @mock.patch('awx.main.scheduler.logger') def test_instance_does_not_exist(self, logger_mock, *args): @@ -36,7 +36,7 @@ class TestCleanupInconsistentCeleryTasks(): @mock.patch.object(cache, 'get', return_value=None) @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []})) - @mock.patch.object(InstanceGroup.objects, 'all', return_value=[]) + @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) @mock.patch.object(TaskManager, 'get_running_tasks') @mock.patch('awx.main.scheduler.logger') def test_save_failed(self, logger_mock, get_running_tasks, *args):