From b94cf379f61f781ac496a4484cf5f59ae53c4615 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 1 Jun 2018 15:36:24 -0400 Subject: [PATCH] do not choose offline instances --- awx/main/models/ha.py | 13 +++- awx/main/models/unified_jobs.py | 4 +- awx/main/scheduler/task_manager.py | 4 +- awx/main/tests/unit/models/test_ha.py | 85 +++++++++++++++++++++++++++ awx/main/tests/unit/test_tasks.py | 4 +- 5 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 awx/main/tests/unit/models/test_ha.py diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index f158582871..16589c0a77 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -1,6 +1,8 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. +import six +import random from decimal import Decimal from django.core.exceptions import ValidationError @@ -11,7 +13,6 @@ from django.utils.translation import ugettext_lazy as _ from django.conf import settings from django.utils.timezone import now, timedelta -import six from solo.models import SingletonModel from awx import __version__ as awx_application_version @@ -193,7 +194,7 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): def fit_task_to_most_remaining_capacity_instance(self, task): instance_most_capacity = None - for i in self.instances.order_by('hostname'): + for i in self.instances.filter(capacity__gt=0).order_by('hostname'): if i.remaining_capacity >= task.task_impact and \ (instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity): @@ -202,7 +203,7 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): def find_largest_idle_instance(self): largest_instance = None - for i in self.instances.order_by('hostname'): + for i in self.instances.filter(capacity__gt=0).order_by('hostname'): if i.jobs_running == 0: if largest_instance is None: largest_instance = i @@ -210,6 +211,12 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): largest_instance = i return largest_instance + def choose_online_controller_node(self): + return random.choice(list(self.controller + .instances + .filter(capacity__gt=0) + .values_list('hostname', flat=True))) + class TowerScheduleState(SingletonModel): schedule_last_run = models.DateTimeField(auto_now_add=True) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 62c5fc4e42..1be11ebcca 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1402,5 +1402,5 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def get_celery_queue_name(self): return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE - def get_isolated_execution_node_name(self): - return self.execution_node if self.controller_node else None + def is_isolated(self): + return bool(self.controller_node) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 9ad7a79652..198f0e3652 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -270,17 +270,15 @@ class TaskManager(): logger.info(six.text_type('Submitting isolated {} to queue {}.').format( task.log_format, task.instance_group.name, task.execution_node)) elif task.supports_isolation() and rampart_group.controller_id: - # TODO: Select from only online nodes in the controller node task.instance_group = rampart_group task.execution_node = instance.hostname - task.controller_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True))) + task.controller_node = rampart_group.choose_online_controller_node() logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format( task.log_format, task.execution_node, task.controller_node)) else: task.instance_group = rampart_group if instance is not None: task.execution_node = instance.hostname - logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format)) logger.info(six.text_type('Submitting {} to <{},{}>.').format( task.log_format, task.instance_group_id, task.execution_node)) with disable_activity_stream(): diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py new file mode 100644 index 0000000000..4ceb83c77a --- /dev/null +++ b/awx/main/tests/unit/models/test_ha.py @@ -0,0 +1,85 @@ +import pytest +import mock +from mock import Mock + +from awx.main.models import ( + Job, + InstanceGroup, +) + + +def T(impact): + j = mock.Mock(Job()) + j.task_impact = impact + return j + + +def Is(param): + ''' + param: + [remaining_capacity1, remaining_capacity2, remaining_capacity3, ...] + [(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...] + ''' + + instances = [] + if isinstance(param[0], tuple): + for (jobs_running, capacity) in param: + inst = Mock() + inst.capacity = capacity + inst.jobs_running = jobs_running + instances.append(inst) + else: + for i in param: + inst = Mock() + inst.remaining_capacity = i + instances.append(inst) + return instances + + +class TestInstanceGroup(object): + @pytest.mark.parametrize('task,instances,instance_fit_index,reason', [ + (T(100), Is([100]), 0, "Only one, pick it"), + (T(100), Is([100, 100]), 0, "Two equally good fits, pick the first"), + (T(100), Is([50, 100]), 1, "First instance not as good as second instance"), + (T(100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."), + (T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"), + ]) + def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason): + with mock.patch.object(InstanceGroup, + 'instances', + Mock(spec_set=['filter'], + filter=lambda *args, **kargs: Mock(spec_set=['order_by'], + order_by=lambda x: instances))): + ig = InstanceGroup(id=10) + + if instance_fit_index is None: + assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason + else: + assert ig.fit_task_to_most_remaining_capacity_instance(task) == \ + instances[instance_fit_index], reason + + + @pytest.mark.parametrize('instances,instance_fit_index,reason', [ + (Is([(0, 100)]), 0, "One idle instance, pick it"), + (Is([(1, 100)]), None, "One un-idle instance, pick nothing"), + (Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"), + (Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"), + (Is([(0, 0)]), None, "One idle but down instance, don't pick it"), + ]) + def test_find_largest_idle_instance(self, instances, instance_fit_index, reason): + def filter_offline_instances(*args): + return filter(lambda i: i.capacity > 0, instances) + + with mock.patch.object(InstanceGroup, + 'instances', + Mock(spec_set=['filter'], + filter=lambda *args, **kargs: Mock(spec_set=['order_by'], + order_by=filter_offline_instances))): + ig = InstanceGroup(id=10) + + if instance_fit_index is None: + assert ig.find_largest_idle_instance() is None, reason + else: + assert ig.find_largest_idle_instance() == \ + instances[instance_fit_index], reason + diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 0920767c57..ccb84c2fa1 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -627,10 +627,12 @@ class TestAdhocRun(TestJobExecution): class TestIsolatedExecution(TestJobExecution): ISOLATED_HOST = 'some-isolated-host' + ISOLATED_CONTROLLER_HOST = 'some-isolated-controller-host' def get_instance(self): instance = super(TestIsolatedExecution, self).get_instance() - instance.get_isolated_execution_node_name = mock.Mock(return_value=self.ISOLATED_HOST) + instance.controller_node = self.ISOLATED_CONTROLLER_HOST + instance.execution_node = self.ISOLATED_HOST return instance def test_with_ssh_credentials(self):