do not choose offline instances

This commit is contained in:
chris meyers
2018-06-01 15:36:24 -04:00
parent 9d732cdbdf
commit b94cf379f6
5 changed files with 101 additions and 9 deletions

View File

@@ -1,6 +1,8 @@
# Copyright (c) 2015 Ansible, Inc. # Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved. # All Rights Reserved.
import six
import random
from decimal import Decimal from decimal import Decimal
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
@@ -11,7 +13,6 @@ from django.utils.translation import ugettext_lazy as _
from django.conf import settings from django.conf import settings
from django.utils.timezone import now, timedelta from django.utils.timezone import now, timedelta
import six
from solo.models import SingletonModel from solo.models import SingletonModel
from awx import __version__ as awx_application_version from awx import __version__ as awx_application_version
@@ -193,7 +194,7 @@ class InstanceGroup(BaseModel, RelatedJobsMixin):
def fit_task_to_most_remaining_capacity_instance(self, task): def fit_task_to_most_remaining_capacity_instance(self, task):
instance_most_capacity = None instance_most_capacity = None
for i in self.instances.order_by('hostname'): for i in self.instances.filter(capacity__gt=0).order_by('hostname'):
if i.remaining_capacity >= task.task_impact and \ if i.remaining_capacity >= task.task_impact and \
(instance_most_capacity is None or (instance_most_capacity is None or
i.remaining_capacity > instance_most_capacity.remaining_capacity): i.remaining_capacity > instance_most_capacity.remaining_capacity):
@@ -202,7 +203,7 @@ class InstanceGroup(BaseModel, RelatedJobsMixin):
def find_largest_idle_instance(self): def find_largest_idle_instance(self):
largest_instance = None largest_instance = None
for i in self.instances.order_by('hostname'): for i in self.instances.filter(capacity__gt=0).order_by('hostname'):
if i.jobs_running == 0: if i.jobs_running == 0:
if largest_instance is None: if largest_instance is None:
largest_instance = i largest_instance = i
@@ -210,6 +211,12 @@ class InstanceGroup(BaseModel, RelatedJobsMixin):
largest_instance = i largest_instance = i
return largest_instance return largest_instance
def choose_online_controller_node(self):
return random.choice(list(self.controller
.instances
.filter(capacity__gt=0)
.values_list('hostname', flat=True)))
class TowerScheduleState(SingletonModel): class TowerScheduleState(SingletonModel):
schedule_last_run = models.DateTimeField(auto_now_add=True) schedule_last_run = models.DateTimeField(auto_now_add=True)

View File

@@ -1402,5 +1402,5 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def get_celery_queue_name(self): def get_celery_queue_name(self):
return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE
def get_isolated_execution_node_name(self): def is_isolated(self):
return self.execution_node if self.controller_node else None return bool(self.controller_node)

View File

@@ -270,17 +270,15 @@ class TaskManager():
logger.info(six.text_type('Submitting isolated {} to queue {}.').format( logger.info(six.text_type('Submitting isolated {} to queue {}.').format(
task.log_format, task.instance_group.name, task.execution_node)) task.log_format, task.instance_group.name, task.execution_node))
elif task.supports_isolation() and rampart_group.controller_id: elif task.supports_isolation() and rampart_group.controller_id:
# TODO: Select from only online nodes in the controller node
task.instance_group = rampart_group task.instance_group = rampart_group
task.execution_node = instance.hostname task.execution_node = instance.hostname
task.controller_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True))) task.controller_node = rampart_group.choose_online_controller_node()
logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format( logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format(
task.log_format, task.execution_node, task.controller_node)) task.log_format, task.execution_node, task.controller_node))
else: else:
task.instance_group = rampart_group task.instance_group = rampart_group
if instance is not None: if instance is not None:
task.execution_node = instance.hostname task.execution_node = instance.hostname
logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format))
logger.info(six.text_type('Submitting {} to <instance group, instance> <{},{}>.').format( logger.info(six.text_type('Submitting {} to <instance group, instance> <{},{}>.').format(
task.log_format, task.instance_group_id, task.execution_node)) task.log_format, task.instance_group_id, task.execution_node))
with disable_activity_stream(): with disable_activity_stream():

View File

@@ -0,0 +1,85 @@
import pytest
import mock
from mock import Mock
from awx.main.models import (
Job,
InstanceGroup,
)
def T(impact):
j = mock.Mock(Job())
j.task_impact = impact
return j
def Is(param):
'''
param:
[remaining_capacity1, remaining_capacity2, remaining_capacity3, ...]
[(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...]
'''
instances = []
if isinstance(param[0], tuple):
for (jobs_running, capacity) in param:
inst = Mock()
inst.capacity = capacity
inst.jobs_running = jobs_running
instances.append(inst)
else:
for i in param:
inst = Mock()
inst.remaining_capacity = i
instances.append(inst)
return instances
class TestInstanceGroup(object):
@pytest.mark.parametrize('task,instances,instance_fit_index,reason', [
(T(100), Is([100]), 0, "Only one, pick it"),
(T(100), Is([100, 100]), 0, "Two equally good fits, pick the first"),
(T(100), Is([50, 100]), 1, "First instance not as good as second instance"),
(T(100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."),
(T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
])
def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=lambda x: instances))):
ig = InstanceGroup(id=10)
if instance_fit_index is None:
assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
else:
assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
instances[instance_fit_index], reason
@pytest.mark.parametrize('instances,instance_fit_index,reason', [
(Is([(0, 100)]), 0, "One idle instance, pick it"),
(Is([(1, 100)]), None, "One un-idle instance, pick nothing"),
(Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"),
(Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"),
(Is([(0, 0)]), None, "One idle but down instance, don't pick it"),
])
def test_find_largest_idle_instance(self, instances, instance_fit_index, reason):
def filter_offline_instances(*args):
return filter(lambda i: i.capacity > 0, instances)
with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=filter_offline_instances))):
ig = InstanceGroup(id=10)
if instance_fit_index is None:
assert ig.find_largest_idle_instance() is None, reason
else:
assert ig.find_largest_idle_instance() == \
instances[instance_fit_index], reason

View File

@@ -627,10 +627,12 @@ class TestAdhocRun(TestJobExecution):
class TestIsolatedExecution(TestJobExecution): class TestIsolatedExecution(TestJobExecution):
ISOLATED_HOST = 'some-isolated-host' ISOLATED_HOST = 'some-isolated-host'
ISOLATED_CONTROLLER_HOST = 'some-isolated-controller-host'
def get_instance(self): def get_instance(self):
instance = super(TestIsolatedExecution, self).get_instance() instance = super(TestIsolatedExecution, self).get_instance()
instance.get_isolated_execution_node_name = mock.Mock(return_value=self.ISOLATED_HOST) instance.controller_node = self.ISOLATED_CONTROLLER_HOST
instance.execution_node = self.ISOLATED_HOST
return instance return instance
def test_with_ssh_credentials(self): def test_with_ssh_credentials(self):