class InstanceGroupSerializer(BaseSerializer):
    # NOTE(review): reconstructed from a flattened diff -- only the members
    # visible in the patch hunks are reproduced here (get_related and the
    # Meta declaration are outside the visible hunk).

    consumed_capacity = serializers.SerializerMethodField()
    percent_capacity_remaining = serializers.SerializerMethodField()
    jobs_running = serializers.SerializerMethodField()
    instances = serializers.SerializerMethodField()

    def get_jobs_qs(self):
        """Return the queryset of all waiting/running jobs.

        Cached in the serializer context so a list view evaluates it once
        and every serialized group shares the same result.
        """
        if 'running_jobs' not in self.context:
            self.context['running_jobs'] = UnifiedJob.objects.filter(
                status__in=('running', 'waiting'))
        return self.context['running_jobs']

    def get_capacity_dict(self):
        """Return the globally-computed capacity map, cached in context.

        In a list view (``self.parent`` is set) the parent's instance
        queryset is reused so capacities are computed once per page
        instead of once per object.
        """
        if 'capacity_map' not in self.context:
            ig_qs = self.parent.instance if self.parent else None
            self.context['capacity_map'] = InstanceGroup.objects.capacity_values(
                qs=ig_qs, tasks=self.get_jobs_qs(), breakdown=True)
        return self.context['capacity_map']

    def get_consumed_capacity(self, obj):
        """Capacity units consumed by waiting/running jobs in this group."""
        return self.get_capacity_dict()[obj.name]['consumed_capacity']

    def get_percent_capacity_remaining(self, obj):
        """Free capacity as a percentage, rounded to two decimal places."""
        if not obj.capacity:
            return 0.0
        free_units = float(obj.capacity) - float(self.get_consumed_capacity(obj))
        return float("{0:.2f}".format((free_units / float(obj.capacity)) * 100))

    def get_jobs_running(self, obj):
        """Count of waiting/running jobs assigned to this instance group."""
        shared_jobs = self.get_jobs_qs()
        return sum(1 for job in shared_jobs if job.instance_group_id == obj.id)

    def get_instances(self, obj):
        """Number of instances that are members of this group."""
        return obj.instances.count()
class InstanceGroupAccess(BaseAccess):
    # NOTE(review): reconstructed from a flattened diff -- only the methods
    # visible in the patch hunk are reproduced here.

    def get_queryset(self):
        """Instance groups visible to the requesting user.

        Superusers and system auditors see all groups; everyone else sees
        only groups tied to organizations they hold the admin role on.
        Instances are prefetched because capacity reporting iterates the
        membership of every group returned.
        """
        if self.user.is_superuser or self.user.is_system_auditor:
            qs = InstanceGroup.objects.all()
        else:
            qs = InstanceGroup.objects.filter(
                organization__in=Organization.accessible_pk_qs(self.user, 'admin_role'))
        return qs.prefetch_related('instances')

    def can_add(self, data):
        """Instance groups cannot be created through the API."""
        return False
def get_ig_ig_mapping(ig_instance_mapping, instance_ig_mapping):
    """Map each instance group to the set of groups it shares instances with.

    Parameters
    ----------
    ig_instance_mapping : dict
        Group name -> set of member instance hostnames (capacity > 0 only).
    instance_ig_mapping : dict
        Instance hostname -> set of group names that instance belongs to.

    Returns
    -------
    dict
        Group name -> set of overlapping group names (always includes the
        group itself).
    """
    ig_ig_mapping = {}
    for group_name, hostnames in ig_instance_mapping.items():
        ig_ig_set = set()
        for instance_hostname in hostnames:
            ig_ig_set |= instance_ig_mapping[instance_hostname]
        # Every group overlaps itself.  This also covers groups with no
        # (online) instances.  The original used a for/else, but with no
        # `break` in the loop the else-branch ran after EVERY completed
        # loop -- i.e. the add was unconditional anyway; make it explicit.
        ig_ig_set.add(group_name)
        ig_ig_mapping[group_name] = ig_ig_set
    return ig_ig_mapping
+ """ + Returns a dictionary of capacity values for all IGs + """ + if qs is None: # Optionally BYOQS - bring your own queryset + qs = self.all().prefetch_related('instances') + instance_ig_mapping, ig_ig_mapping = self.capacity_mapping(qs=qs) + + if tasks is None: + tasks = self.model.unifiedjob_set.related.related_model.objects.filter( + status__in=('running', 'waiting')) + + if graph is None: + graph = {group.name: {} for group in qs} + for group_name in graph: + self.zero_out_group(graph, group_name, breakdown) + for t in tasks: + # TODO: dock capacity for isolated job management tasks running in queue + impact = t.task_impact + if t.status == 'waiting' or not t.execution_node: + # Subtract capacity from any peer groups that share instances + if not t.instance_group: + logger.warning('Excluded %s from capacity algorithm ' + '(missing instance_group).', t.log_format) + impacted_groups = [] + elif t.instance_group.name not in ig_ig_mapping: + # Waiting job in group with 0 capacity has no collateral impact + impacted_groups = [t.instance_group.name] + else: + impacted_groups = ig_ig_mapping[t.instance_group.name] + for group_name in impacted_groups: + if group_name not in graph: + self.zero_out_group(graph, group_name, breakdown) + graph[group_name]['consumed_capacity'] += impact + if breakdown: + graph[group_name]['committed_capacity'] += impact + elif t.status == 'running': + # Subtract capacity from all groups that contain the instance + if t.execution_node not in instance_ig_mapping: + logger.warning('Detected %s running inside lost instance, ' + 'may still be waiting for reaper.', t.log_format) + if t.instance_group: + impacted_groups = [t.instance_group.name] + else: + impacted_groups = [] + else: + impacted_groups = instance_ig_mapping[t.execution_node] + for group_name in impacted_groups: + if group_name not in graph: + self.zero_out_group(graph, group_name, breakdown) + graph[group_name]['consumed_capacity'] += impact + if breakdown: + 
graph[group_name]['running_capacity'] += impact + else: + logger.error('Programming error, %s not in ["running", "waiting"]', t.log_format) + return graph diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 509e37b4ac..f2e57f7a07 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -11,7 +11,7 @@ from django.utils.timezone import now, timedelta from solo.models import SingletonModel from awx.api.versioning import reverse -from awx.main.managers import InstanceManager +from awx.main.managers import InstanceManager, InstanceGroupManager from awx.main.models.inventory import InventoryUpdate from awx.main.models.jobs import Job from awx.main.models.projects import ProjectUpdate @@ -66,6 +66,8 @@ class Instance(models.Model): class InstanceGroup(models.Model): """A model representing a Queue/Group of AWX Instances.""" + objects = InstanceGroupManager() + name = models.CharField(max_length=250, unique=True) created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) @@ -89,12 +91,7 @@ class InstanceGroup(models.Model): @property def capacity(self): - return sum([x[0] for x in self.instances.values_list('capacity')]) - - @property - def consumed_capacity(self): - return sum(x.task_impact for x in UnifiedJob.objects.filter(instance_group=self, - status__in=('running', 'waiting'))) + return sum([inst.capacity for inst in self.instances.all()]) class Meta: app_label = 'main' diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 2337170091..a6f477539c 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -38,10 +38,10 @@ class TaskManager(): def __init__(self): self.graph = dict() - for rampart_group in InstanceGroup.objects.all(): + for rampart_group in InstanceGroup.objects.prefetch_related('instances'): self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name), capacity_total=rampart_group.capacity, - capacity_used=0) + 
class TaskManager():
    # NOTE(review): reconstructed from a flattened diff -- only the
    # capacity-accounting methods visible in the patch hunks are
    # reproduced; the scheduling methods between them are unchanged
    # elsewhere in the original file.

    def __init__(self):
        # Seed the scheduling graph with each group's dependency graph,
        # total capacity, and a zeroed consumed-capacity counter.
        # prefetch_related avoids one instances query per group when the
        # `capacity` property sums instance capacities.
        self.graph = dict()
        for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
            self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                  capacity_total=rampart_group.capacity,
                                                  consumed_capacity=0)

    def calculate_capacity_consumed(self, tasks):
        """Recompute consumed capacity for every group from ``tasks``."""
        self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)

    def would_exceed_capacity(self, task, instance_group):
        """Return True if starting ``task`` would overcommit the group.

        A group with nothing currently consuming capacity always accepts
        the task, even when the task's impact alone exceeds total
        capacity -- this guarantees forward progress on small clusters.
        """
        current_capacity = self.graph[instance_group]['consumed_capacity']
        capacity_total = self.graph[instance_group]['capacity_total']
        if current_capacity == 0:
            return False
        return (task.task_impact + current_capacity > capacity_total)

    def consume_capacity(self, task, instance_group):
        """Charge ``task``'s impact against ``instance_group``'s counter."""
        logger.debug('%s consumed %s capacity units from %s with prior total of %s',
                     task.log_format, task.task_impact, instance_group,
                     self.graph[instance_group]['consumed_capacity'])
        self.graph[instance_group]['consumed_capacity'] += task.task_impact

    def get_remaining_capacity(self, instance_group):
        """Total minus consumed capacity for ``instance_group``."""
        return (self.graph[instance_group]['capacity_total'] -
                self.graph[instance_group]['consumed_capacity'])

    def process_tasks(self, all_sorted_tasks):
        """Account for waiting/running tasks, then process the running ones.

        Bug fix: the original kept the raw ``filter(...)`` object and
        iterated it twice.  Under Python 3 ``filter()`` returns a
        one-shot iterator, so ``calculate_capacity_consumed`` would
        exhaust it and ``process_running_tasks`` would see no tasks.
        Materializing to a list is identical under Python 2 and correct
        under Python 3.
        """
        running_tasks = [t for t in all_sorted_tasks
                         if t.status in ('waiting', 'running')]

        self.calculate_capacity_consumed(running_tasks)

        self.process_running_tasks(running_tasks)
class LogErrorsTask(Task):
    """Celery base task that logs any failure before the default handling."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Record the traceback in our own log, then defer to Celery.
        logger.exception('Task {} encountered exception.'.format(self.name), exc_info=exc)
        super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)


@task
def run_job_launch(job_id):
    """Kick the scheduler in response to a job launch."""
    TaskManager().schedule()


@task
def run_job_complete(job_id):
    """Kick the scheduler in response to a job completion."""
    TaskManager().schedule()


@task(base=LogErrorsTask)
def run_task_manager():
    """Periodic scheduler entry point; failures are logged by the base task."""
    logger.debug("Running Tower task manager.")
    TaskManager().schedule()
@pytest.mark.django_db
class TestCapacityMapping(TransactionTestCase):
    """DB-backed checks of InstanceGroupManager.capacity_mapping."""

    def sample_cluster(self):
        # i2 belongs to both ig_large and tower, so those groups overlap.
        ig_small = InstanceGroup.objects.create(name='ig_small')
        ig_large = InstanceGroup.objects.create(name='ig_large')
        tower = InstanceGroup.objects.create(name='tower')
        i1 = Instance.objects.create(hostname='i1', capacity=200)
        i2 = Instance.objects.create(hostname='i2', capacity=200)
        i3 = Instance.objects.create(hostname='i3', capacity=200)
        ig_small.instances.add(i1)
        ig_large.instances.add(i2, i3)
        tower.instances.add(i2)
        return [tower, ig_large, ig_small]

    def test_mapping(self):
        self.sample_cluster()
        # Exactly two queries: the groups plus the prefetched instances.
        with self.assertNumQueries(2):
            inst_map, ig_map = InstanceGroup.objects.capacity_mapping()
        assert inst_map['i1'] == set(['ig_small'])
        assert inst_map['i2'] == set(['ig_large', 'tower'])
        assert ig_map['ig_small'] == set(['ig_small'])
        assert ig_map['ig_large'] == set(['ig_large', 'tower'])
        assert ig_map['tower'] == set(['ig_large', 'tower'])
class FakeObject(object):
    """Minimal attribute bag standing in for Django model instances."""

    def __init__(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)


class Job(FakeObject):
    task_impact = 43

    @property
    def log_format(self):
        # Fix: the production capacity code logs `t.log_format` directly,
        # and the real UnifiedJob.log_format is a property.  As a plain
        # method this fake would interpolate a bound-method repr into the
        # log message; mirror the real model instead.
        return 'job 382 (fake)'


@pytest.fixture
def sample_cluster():
    """Factory for a fake 3-node, 3-group cluster.

    Layout: ig_small contains i1; ig_large contains i2 and i3; tower
    contains i2 -- so tower and ig_large overlap on instance i2.
    """
    def stand_up_cluster():

        class Instances(FakeObject):
            def add(self, *args):
                for instance in args:
                    self.obj.instance_list.append(instance)

            def all(self):
                return self.obj.instance_list

        class InstanceGroup(FakeObject):

            def __init__(self, **kwargs):
                super(InstanceGroup, self).__init__(**kwargs)
                self.instance_list = []

            @property
            def instances(self):
                mgr = Instances(obj=self)
                return mgr

        class Instance(FakeObject):
            pass

        ig_small = InstanceGroup(name='ig_small')
        ig_large = InstanceGroup(name='ig_large')
        tower = InstanceGroup(name='tower')
        i1 = Instance(hostname='i1', capacity=200)
        i2 = Instance(hostname='i2', capacity=200)
        i3 = Instance(hostname='i3', capacity=200)
        ig_small.instances.add(i1)
        ig_large.instances.add(i2, i3)
        tower.instances.add(i2)
        return [tower, ig_large, ig_small]
    return stand_up_cluster


def test_committed_capacity(sample_cluster):
    tower, ig_large, ig_small = sample_cluster()
    tasks = [
        Job(status='waiting', instance_group=tower),
        Job(status='waiting', instance_group=ig_large),
        Job(status='waiting', instance_group=ig_small)
    ]
    capacities = InstanceGroup.objects.capacity_values(
        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
    )
    # Jobs submitted to either tower or ig_large must count toward both,
    # because the two groups share instance i2.
    assert capacities['tower']['committed_capacity'] == 43 * 2
    assert capacities['ig_large']['committed_capacity'] == 43 * 2
    assert capacities['ig_small']['committed_capacity'] == 43


def test_running_capacity(sample_cluster):
    tower, ig_large, ig_small = sample_cluster()
    tasks = [
        Job(status='running', execution_node='i1'),
        Job(status='running', execution_node='i2'),
        Job(status='running', execution_node='i3')
    ]
    capacities = InstanceGroup.objects.capacity_values(
        qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True
    )
    # Tower is only given 1 instance
    assert capacities['tower']['running_capacity'] == 43
    # Large IG has 2 instances
    assert capacities['ig_large']['running_capacity'] == 43 * 2
    assert capacities['ig_small']['running_capacity'] == 43


def test_offline_node_running(sample_cluster):
    """
    Assure that algorithm doesn't explode if a job is marked running
    in an offline node
    """
    tower, ig_large, ig_small = sample_cluster()
    ig_small.instance_list[0].capacity = 0
    tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)]
    capacities = InstanceGroup.objects.capacity_values(
        qs=[tower, ig_large, ig_small], tasks=tasks)
    assert capacities['ig_small']['consumed_capacity'] == 43


def test_offline_node_waiting(sample_cluster):
    """
    Same but for a waiting job
    """
    tower, ig_large, ig_small = sample_cluster()
    ig_small.instance_list[0].capacity = 0
    tasks = [Job(status='waiting', instance_group=ig_small)]
    capacities = InstanceGroup.objects.capacity_values(
        qs=[tower, ig_large, ig_small], tasks=tasks)
    assert capacities['ig_small']['consumed_capacity'] == 43


def test_RBAC_reduced_filter(sample_cluster):
    """
    User can see jobs that are running in `ig_small` and `ig_large` IGs,
    but user does not have permission to see those actual instance groups.
    Verify that this does not blow everything up.
    """
    tower, ig_large, ig_small = sample_cluster()
    tasks = [
        Job(status='waiting', instance_group=tower),
        Job(status='waiting', instance_group=ig_large),
        Job(status='waiting', instance_group=ig_small)
    ]
    capacities = InstanceGroup.objects.capacity_values(
        qs=[tower], tasks=tasks, breakdown=True
    )
    # Cross-links between groups not visible to current user,
    # so a naive accounting of capacities is returned instead
    assert capacities['tower']['committed_capacity'] == 43