Merge pull request #1922 from chrismeyersfsu/improvement-accurate_capacity

per-instance accurate capacity consumption
Chris Meyers
2018-06-04 14:36:58 -04:00
committed by GitHub
14 changed files with 278 additions and 84 deletions


@@ -702,7 +702,8 @@ class UnifiedJobSerializer(BaseSerializer):
model = UnifiedJob
fields = ('*', 'unified_job_template', 'launch_type', 'status',
'failed', 'started', 'finished', 'elapsed', 'job_args',
- 'job_cwd', 'job_env', 'job_explanation', 'execution_node',
+ 'job_cwd', 'job_env', 'job_explanation',
+ 'execution_node', 'controller_node',
'result_traceback', 'event_processing_finished')
extra_kwargs = {
'unified_job_template': {
@@ -3434,7 +3435,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
class Meta:
model = WorkflowJob
fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous',
- '-execution_node', '-event_processing_finished',)
+ '-execution_node', '-event_processing_finished', '-controller_node',)
def get_related(self, obj):
res = super(WorkflowJobSerializer, self).get_related(obj)
@@ -3463,7 +3464,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer):
class Meta:
- fields = ('*', '-execution_node',)
+ fields = ('*', '-execution_node', '-controller_node',)
class WorkflowJobCancelSerializer(WorkflowJobSerializer):
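
Taken together, the serializer hunks above expose the new controller_node field wherever execution_node already appears, and strip both from workflow jobs, which never occupy a node themselves. A hedged sketch of the resulting API payloads (field names come from this diff; the hostnames are invented):

# Sketch only, not actual API output.
isolated_job = {
    'execution_node': 'isolated-ramparts-1',  # where ansible-playbook actually ran
    'controller_node': 'awx-tower-1',         # the node that managed the isolated run
}
regular_job = {
    'execution_node': 'awx-tower-2',  # runs locally, so no separate controller
    'controller_node': '',
}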


@@ -468,13 +468,11 @@ class IsolatedManager(object):
return OutputEventFilter(job_event_callback)
- def run(self, instance, host, private_data_dir, proot_temp_dir):
+ def run(self, instance, private_data_dir, proot_temp_dir):
"""
Run a job on an isolated host.
:param instance: a `model.Job` instance
- :param host: the hostname (or IP address) to run the
- isolated job on
:param private_data_dir: an absolute path on the local file system
where job-specific data should be written
(i.e., `/tmp/ansible_awx_xyz/`)
@@ -486,7 +484,7 @@ class IsolatedManager(object):
`ansible-playbook` run.
"""
self.instance = instance
- self.host = host
+ self.host = instance.execution_node
self.private_data_dir = private_data_dir
self.proot_temp_dir = proot_temp_dir
status, rc = self.dispatch()
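
With the host parameter gone, callers no longer thread the isolated hostname through Celery arguments; the manager reads it off the job record. A minimal runnable sketch of that flow, using invented stand-ins rather than the real models:

class FakeJob(object):
    execution_node = 'isolated-ramparts-1'  # invented hostname

class ManagerSketch(object):
    # Mirrors the new IsolatedManager.run(): the host comes off the model now.
    def run(self, instance, private_data_dir, proot_temp_dir):
        self.host = instance.execution_node
        return 'successful', 0

status, rc = ManagerSketch().run(FakeJob(), '/tmp/ansible_awx_xyz/', None)
assert status == 'successful'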


@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-05-25 18:58
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0039_v330_custom_venv_help_text'),
]
operations = [
migrations.AddField(
model_name='unifiedjob',
name='controller_node',
field=models.TextField(blank=True, default=b'', editable=False, help_text='The instance that managed the isolated execution environment.'),
),
]
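
One consequence of the default here: every pre-existing job row gets an empty controller_node, so the is_isolated() helper added later in this diff reports historical jobs as regular, non-isolated runs. A quick check of that truthiness rule (the hostname is invented):

def is_isolated(controller_node):
    # bool('') is False, so an unset controller_node means "not isolated".
    return bool(controller_node)

assert is_isolated('') is False            # legacy rows after this migration
assert is_isolated('awx-tower-1') is True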


@@ -1,6 +1,8 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
+ import six
+ import random
from decimal import Decimal
from django.core.exceptions import ValidationError
@@ -11,7 +13,6 @@ from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from django.utils.timezone import now, timedelta
- import six
from solo.models import SingletonModel
from awx import __version__ as awx_application_version
@@ -92,6 +93,10 @@ class Instance(BaseModel):
return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname,
status__in=('running', 'waiting')))
+ @property
+ def remaining_capacity(self):
+ return self.capacity - self.consumed_capacity
@property
def role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
@@ -187,6 +192,31 @@ class InstanceGroup(BaseModel, RelatedJobsMixin):
validate_queuename(self.name)
return self.name
+ def fit_task_to_most_remaining_capacity_instance(self, task):
+ instance_most_capacity = None
+ for i in self.instances.filter(capacity__gt=0).order_by('hostname'):
+ if i.remaining_capacity >= task.task_impact and \
+ (instance_most_capacity is None or
+ i.remaining_capacity > instance_most_capacity.remaining_capacity):
+ instance_most_capacity = i
+ return instance_most_capacity
+ def find_largest_idle_instance(self):
+ largest_instance = None
+ for i in self.instances.filter(capacity__gt=0).order_by('hostname'):
+ if i.jobs_running == 0:
+ if largest_instance is None:
+ largest_instance = i
+ elif i.capacity > largest_instance.capacity:
+ largest_instance = i
+ return largest_instance
+ def choose_online_controller_node(self):
+ return random.choice(list(self.controller
+ .instances
+ .filter(capacity__gt=0)
+ .values_list('hostname', flat=True)))
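
A worked sketch of the two selection strategies above, with invented capacities and hostnames: fit_task_to_most_remaining_capacity_instance wants the fitting instance with the most headroom, while find_largest_idle_instance ignores headroom and simply takes the biggest online instance with nothing running:

class FakeInstance(object):
    """Stand-in for main.models.Instance with just the fields used above."""
    def __init__(self, hostname, capacity, consumed, jobs_running):
        self.hostname = hostname
        self.capacity = capacity
        self.remaining_capacity = capacity - consumed  # mirrors the new property
        self.jobs_running = jobs_running

group = [FakeInstance('awx-1', 100, 90, 2),  # remaining 10
         FakeInstance('awx-2', 150, 50, 1),  # remaining 100
         FakeInstance('awx-3', 60, 0, 0)]    # idle, remaining 60

task_impact = 80

fits = [i for i in group if i.remaining_capacity >= task_impact]
best_fit = max(fits, key=lambda i: i.remaining_capacity)
assert best_fit.hostname == 'awx-2'  # the only instance with >= 80 free

idle = [i for i in group if i.jobs_running == 0 and i.capacity > 0]
largest_idle = max(idle, key=lambda i: i.capacity)
assert largest_idle.hostname == 'awx-3'

The scheduler changes below pair the two: prefer the best fit, and only when nothing fits hand the task to the largest idle instance, deliberately over-committing an idle node rather than starving the queue.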
class TowerScheduleState(SingletonModel):
schedule_last_run = models.DateTimeField(auto_now_add=True)


@@ -507,7 +507,8 @@ class StdoutMaxBytesExceeded(Exception):
self.supported = supported
- class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
+ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique,
+ UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin):
'''
Concrete base class for unified job run by the task engine.
'''
@@ -571,6 +572,12 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
editable=False,
help_text=_("The node the job executed on."),
)
+ controller_node = models.TextField(
+ blank=True,
+ default='',
+ editable=False,
+ help_text=_("The instance that managed the isolated execution environment."),
+ )
notifications = models.ManyToManyField(
'Notification',
editable=False,
@@ -1228,17 +1235,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
raise RuntimeError("Expected celery_task_id to be set on model.")
kwargs['task_id'] = self.celery_task_id
task_class = self._get_task_class()
- from awx.main.models.ha import InstanceGroup
- ig = InstanceGroup.objects.get(name=queue)
- args = [self.pk]
- if ig.controller_id:
- if self.supports_isolation(): # case of jobs and ad hoc commands
- isolated_instance = ig.instances.order_by('-capacity').first()
- args.append(isolated_instance.hostname)
- else: # proj & inv updates, system jobs run on controller
- queue = ig.controller.name
kwargs['queue'] = queue
- task_class().apply_async(args, opts, **kwargs)
+ task_class().apply_async([self.pk], opts, **kwargs)
def start(self, error_callback, success_callback, **kwargs):
'''
@@ -1400,3 +1398,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
r['{}_schedule_id'.format(name)] = self.schedule.pk
r['{}_schedule_name'.format(name)] = self.schedule.name
return r
+ def get_celery_queue_name(self):
+ return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE
+ def is_isolated(self):
+ return bool(self.controller_node)
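
get_celery_queue_name() is what lets the dispatch above shrink to a single apply_async([self.pk], ...): routing now lives on the model. A self-contained sketch of the precedence (the default queue value below is an assumed placeholder, not taken from this diff):

CELERY_DEFAULT_QUEUE = 'awx_private_queue'  # assumption for illustration

def celery_queue_for(controller_node, execution_node):
    # Mirrors get_celery_queue_name(): controller wins, then executor, then default.
    return controller_node or execution_node or CELERY_DEFAULT_QUEUE

assert celery_queue_for('awx-tower-1', 'isolated-1') == 'awx-tower-1'  # isolated job
assert celery_queue_for('', 'awx-tower-2') == 'awx-tower-2'            # regular job
assert celery_queue_for('', '') == CELERY_DEFAULT_QUEUE                # not yet assigned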


@@ -7,6 +7,7 @@ import logging
import uuid
import json
import six
+ import random
from sets import Set
# Django
@@ -234,7 +235,7 @@ class TaskManager():
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
- def start_task(self, task, rampart_group, dependent_tasks=None):
+ def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
from awx.main.tasks import handle_work_error, handle_work_success
dependent_tasks = dependent_tasks or []
@@ -265,11 +266,21 @@ class TaskManager():
elif not task.supports_isolation() and rampart_group.controller_id:
# non-Ansible jobs on isolated instances run on controller
task.instance_group = rampart_group.controller
- logger.info('Submitting isolated %s to queue %s via %s.',
- task.log_format, task.instance_group_id, rampart_group.controller_id)
+ task.execution_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True)))
+ logger.info(six.text_type('Submitting isolated {} to queue {} on node {}.').format(
+ task.log_format, task.instance_group.name, task.execution_node))
+ elif task.supports_isolation() and rampart_group.controller_id:
+ task.instance_group = rampart_group
+ task.execution_node = instance.hostname
+ task.controller_node = rampart_group.choose_online_controller_node()
+ logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format(
+ task.log_format, task.execution_node, task.controller_node))
else:
task.instance_group = rampart_group
- logger.info('Submitting %s to instance group %s.', task.log_format, task.instance_group_id)
+ if instance is not None:
+ task.execution_node = instance.hostname
+ logger.info(six.text_type('Submitting {} to <instance group, instance> <{},{}>.').format(
+ task.log_format, task.instance_group_id, task.execution_node))
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()
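
The three branches above amount to one routing rule. A minimal sketch with invented hostnames; the real code assigns these values to fields on the task model rather than returning them:

import random

def assign_nodes(supports_isolation, group_is_isolated, fit_instance, controller_hosts):
    """Return (execution_node, controller_node) the way start_task assigns them."""
    if group_is_isolated and not supports_isolation:
        # project/inventory updates and system jobs run on the controller side
        return random.choice(controller_hosts), ''
    if group_is_isolated and supports_isolation:
        # playbook runs execute on the isolated instance, managed by a controller
        return fit_instance, random.choice(controller_hosts)
    # plain instance group: run wherever the scheduler fit the task
    return fit_instance, ''

print(assign_nodes(True, True, 'isolated-1', ['awx-tower-1', 'awx-tower-2']))
# e.g. ('isolated-1', 'awx-tower-2')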
@@ -280,11 +291,10 @@ class TaskManager():
def post_commit():
task.websocket_emit_status(task.status)
if task.status != 'failed':
- if rampart_group is not None:
- actual_queue=rampart_group.name
- else:
- actual_queue=settings.CELERY_DEFAULT_QUEUE
- task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=actual_queue)
+ task.start_celery_task(opts,
+ error_callback=error_handler,
+ success_callback=success_handler,
+ queue=task.get_celery_queue_name())
connection.on_commit(post_commit)
@@ -433,17 +443,32 @@ class TaskManager():
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
+ idle_instance_that_fits = None
for rampart_group in preferred_instance_groups:
+ if idle_instance_that_fits is None:
+ idle_instance_that_fits = rampart_group.find_largest_idle_instance()
if self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug(six.text_type("Skipping group {} capacity <= 0").format(rampart_group.name))
continue
- if not self.would_exceed_capacity(task, rampart_group.name):
- logger.debug(six.text_type("Starting dependent {} in group {}").format(task.log_format, rampart_group.name))
+ execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
+ if execution_instance:
+ logger.debug(six.text_type("Starting dependent {} in group {} instance {}").format(
+ task.log_format, rampart_group.name, execution_instance.hostname))
+ elif not execution_instance and idle_instance_that_fits:
+ execution_instance = idle_instance_that_fits
+ logger.debug(six.text_type("Starting dependent {} in group {} on idle instance {}").format(
+ task.log_format, rampart_group.name, execution_instance.hostname))
+ if execution_instance:
self.graph[rampart_group.name]['graph'].add_job(task)
tasks_to_fail = filter(lambda t: t != task, dependency_tasks)
tasks_to_fail += [dependent_task]
- self.start_task(task, rampart_group, tasks_to_fail)
+ self.start_task(task, rampart_group, tasks_to_fail, execution_instance)
found_acceptable_queue = True
break
+ else:
+ logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format(
+ rampart_group.name, task.log_format, task.task_impact))
if not found_acceptable_queue:
logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))
@@ -455,25 +480,35 @@ class TaskManager():
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
+ idle_instance_that_fits = None
if isinstance(task, WorkflowJob):
- self.start_task(task, None, task.get_jobs_fail_chain())
+ self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue
for rampart_group in preferred_instance_groups:
+ if idle_instance_that_fits is None:
+ idle_instance_that_fits = rampart_group.find_largest_idle_instance()
remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if remaining_capacity <= 0:
logger.debug(six.text_type("Skipping group {}, remaining_capacity {} <= 0").format(
rampart_group.name, remaining_capacity))
continue
- if not self.would_exceed_capacity(task, rampart_group.name):
- logger.debug(six.text_type("Starting {} in group {} (remaining_capacity={})").format(
- task.log_format, rampart_group.name, remaining_capacity))
+ execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
+ if execution_instance:
+ logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format(
+ task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+ elif not execution_instance and idle_instance_that_fits:
+ execution_instance = idle_instance_that_fits
+ logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format(
+ task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
+ if execution_instance:
self.graph[rampart_group.name]['graph'].add_job(task)
- self.start_task(task, rampart_group, task.get_jobs_fail_chain())
+ self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
break
else:
logger.debug(six.text_type("Not enough capacity to run {} on {} (remaining_capacity={})").format(
task.log_format, rampart_group.name, remaining_capacity))
logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format(
rampart_group.name, task.log_format, task.task_impact))
if not found_acceptable_queue:
logger.debug(six.text_type("{} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))


@@ -868,14 +868,11 @@ class BaseTask(Task):
'''
@with_path_cleanup
- def run(self, pk, isolated_host=None, **kwargs):
+ def run(self, pk, **kwargs):
'''
Run the job/task and capture its output.
'''
- execution_node = settings.CLUSTER_HOST_ID
- if isolated_host is not None:
- execution_node = isolated_host
- instance = self.update_model(pk, status='running', execution_node=execution_node,
+ instance = self.update_model(pk, status='running',
start_args='') # blank field to remove encrypted passwords
instance.websocket_emit_status("running")
@@ -884,8 +881,8 @@ class BaseTask(Task):
extra_update_fields = {}
event_ct = 0
stdout_handle = None
try:
- kwargs['isolated'] = isolated_host is not None
self.pre_run_hook(instance, **kwargs)
if instance.cancel_flag:
instance = self.update_model(instance.pk, status='canceled')
@@ -945,7 +942,7 @@ class BaseTask(Task):
credential, env, safe_env, args, safe_args, kwargs['private_data_dir']
)
- if isolated_host is None:
+ if instance.is_isolated() is False:
stdout_handle = self.get_stdout_handle(instance)
else:
stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle(
@@ -961,7 +958,7 @@ class BaseTask(Task):
ssh_key_path = self.get_ssh_key_path(instance, **kwargs)
# If we're executing on an isolated host, don't bother adding the
# key to the agent in this environment
- if ssh_key_path and isolated_host is None:
+ if ssh_key_path and instance.is_isolated() is False:
ssh_auth_sock = os.path.join(kwargs['private_data_dir'], 'ssh_auth.sock')
args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock)
safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock)
@@ -981,11 +978,11 @@ class BaseTask(Task):
proot_cmd=getattr(settings, 'AWX_PROOT_CMD', 'bwrap'),
)
instance = self.update_model(instance.pk, output_replacements=output_replacements)
- if isolated_host:
+ if instance.is_isolated() is True:
manager_instance = isolated_manager.IsolatedManager(
args, cwd, env, stdout_handle, ssh_key_path, **_kw
)
- status, rc = manager_instance.run(instance, isolated_host,
+ status, rc = manager_instance.run(instance,
kwargs['private_data_dir'],
kwargs.get('proot_temp_dir'))
else:
@@ -1336,7 +1333,7 @@ class RunJob(BaseTask):
job_request_id = '' if self.request.id is None else self.request.id
pu_ig = job.instance_group
pu_en = job.execution_node
- if kwargs['isolated']:
+ if job.is_isolated() is True:
pu_ig = pu_ig.controller
pu_en = settings.CLUSTER_HOST_ID
local_project_sync = job.project.create_project_update(
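
An isolated job's implicit project sync cannot run on the isolated node itself, so the hunk above pins the sync to the job's controller group and to this node's CLUSTER_HOST_ID. A small sketch of that rule (group and host names invented):

def project_sync_placement(job_is_isolated, group, controller_group,
                           job_node, cluster_host_id):
    # Mirrors the hunk above: isolated jobs sync via their controller group on
    # the local cluster node; everything else syncs where the job itself runs.
    if job_is_isolated:
        return controller_group, cluster_host_id
    return group, job_node

assert project_sync_placement(True, 'ramparts', 'tower',
                              'isolated-1', 'awx-tower-1') == ('tower', 'awx-tower-1')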


@@ -88,7 +88,7 @@ class TestIsolatedRuns:
with mock.patch.object(job, '_get_task_class') as task_class:
task_class.return_value = MockTaskClass
job.start_celery_task([], error_callback, success_callback, 'thepentagon')
- mock_async.assert_called_with([job.id, 'iso2'], [],
+ mock_async.assert_called_with([job.id], [],
link_error=error_callback,
link=success_callback,
queue='thepentagon',
@@ -100,7 +100,7 @@ class TestIsolatedRuns:
with mock.patch.object(job, '_get_task_class') as task_class:
task_class.return_value = MockTaskClass
job.start_celery_task([], error_callback, success_callback, 'thepentagon')
- mock_async.assert_called_with([job.id, 'iso1'], [],
+ mock_async.assert_called_with([job.id], [],
link_error=error_callback,
link=success_callback,
queue='thepentagon',


@@ -31,7 +31,7 @@ def test_multi_group_basic_job_launch(instance_factory, default_instance_group,
mock_task_impact.return_value = 500
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, []), mock.call(j2, ig2, [])])
+ TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j2, ig2, [], i2)])
@@ -65,15 +65,18 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
pu = p.project_updates.first()
- TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [j1])
+ TaskManager.start_task.assert_called_once_with(pu,
+ default_instance_group,
+ [j1],
+ default_instance_group.instances.all()[0])
pu.finished = pu.created + timedelta(seconds=1)
pu.status = "successful"
pu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_any_call(j1, ig1, [])
- TaskManager.start_task.assert_any_call(j2, ig2, [])
+ TaskManager.start_task.assert_any_call(j1, ig1, [], i1)
+ TaskManager.start_task.assert_any_call(j2, ig2, [], i2)
assert TaskManager.start_task.call_count == 2
@@ -85,7 +88,7 @@ def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_in
wfj.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(wfj, None, [])
+ TaskManager.start_task.assert_called_once_with(wfj, None, [], None)
assert wfj.instance_group is None
@@ -131,8 +134,9 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
- mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig1, []),
- mock.call(j2, ig2, [])])
+ mock_job.assert_has_calls([mock.call(j1, ig1, [], i1),
+ mock.call(j1_1, ig1, [], i1),
+ mock.call(j2, ig2, [], i2)])
assert mock_job.call_count == 3
@@ -163,7 +167,8 @@ def test_failover_group_run(instance_factory, default_instance_group, mocker,
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
- mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig2, [])])
+ mock_job.assert_has_calls([mock.call(j1, ig1, [], i1),
+ mock.call(j1_1, ig2, [], i2)])
assert mock_job.call_count == 2


@@ -18,6 +18,7 @@ from awx.main.models.notifications import JobNotificationMixin
@pytest.mark.django_db
def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker):
+ instance = default_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start"])
@@ -26,11 +27,12 @@ def test_single_job_scheduler_launch(default_instance_group, job_template_factor
j.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
@pytest.mark.django_db
def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_template_factory, mocker):
+ instance = default_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start", "job_should_not_start"])
@@ -42,16 +44,17 @@ def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_temp
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance)
j1.status = "successful"
j1.save()
with mocker.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance)
@pytest.mark.django_db
def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, job_template_factory, mocker):
+ instance = default_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start", "job_should_not_start"])
@@ -68,12 +71,13 @@ def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group,
j2.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, []),
- mock.call(j2, default_instance_group, [])])
+ TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, [], instance),
+ mock.call(j2, default_instance_group, [], instance)])
@pytest.mark.django_db
def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory, mocker):
+ instance = default_instance_group.instances.all()[0]
objects1 = job_template_factory('jt1', organization='org1', project='proj1',
inventory='inv1', credential='cred1',
jobs=["job_should_start"])
@@ -91,20 +95,20 @@ def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory
mock_task_impact.return_value = 500
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
- mock_job.assert_called_once_with(j1, default_instance_group, [])
+ mock_job.assert_called_once_with(j1, default_instance_group, [], instance)
j1.status = "successful"
j1.save()
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
- mock_job.assert_called_once_with(j2, default_instance_group, [])
+ mock_job.assert_called_once_with(j2, default_instance_group, [], instance)
@pytest.mark.django_db
def test_single_job_dependencies_project_launch(default_instance_group, job_template_factory, mocker):
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start"])
+ instance = default_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
@@ -121,12 +125,12 @@ def test_single_job_dependencies_project_launch(default_instance_group, job_temp
mock_pu.assert_called_once_with(j)
pu = [x for x in p.project_updates.all()]
assert len(pu) == 1
- TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [j])
+ TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [j], instance)
pu[0].status = "successful"
pu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
@pytest.mark.django_db
@@ -134,6 +138,7 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group,
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start"])
+ instance = default_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
@@ -151,12 +156,12 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group,
mock_iu.assert_called_once_with(j, ii)
iu = [x for x in ii.inventory_updates.all()]
assert len(iu) == 1
- TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j])
+ TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j], instance)
iu[0].status = "successful"
iu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
@pytest.mark.django_db
@@ -164,6 +169,7 @@ def test_job_dependency_with_already_updated(default_instance_group, job_templat
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["job_should_start"])
+ instance = default_instance_group.instances.all()[0]
j = objects.jobs["job_should_start"]
j.status = 'pending'
j.save()
@@ -185,11 +191,12 @@ def test_job_dependency_with_already_updated(default_instance_group, job_templat
mock_iu.assert_not_called()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance)
@pytest.mark.django_db
def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory):
+ instance = default_instance_group.instances.all()[0]
objects = job_template_factory('jt', organization='org1', project='proj',
inventory='inv', credential='cred',
jobs=["first_job", "second_job"])
@@ -218,8 +225,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
TaskManager().schedule()
pu = p.project_updates.first()
iu = ii.inventory_updates.first()
- TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1]),
- mock.call(iu, default_instance_group, [pu, j1])])
+ TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1], instance),
+ mock.call(iu, default_instance_group, [pu, j1], instance)])
pu.status = "successful"
pu.finished = pu.created + timedelta(seconds=1)
pu.save()
@@ -228,12 +235,12 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance)
j1.status = "successful"
j1.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule()
- TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [])
+ TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance)
pu = [x for x in p.project_updates.all()]
iu = [x for x in ii.inventory_updates.all()]
assert len(pu) == 1


@@ -0,0 +1,85 @@
import pytest
import mock
from mock import Mock
from awx.main.models import (
Job,
InstanceGroup,
)
def T(impact):
j = mock.Mock(Job())
j.task_impact = impact
return j
def Is(param):
'''
param:
[remaining_capacity1, remaining_capacity2, remaining_capacity3, ...]
[(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...]
'''
instances = []
if isinstance(param[0], tuple):
for (jobs_running, capacity) in param:
inst = Mock()
inst.capacity = capacity
inst.jobs_running = jobs_running
instances.append(inst)
else:
for i in param:
inst = Mock()
inst.remaining_capacity = i
instances.append(inst)
return instances
class TestInstanceGroup(object):
@pytest.mark.parametrize('task,instances,instance_fit_index,reason', [
(T(100), Is([100]), 0, "Only one, pick it"),
(T(100), Is([100, 100]), 0, "Two equally good fits, pick the first"),
(T(100), Is([50, 100]), 1, "First instance not as good as second instance"),
(T(100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."),
(T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
])
def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=lambda x: instances))):
ig = InstanceGroup(id=10)
if instance_fit_index is None:
assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
else:
assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
instances[instance_fit_index], reason
@pytest.mark.parametrize('instances,instance_fit_index,reason', [
(Is([(0, 100)]), 0, "One idle instance, pick it"),
(Is([(1, 100)]), None, "One un-idle instance, pick nothing"),
(Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"),
(Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"),
(Is([(0, 0)]), None, "One idle but down instance, don't pick it"),
])
def test_find_largest_idle_instance(self, instances, instance_fit_index, reason):
def filter_offline_instances(*args):
return filter(lambda i: i.capacity > 0, instances)
with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=filter_offline_instances))):
ig = InstanceGroup(id=10)
if instance_fit_index is None:
assert ig.find_largest_idle_instance() is None, reason
else:
assert ig.find_largest_idle_instance() == \
instances[instance_fit_index], reason


@@ -196,7 +196,7 @@ def parse_extra_vars(args):
return extra_vars
- class TestJobExecution:
+ class TestJobExecution(object):
"""
For job runs, test that `ansible-playbook` is invoked with the proper
arguments, environment variables, and pexpect passwords for a variety of
@@ -440,7 +440,7 @@ class TestGenericRun(TestJobExecution):
with pytest.raises(Exception):
self.task.run(self.pk)
for c in [
- mock.call(self.pk, execution_node=settings.CLUSTER_HOST_ID, status='running', start_args=''),
+ mock.call(self.pk, status='running', start_args=''),
mock.call(self.pk, status='canceled')
]:
assert c in self.task.update_model.call_args_list
@@ -626,7 +626,14 @@ class TestAdhocRun(TestJobExecution):
class TestIsolatedExecution(TestJobExecution):
- REMOTE_HOST = 'some-isolated-host'
+ ISOLATED_HOST = 'some-isolated-host'
+ ISOLATED_CONTROLLER_HOST = 'some-isolated-controller-host'
+ def get_instance(self):
+ instance = super(TestIsolatedExecution, self).get_instance()
+ instance.controller_node = self.ISOLATED_CONTROLLER_HOST
+ instance.execution_node = self.ISOLATED_HOST
+ return instance
def test_with_ssh_credentials(self):
ssh = CredentialType.defaults['ssh']()
@@ -659,12 +666,12 @@ class TestIsolatedExecution(TestJobExecution):
f.write(data)
return ('successful', 0)
self.run_pexpect.side_effect = _mock_job_artifacts
- self.task.run(self.pk, self.REMOTE_HOST)
+ self.task.run(self.pk)
playbook_run = self.run_pexpect.call_args_list[0][0]
assert ' '.join(playbook_run[0]).startswith(' '.join([
'ansible-playbook', 'run_isolated.yml', '-u', settings.AWX_ISOLATED_USERNAME,
- '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT), '-i', self.REMOTE_HOST + ',',
+ '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT), '-i', self.ISOLATED_HOST + ',',
'-e',
]))
extra_vars = playbook_run[0][playbook_run[0].index('-e') + 1]
@@ -705,7 +712,7 @@ class TestIsolatedExecution(TestJobExecution):
with mock.patch('requests.get') as mock_get:
mock_get.return_value = mock.Mock(content=inventory)
with pytest.raises(Exception):
- self.task.run(self.pk, self.REMOTE_HOST)
+ self.task.run(self.pk, self.ISOLATED_HOST)
class TestJobCredentials(TestJobExecution):


@@ -14,4 +14,6 @@ services:
- "../awx/main/expect:/awx_devel"
- "../awx/lib:/awx_lib"
- "/sys/fs/cgroup:/sys/fs/cgroup:ro"
privileged: true
tmpfs:
- "/tmp:exec"
- "/run"


@@ -27,4 +27,7 @@ RUN ssh-keygen -A
RUN mkdir -p /root/.ssh
RUN touch /root/.ssh/authorized_keys
STOPSIGNAL SIGRTMIN+3
CMD ["/usr/sbin/init"]