From cb63d92bbf5d8e10834264f0eb8142c4eb5c9161 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 22 Apr 2022 13:41:32 -0400 Subject: [PATCH] Remove committed_capacity field, delete supporting code (#12086) * Remove committed_capacity field, delete supporting code * Track consumed capacity to solve the negatives problem * Use more verbose name for IG queryset --- awx/api/serializers.py | 32 +++-- awx/api/views/mixin.py | 12 +- awx/main/managers.py | 119 +----------------- awx/main/models/ha.py | 4 +- awx/main/scheduler/task_manager.py | 6 +- awx/main/scheduler/task_manager_models.py | 30 ++++- .../task_management/test_capacity.py | 19 +-- awx/main/tests/unit/test_capacity.py | 69 +++++----- 8 files changed, 100 insertions(+), 191 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index ca2908daaa..2424b461b9 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -113,6 +113,7 @@ from awx.main.utils import ( ) from awx.main.utils.filters import SmartFilter from awx.main.utils.named_url_graph import reset_counters +from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances from awx.main.redact import UriCleaner, REPLACE_STR from awx.main.validators import vars_validate_or_raise @@ -4873,7 +4874,6 @@ class InstanceGroupSerializer(BaseSerializer): show_capabilities = ['edit', 'delete'] - committed_capacity = serializers.SerializerMethodField(help_text=_('This resource has been deprecated and will be removed in a future release')) consumed_capacity = serializers.SerializerMethodField() percent_capacity_remaining = serializers.SerializerMethodField() jobs_running = serializers.IntegerField( @@ -4922,7 +4922,6 @@ class InstanceGroupSerializer(BaseSerializer): "created", "modified", "capacity", - "committed_capacity", "consumed_capacity", "percent_capacity_remaining", "jobs_running", @@ -5003,30 +5002,29 @@ class InstanceGroupSerializer(BaseSerializer): return attrs - def get_capacity_dict(self): + def get_ig_mgr(self): # Store capacity values (globally computed) in the context - if 'capacity_map' not in self.context: - ig_qs = None + if 'task_manager_igs' not in self.context: + instance_groups_queryset = None jobs_qs = UnifiedJob.objects.filter(status__in=('running', 'waiting')) if self.parent: # Is ListView: - ig_qs = self.parent.instance - self.context['capacity_map'] = InstanceGroup.objects.capacity_values(qs=ig_qs, tasks=jobs_qs, breakdown=True) - return self.context['capacity_map'] + instance_groups_queryset = self.parent.instance + + instances = TaskManagerInstances(jobs_qs) + instance_groups = TaskManagerInstanceGroups(instances_by_hostname=instances, instance_groups_queryset=instance_groups_queryset) + + self.context['task_manager_igs'] = instance_groups + return self.context['task_manager_igs'] def get_consumed_capacity(self, obj): - return self.get_capacity_dict()[obj.name]['running_capacity'] - - def get_committed_capacity(self, obj): - return self.get_capacity_dict()[obj.name]['committed_capacity'] + ig_mgr = self.get_ig_mgr() + return ig_mgr.get_consumed_capacity(obj.name) def get_percent_capacity_remaining(self, obj): if not obj.capacity: return 0.0 - consumed = self.get_consumed_capacity(obj) - if consumed >= obj.capacity: - return 0.0 - else: - return float("{0:.2f}".format(((float(obj.capacity) - float(consumed)) / (float(obj.capacity))) * 100)) + ig_mgr = self.get_ig_mgr() + return float("{0:.2f}".format((float(ig_mgr.get_remaining_capacity(obj.name)) / (float(obj.capacity))) * 100)) def get_instances(self, obj): return obj.instances.count() diff --git a/awx/api/views/mixin.py b/awx/api/views/mixin.py index 9ad757c406..a0a679d4e5 100644 --- a/awx/api/views/mixin.py +++ b/awx/api/views/mixin.py @@ -77,13 +77,13 @@ class InstanceGroupMembershipMixin(object): else: inst_name = get_object_or_400(self.model, pk=sub_id).hostname with transaction.atomic(): - ig_qs = InstanceGroup.objects.select_for_update() + instance_groups_queryset = InstanceGroup.objects.select_for_update() if self.parent_model is Instance: - ig_obj = get_object_or_400(ig_qs, pk=sub_id) + ig_obj = get_object_or_400(instance_groups_queryset, pk=sub_id) else: # similar to get_parent_object, but selected for update parent_filter = {self.lookup_field: self.kwargs.get(self.lookup_field, None)} - ig_obj = get_object_or_404(ig_qs, **parent_filter) + ig_obj = get_object_or_404(instance_groups_queryset, **parent_filter) if inst_name not in ig_obj.policy_instance_list: ig_obj.policy_instance_list.append(inst_name) ig_obj.save(update_fields=['policy_instance_list']) @@ -98,13 +98,13 @@ class InstanceGroupMembershipMixin(object): else: inst_name = get_object_or_400(self.model, pk=sub_id).hostname with transaction.atomic(): - ig_qs = InstanceGroup.objects.select_for_update() + instance_groups_queryset = InstanceGroup.objects.select_for_update() if self.parent_model is Instance: - ig_obj = get_object_or_400(ig_qs, pk=sub_id) + ig_obj = get_object_or_400(instance_groups_queryset, pk=sub_id) else: # similar to get_parent_object, but selected for update parent_filter = {self.lookup_field: self.kwargs.get(self.lookup_field, None)} - ig_obj = get_object_or_404(ig_qs, **parent_filter) + ig_obj = get_object_or_404(instance_groups_queryset, **parent_filter) if inst_name in ig_obj.policy_instance_list: ig_obj.policy_instance_list.pop(ig_obj.policy_instance_list.index(inst_name)) ig_obj.save(update_fields=['policy_instance_list']) diff --git a/awx/main/managers.py b/awx/main/managers.py index b87d80954a..23acd15139 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -7,10 +7,9 @@ from django.conf import settings from django.db.models.functions import Lower from awx.main.utils.filters import SmartFilter from awx.main.utils.pglock import advisory_lock -from awx.main.utils.common import get_capacity_type from awx.main.constants import RECEPTOR_PENDING -___all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager', 'DeferJobCreatedManager', 'UUID_DEFAULT'] +___all__ = ['HostManager', 'InstanceManager', 'DeferJobCreatedManager', 'UUID_DEFAULT'] logger = logging.getLogger('awx.main.managers') UUID_DEFAULT = '00000000-0000-0000-0000-000000000000' @@ -162,119 +161,3 @@ class InstanceManager(models.Manager): create_defaults['version'] = RECEPTOR_PENDING instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option) return (True, instance) - - -class InstanceGroupManager(models.Manager): - """A custom manager class for the Instance model. - - Used for global capacity calculations - """ - - def capacity_mapping(self, qs=None): - """ - Another entry-point to Instance manager method by same name - """ - if qs is None: - qs = self.all().prefetch_related('instances') - instance_ig_mapping = {} - ig_instance_mapping = {} - # Create dictionaries that represent basic m2m memberships - for group in qs: - ig_instance_mapping[group.name] = set(instance.hostname for instance in group.instances.all() if instance.capacity != 0) - for inst in group.instances.all(): - if inst.capacity == 0: - continue - instance_ig_mapping.setdefault(inst.hostname, set()) - instance_ig_mapping[inst.hostname].add(group.name) - # Get IG capacity overlap mapping - ig_ig_mapping = get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping) - - return instance_ig_mapping, ig_ig_mapping - - @staticmethod - def zero_out_group(graph, name, breakdown): - if name not in graph: - graph[name] = {} - graph[name]['consumed_capacity'] = 0 - for capacity_type in ('execution', 'control'): - graph[name][f'consumed_{capacity_type}_capacity'] = 0 - if breakdown: - graph[name]['committed_capacity'] = 0 - graph[name]['running_capacity'] = 0 - - def capacity_values(self, qs=None, tasks=None, breakdown=False, graph=None): - """ - Returns a dictionary of capacity values for all IGs - """ - if qs is None: # Optionally BYOQS - bring your own queryset - qs = self.all().prefetch_related('instances') - instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs) - - if tasks is None: - tasks = self.model.unifiedjob_set.related.related_model.objects.filter(status__in=('running', 'waiting')) - - if graph is None: - graph = {group.name: {} for group in qs} - for group_name in graph: - self.zero_out_group(graph, group_name, breakdown) - for t in tasks: - # TODO: dock capacity for isolated job management tasks running in queue - impact = t.task_impact - control_groups = [] - if t.controller_node: - control_groups = instance_ig_mapping.get(t.controller_node, []) - if not control_groups: - logger.warning(f"No instance group found for {t.controller_node}, capacity consumed may be innaccurate.") - - if t.status == 'waiting' or (not t.execution_node and not t.is_container_group_task): - # Subtract capacity from any peer groups that share instances - if not t.instance_group: - impacted_groups = [] - elif t.instance_group.name not in ig_ig_mapping: - # Waiting job in group with 0 capacity has no collateral impact - impacted_groups = [t.instance_group.name] - else: - impacted_groups = ig_ig_mapping[t.instance_group.name] - for group_name in impacted_groups: - if group_name not in graph: - self.zero_out_group(graph, group_name, breakdown) - graph[group_name]['consumed_capacity'] += impact - capacity_type = get_capacity_type(t) - graph[group_name][f'consumed_{capacity_type}_capacity'] += impact - if breakdown: - graph[group_name]['committed_capacity'] += impact - for group_name in control_groups: - if group_name not in graph: - self.zero_out_group(graph, group_name, breakdown) - graph[group_name][f'consumed_control_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT - if breakdown: - graph[group_name]['committed_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT - elif t.status == 'running': - # Subtract capacity from all groups that contain the instance - if t.execution_node not in instance_ig_mapping: - if not t.is_container_group_task: - logger.warning('Detected %s running inside lost instance, ' 'may still be waiting for reaper.', t.log_format) - if t.instance_group: - impacted_groups = [t.instance_group.name] - else: - impacted_groups = [] - else: - impacted_groups = instance_ig_mapping[t.execution_node] - - for group_name in impacted_groups: - if group_name not in graph: - self.zero_out_group(graph, group_name, breakdown) - graph[group_name]['consumed_capacity'] += impact - capacity_type = get_capacity_type(t) - graph[group_name][f'consumed_{capacity_type}_capacity'] += impact - if breakdown: - graph[group_name]['running_capacity'] += impact - for group_name in control_groups: - if group_name not in graph: - self.zero_out_group(graph, group_name, breakdown) - graph[group_name][f'consumed_control_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT - if breakdown: - graph[group_name]['running_capacity'] += settings.AWX_CONTROL_NODE_TASK_IMPACT - else: - logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format) - return graph diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index a7d6e51a82..782ca59344 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -19,7 +19,7 @@ from solo.models import SingletonModel from awx import __version__ as awx_application_version from awx.api.versioning import reverse from awx.main.fields import JSONBlob -from awx.main.managers import InstanceManager, InstanceGroupManager, UUID_DEFAULT +from awx.main.managers import InstanceManager, UUID_DEFAULT from awx.main.constants import JOB_FOLDER_PREFIX from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search from awx.main.models.unified_jobs import UnifiedJob @@ -300,8 +300,6 @@ class Instance(HasPolicyEditsMixin, BaseModel): class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): """A model representing a Queue/Group of AWX Instances.""" - objects = InstanceGroupManager() - name = models.CharField(max_length=250, unique=True) created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index d5c31194f2..05520f50d6 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -464,7 +464,7 @@ class TaskManager: # All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups if task.capacity_type == 'control': task.execution_node = control_instance.hostname - control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact) + control_instance.consume_capacity(control_impact) self.dependency_graph.add_job(task) execution_instance = self.instances[control_instance.hostname].obj task.log_lifecycle("controller_node_chosen") @@ -497,9 +497,9 @@ class TaskManager: control_instance = execution_instance task.controller_node = execution_instance.hostname - control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - settings.AWX_CONTROL_NODE_TASK_IMPACT) + control_instance.consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) task.log_lifecycle("controller_node_chosen") - execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact) + execution_instance.consume_capacity(task.task_impact) task.log_lifecycle("execution_node_chosen") logger.debug( "Starting {} in group {} instance {} (remaining_capacity={})".format( diff --git a/awx/main/scheduler/task_manager_models.py b/awx/main/scheduler/task_manager_models.py index 556cc94f64..678e545152 100644 --- a/awx/main/scheduler/task_manager_models.py +++ b/awx/main/scheduler/task_manager_models.py @@ -18,10 +18,20 @@ class TaskManagerInstance: def __init__(self, obj): self.obj = obj self.node_type = obj.node_type - self.remaining_capacity = obj.capacity + self.consumed_capacity = 0 self.capacity = obj.capacity self.hostname = obj.hostname + def consume_capacity(self, impact): + self.consumed_capacity += impact + + @property + def remaining_capacity(self): + remaining = self.capacity - self.consumed_capacity + if remaining < 0: + return 0 + return remaining + class TaskManagerInstances: def __init__(self, active_tasks, instances=None): @@ -40,9 +50,9 @@ class TaskManagerInstances: control_instance = self.instances_by_hostname.get(task.controller_node, '') execution_instance = self.instances_by_hostname.get(task.execution_node, '') if execution_instance and execution_instance.node_type in ('hybrid', 'execution'): - self.instances_by_hostname[task.execution_node].remaining_capacity -= task.task_impact + self.instances_by_hostname[task.execution_node].consume_capacity(task.task_impact) if control_instance and control_instance.node_type in ('hybrid', 'control'): - self.instances_by_hostname[task.controller_node].remaining_capacity -= settings.AWX_CONTROL_NODE_TASK_IMPACT + self.instances_by_hostname[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT) def __getitem__(self, hostname): return self.instances_by_hostname.get(hostname) @@ -54,14 +64,16 @@ class TaskManagerInstances: class TaskManagerInstanceGroups: """A class representing minimal data the task manager needs to represent an InstanceGroup.""" - def __init__(self, instances_by_hostname=None, instance_groups=None): + def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None): self.instance_groups = dict() self.controlplane_ig = None if instance_groups is not None: # for testing self.instance_groups = instance_groups else: - for instance_group in InstanceGroup.objects.prefetch_related('instances').only('name', 'instances'): + if instance_groups_queryset is None: + instance_groups_queryset = InstanceGroup.objects.prefetch_related('instances').only('name', 'instances') + for instance_group in instance_groups_queryset: if instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: self.controlplane_ig = instance_group self.instance_groups[instance_group.name] = dict( @@ -70,6 +82,14 @@ class TaskManagerInstanceGroups: ], ) + def get_remaining_capacity(self, group_name): + instances = self.instance_groups[group_name]['instances'] + return sum(inst.remaining_capacity for inst in instances) + + def get_consumed_capacity(self, group_name): + instances = self.instance_groups[group_name]['instances'] + return sum(inst.consumed_capacity for inst in instances) + def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False): impact = impact if impact else task.task_impact capacity_type = capacity_type if capacity_type else task.capacity_type diff --git a/awx/main/tests/functional/task_management/test_capacity.py b/awx/main/tests/functional/task_management/test_capacity.py index d50833fd8e..28e7e8c655 100644 --- a/awx/main/tests/functional/task_management/test_capacity.py +++ b/awx/main/tests/functional/task_management/test_capacity.py @@ -4,9 +4,10 @@ from awx.main.models import ( Instance, InstanceGroup, ) +from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances -class TestCapacityMapping(TransactionTestCase): +class TestInstanceGroupInstanceMapping(TransactionTestCase): def sample_cluster(self): ig_small = InstanceGroup.objects.create(name='ig_small') ig_large = InstanceGroup.objects.create(name='ig_large') @@ -21,10 +22,12 @@ class TestCapacityMapping(TransactionTestCase): def test_mapping(self): self.sample_cluster() - with self.assertNumQueries(2): - inst_map, ig_map = InstanceGroup.objects.capacity_mapping() - assert inst_map['i1'] == set(['ig_small']) - assert inst_map['i2'] == set(['ig_large', 'default']) - assert ig_map['ig_small'] == set(['ig_small']) - assert ig_map['ig_large'] == set(['ig_large', 'default']) - assert ig_map['default'] == set(['ig_large', 'default']) + with self.assertNumQueries(3): + instances = TaskManagerInstances([]) # empty task list + instance_groups = TaskManagerInstanceGroups(instances_by_hostname=instances) + + ig_instance_map = instance_groups.instance_groups + + assert set(i.hostname for i in ig_instance_map['ig_small']['instances']) == set(['i1']) + assert set(i.hostname for i in ig_instance_map['ig_large']['instances']) == set(['i2', 'i3']) + assert set(i.hostname for i in ig_instance_map['default']['instances']) == set(['i2']) diff --git a/awx/main/tests/unit/test_capacity.py b/awx/main/tests/unit/test_capacity.py index 2b7cc2399b..740cbc193e 100644 --- a/awx/main/tests/unit/test_capacity.py +++ b/awx/main/tests/unit/test_capacity.py @@ -1,6 +1,6 @@ import pytest -from awx.main.models import InstanceGroup +from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances class FakeMeta(object): @@ -52,9 +52,9 @@ def sample_cluster(): ig_small = InstanceGroup(name='ig_small') ig_large = InstanceGroup(name='ig_large') default = InstanceGroup(name='default') - i1 = Instance(hostname='i1', capacity=200) - i2 = Instance(hostname='i2', capacity=200) - i3 = Instance(hostname='i3', capacity=200) + i1 = Instance(hostname='i1', capacity=200, node_type='hybrid') + i2 = Instance(hostname='i2', capacity=200, node_type='hybrid') + i3 = Instance(hostname='i3', capacity=200, node_type='hybrid') ig_small.instances.add(i1) ig_large.instances.add(i2, i3) default.instances.add(i2) @@ -63,59 +63,66 @@ def sample_cluster(): return stand_up_cluster -def test_committed_capacity(sample_cluster): - default, ig_large, ig_small = sample_cluster() - tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True) - # Jobs submitted to either tower or ig_larg must count toward both - assert capacities['default']['committed_capacity'] == 43 * 2 - assert capacities['ig_large']['committed_capacity'] == 43 * 2 - assert capacities['ig_small']['committed_capacity'] == 43 +@pytest.fixture +def create_ig_manager(): + def _rf(ig_list, tasks): + instances = TaskManagerInstances(tasks, instances=set(inst for ig in ig_list for inst in ig.instance_list)) + + seed_igs = {} + for ig in ig_list: + seed_igs[ig.name] = {'instances': [instances[inst.hostname] for inst in ig.instance_list]} + + instance_groups = TaskManagerInstanceGroups(instance_groups=seed_igs) + return instance_groups + + return _rf -def test_running_capacity(sample_cluster): +@pytest.mark.parametrize('ig_name,consumed_capacity', [('default', 43), ('ig_large', 43 * 2), ('ig_small', 43)]) +def test_running_capacity(sample_cluster, ig_name, consumed_capacity, create_ig_manager): default, ig_large, ig_small = sample_cluster() + ig_list = [default, ig_large, ig_small] tasks = [Job(status='running', execution_node='i1'), Job(status='running', execution_node='i2'), Job(status='running', execution_node='i3')] - capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True) - # Tower is only given 1 instance - assert capacities['default']['running_capacity'] == 43 - # Large IG has 2 instances - assert capacities['ig_large']['running_capacity'] == 43 * 2 - assert capacities['ig_small']['running_capacity'] == 43 + + instance_groups_mgr = create_ig_manager(ig_list, tasks) + + assert instance_groups_mgr.get_consumed_capacity(ig_name) == consumed_capacity -def test_offline_node_running(sample_cluster): +def test_offline_node_running(sample_cluster, create_ig_manager): """ Assure that algorithm doesn't explode if a job is marked running in an offline node """ default, ig_large, ig_small = sample_cluster() ig_small.instance_list[0].capacity = 0 - tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks) - assert capacities['ig_small']['consumed_execution_capacity'] == 43 + tasks = [Job(status='running', execution_node='i1')] + instance_groups_mgr = create_ig_manager([default, ig_large, ig_small], tasks) + assert instance_groups_mgr.get_consumed_capacity('ig_small') == 43 + assert instance_groups_mgr.get_remaining_capacity('ig_small') == 0 -def test_offline_node_waiting(sample_cluster): +def test_offline_node_waiting(sample_cluster, create_ig_manager): """ Same but for a waiting job """ default, ig_large, ig_small = sample_cluster() ig_small.instance_list[0].capacity = 0 - tasks = [Job(status='waiting', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks) - assert capacities['ig_small']['consumed_execution_capacity'] == 43 + tasks = [Job(status='waiting', execution_node='i1')] + instance_groups_mgr = create_ig_manager([default, ig_large, ig_small], tasks) + assert instance_groups_mgr.get_consumed_capacity('ig_small') == 43 + assert instance_groups_mgr.get_remaining_capacity('ig_small') == 0 -def test_RBAC_reduced_filter(sample_cluster): +def test_RBAC_reduced_filter(sample_cluster, create_ig_manager): """ User can see jobs that are running in `ig_small` and `ig_large` IGs, but user does not have permission to see those actual instance groups. Verify that this does not blow everything up. """ default, ig_large, ig_small = sample_cluster() - tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)] - capacities = InstanceGroup.objects.capacity_values(qs=[default], tasks=tasks, breakdown=True) + tasks = [Job(status='waiting', execution_node='i1'), Job(status='waiting', execution_node='i2'), Job(status='waiting', execution_node='i3')] + instance_groups_mgr = create_ig_manager([default], tasks) # Cross-links between groups not visible to current user, # so a naieve accounting of capacities is returned instead - assert capacities['default']['committed_capacity'] == 43 + assert instance_groups_mgr.get_consumed_capacity('default') == 43