mirror of
https://github.com/ansible/awx.git
synced 2026-02-01 09:38:10 -03:30
286 lines
12 KiB
Python
286 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import pytest
|
|
from unittest import mock
|
|
|
|
# AWX
|
|
from awx.main.models import Host, Inventory, InventorySource, InventoryUpdate, CredentialType, Credential, Job
|
|
from awx.main.constants import CLOUD_PROVIDERS
|
|
from awx.main.utils.filters import SmartFilter
|
|
|
|
|
|
@pytest.mark.django_db
class TestInventoryScript:
    """Checks on the dictionary returned by ``inventory.get_script_data``."""

    def test_hostvars(self, inventory):
        """Variables stored on a host are reported under ``_meta.hostvars``."""
        inventory.hosts.create(name='ahost', variables={"foo": "bar"})
        script_data = inventory.get_script_data(hostvars=True)
        assert script_data['_meta']['hostvars']['ahost'] == {'foo': 'bar'}

    def test_towervars(self, inventory):
        """With ``towervars=True`` the host entry carries the ``remote_*`` keys."""
        host = inventory.hosts.create(name='ahost')
        script_data = inventory.get_script_data(hostvars=True, towervars=True)
        expected_vars = {
            'remote_tower_enabled': 'true',
            'remote_tower_id': host.id,
            'remote_host_enabled': 'true',
            'remote_host_id': host.id,
        }
        assert script_data['_meta']['hostvars']['ahost'] == expected_vars

    def test_all_group(self, inventory):
        """A group literally named ``all`` contributes its vars to the output."""
        inventory.groups.create(name='all', variables={'a1': 'a1'})
        script_data = inventory.get_script_data()
        # the a1 details must be present in the output
        assert 'all' in script_data
        assert script_data['all'] == {'vars': {'a1': 'a1'}}

    def test_empty_group(self, inventory):
        """A group with no hosts and no vars is only listed as a child of ``all``."""
        inventory.groups.create(name='ghost')
        script_data = inventory.get_script_data()
        # Canonical behavior, same as ansible-inventory: the group is not
        # given a top-level entry (avoids host / group confusion), but it is
        # still listed as a child of the all group.
        assert 'ghost' not in script_data
        assert 'ghost' in script_data['all']['children']

    def test_empty_group_with_vars(self, inventory):
        """A host-less group that has vars gets its own top-level entry."""
        inventory.groups.create(name='ghost2', variables={'foo': 'bar'})
        script_data = inventory.get_script_data()
        # a top-level key is required so that the group vars can be provided
        assert 'ghost2' in script_data
        assert script_data['ghost2']['vars'] == {'foo': 'bar'}
        assert 'ghost2' in script_data['all']['children']

    def test_grandparent_group(self, inventory):
        """Parent and child groups are both emitted with their own details."""
        parent_group = inventory.groups.create(name='g1', variables={'v1': 'v1'})
        child_group = inventory.groups.create(name='g2', variables={'v2': 'v2'})
        member = inventory.hosts.create(name='h1')
        # h1 becomes an indirect member of g1 through g2
        parent_group.children.add(child_group)
        child_group.hosts.add(member)
        # the g1 details must be present in the output
        script_data = inventory.get_script_data(hostvars=1)
        assert 'g1' in script_data
        assert 'g2' in script_data
        assert script_data['g1'] == {'children': ['g2'], 'vars': {'v1': 'v1'}}
        assert script_data['g2'] == {'hosts': ['h1'], 'vars': {'v2': 'v2'}}

    def test_slice_subset(self, inventory):
        """Each of three slices over three hosts contains exactly one host."""
        host_names = [f'host{idx}' for idx in range(3)]
        for host_name in host_names:
            inventory.hosts.create(name=host_name)
        for slice_number, host_name in enumerate(host_names, start=1):
            sliced = inventory.get_script_data(slice_number=slice_number, slice_count=3)
            assert sliced == {'all': {'hosts': [host_name]}}

    def test_slice_subset_with_groups(self, inventory):
        """Group membership in a slice is limited to the hosts of that slice."""
        host_names = [f'host{idx}' for idx in range(3)]
        hosts = [inventory.hosts.create(name=host_name) for host_name in host_names]

        everything = inventory.groups.create(name='contains_all_hosts')
        for host in hosts:
            everything.hosts.add(host)

        first_two = inventory.groups.create(name='contains_two_hosts')
        for host in hosts[:2]:
            first_two.hosts.add(host)

        for idx, host_name in enumerate(host_names):
            expected = {'contains_all_hosts': {'hosts': [host_name]}}
            if idx < 2:
                # only the first two hosts belong to the smaller group
                expected['contains_two_hosts'] = {'hosts': [host_name]}
            sliced = inventory.get_script_data(slice_number=idx + 1, slice_count=3)
            sliced.pop('all')
            assert sliced == expected
|
|
|
|
|
|
@pytest.mark.django_db
class TestActiveCount:
    """Checks on the value reported by ``Host.objects.active_count()``."""

    def test_host_active_count(self, organization):
        """Two hosts sharing a name across inventories are counted once."""
        first_inv, second_inv = (
            Inventory.objects.create(name=inv_name, organization=organization)
            for inv_name in ('inv1', 'inv2')
        )
        assert Host.objects.active_count() == 0

        first_inv.hosts.create(name='host1')
        second_inv.hosts.create(name='host1')
        assert Host.objects.active_count() == 1

        first_inv.hosts.create(name='host2')
        assert Host.objects.active_count() == 2

    def test_active_count_minus_tower(self, inventory):
        """A host created through a ``controller`` source is left out of the count."""
        inventory.hosts.create(name='locally-managed-host')
        controller_source = inventory.inventory_sources.create(name='tower-source', source='controller')
        controller_source.hosts.create(name='remotely-managed-host', inventory=inventory)
        assert Host.objects.active_count() == 1

    def test_host_case_insensitivity(self, organization):
        """Host names differing only by letter case are counted once."""
        first_inv, second_inv = (
            Inventory.objects.create(name=inv_name, organization=organization)
            for inv_name in ('inv1', 'inv2')
        )
        assert Host.objects.active_count() == 0

        first_inv.hosts.create(name='host1')
        second_inv.hosts.create(name='Host1')
        assert Host.objects.active_count() == 1

        first_inv.hosts.create(name='host2')
        assert Host.objects.active_count() == 2
|
|
|
|
|
|
@pytest.mark.django_db
class TestSCMUpdateFeatures:
    """Checks on inventory sources that are backed by a project."""

    def test_source_location(self, scm_inventory_source):
        """The actual source path joins the project directory and the inventory file."""
        update = InventoryUpdate(
            inventory_source=scm_inventory_source,
            source_path=scm_inventory_source.source_path,
        )
        project = scm_inventory_source.source_project
        actual_path = update.get_actual_source_path()
        assert actual_path.endswith(f'_{project.id}__test_proj/inventory_file')

    def test_no_unwanted_updates(self, scm_inventory_source):
        """Editing non-sensitive fields must not trigger a project update."""
        project = scm_inventory_source.source_project
        with mock.patch.object(project, 'update') as update_mock:
            scm_inventory_source.name = 'edited_inventory'
            scm_inventory_source.description = "I'm testing this!"
            scm_inventory_source.save()
        assert not update_mock.called
|
|
|
|
|
|
@pytest.mark.django_db
class TestRelatedJobs:
    """Checks that ``_get_related_jobs`` returns the jobs tied to an object."""

    def test_inventory_related(self, inventory):
        """A job created against the inventory is among its related jobs."""
        job = Job.objects.create(inventory=inventory)
        related_ids = [related.id for related in inventory._get_related_jobs()]
        assert job.id in related_ids

    def test_related_group_jobs(self, group):
        """A job created against the group's inventory is related to the group."""
        job = Job.objects.create(inventory=group.inventory)
        related_ids = [related.id for related in group._get_related_jobs()]
        assert job.id in related_ids

    def test_related_group_update(self, group):
        """An inventory update of one of the group's sources is related to the group."""
        source = group.inventory_sources.create(name='foo', source='ec2')
        update = InventoryUpdate.objects.create(inventory_source=source, source=source.source)
        related_ids = [related.id for related in group._get_related_jobs()]
        assert update.id in related_ids
|
|
|
|
|
|
@pytest.mark.django_db
class TestInventorySourceInjectors:
    """Checks on inventory source credentials and the registered injectors."""

    def test_extra_credentials(self, project, credential):
        """The same credential is reported differently for ``scm`` and ``ec2`` sources."""
        source = InventorySource.objects.create(name='foo', source='scm', source_project=project)
        source.credentials.add(credential)

        # as an scm source the credential is both the cloud credential
        # (used by the serializer) and an extra credential
        assert source.get_cloud_credential() == credential
        assert source.get_extra_credentials() == [credential]

        # as an ec2 source it is still the cloud credential, but no longer extra
        source.source = 'ec2'
        assert source.get_cloud_credential() == credential
        assert source.get_extra_credentials() == []

    def test_all_cloud_sources_covered(self):
        """Code in several places relies on the fact that the older
        CLOUD_PROVIDERS constant contains the same names as what are
        defined within the injectors
        """
        # slight exception case for constructed, because it has a FQCN but is not a cloud source
        expected_names = set(CLOUD_PROVIDERS) | {'constructed'}
        assert expected_names == set(InventorySource.injectors.keys())

    @pytest.mark.parametrize(
        'source,filename',
        [
            ('ec2', 'aws_ec2.yml'),
            ('openstack', 'openstack.yml'),
            ('gce', 'gcp_compute.yml'),
        ],
    )
    def test_plugin_filenames(self, source, filename):
        """It is important that the filenames for inventory plugin files
        are named correctly, because Ansible will reject files that do
        not have these exact names
        """
        injector_cls = InventorySource.injectors[source]
        assert injector_cls().filename == filename

    @pytest.mark.parametrize(
        'source,proper_name',
        [
            ('ec2', 'amazon.aws.aws_ec2'),
            ('openstack', 'openstack.cloud.openstack'),
            ('gce', 'google.cloud.gcp_compute'),
            ('azure_rm', 'azure.azcollection.azure_rm'),
            ('vmware', 'community.vmware.vmware_vm_inventory'),
            ('rhv', 'ovirt.ovirt.ovirt'),
            ('satellite6', 'theforeman.foreman.foreman'),
            ('insights', 'redhatinsights.insights.insights'),
            ('controller', 'awx.awx.tower'),
        ],
    )
    def test_plugin_proper_names(self, source, proper_name):
        """Each injector reports the expected fully qualified plugin name."""
        injector_cls = InventorySource.injectors[source]
        assert injector_cls().get_proper_name() == proper_name
|
|
|
|
|
|
@pytest.mark.django_db
def test_custom_source_custom_credential(organization):
    """A credential of a custom cloud type is returned as the cloud credential of an scm source."""
    token_field = {'id': 'api_token', 'label': 'API Token', 'type': 'string', 'secret': True}
    custom_type = CredentialType.objects.create(
        kind='cloud',
        name='MyCloud',
        inputs={'fields': [token_field]},
    )
    custom_credential = Credential.objects.create(
        name='my cred',
        credential_type=custom_type,
        organization=organization,
        inputs={'api_token': 'secret'},
    )
    source = InventorySource.objects.create(source='scm')
    source.credentials.add(custom_credential)
    assert source.get_cloud_credential() == custom_credential
|
|
|
|
|
|
@pytest.fixture
def setup_ec2_gce(organization):
    """Create an ec2 inventory with two hosts and a gce inventory with one host.

    Every host is attached to the inventory source of its own inventory.
    The fixture returns nothing; it only populates the database.
    """
    ec2_inventory = Inventory.objects.create(name='test_ec2', organization=organization)
    ec2_src = ec2_inventory.inventory_sources.create(name='test_ec2_source', source='ec2')
    for idx in range(2):
        new_host = ec2_inventory.hosts.create(name=f'test_ec2_{idx}')
        new_host.inventory_sources.add(ec2_src)
    ec2_inventory.save()

    gce_inventory = Inventory.objects.create(name='test_gce', organization=organization)
    gce_src = gce_inventory.inventory_sources.create(name='test_gce_source', source='gce')
    new_host = gce_inventory.hosts.create(name='test_gce_host')
    new_host.inventory_sources.add(gce_src)
    gce_inventory.save()
|
|
|
|
|
|
@pytest.fixture
def setup_inventory_groups(inventory, group_factory):
    """Create two groups that both contain the single host ``single_host``.

    The fixture returns nothing; it only populates the database.
    """
    groups = [group_factory(group_name) for group_name in ('test_groupA', 'test_groupB')]
    shared_host = Host.objects.create(name='single_host', inventory=inventory)
    for grp in groups:
        grp.hosts.add(shared_host)
        grp.save()
|
|
|
|
|
|
@pytest.mark.django_db
def test_inventory_update_name(inventory, inventory_source):
    """The update is named ``<inventory name> - <inventory source name>``."""
    update = inventory_source.update()
    # the expectation below is only meaningful when the two names differ
    assert inventory_source.name != inventory.name
    expected_name = ' - '.join((inventory.name, inventory_source.name))
    assert update.name == expected_name
|
|
|
|
|
|
@pytest.mark.django_db
def test_inventory_name_with_unicode(inventory, inventory_source):
    """A non-ASCII inventory name is carried into the start of the update name."""
    inventory.name = 'オオオ'
    inventory.save()
    update = inventory_source.update()
    prefix = inventory.name
    assert update.name.startswith(prefix)
|
|
|
|
|
|
@pytest.mark.django_db
def test_inventory_update_excessively_long_name(inventory, inventory_source):
    """With two 400-character names the update name still starts with the inventory name."""
    # field max length 512
    long_inventory_name = 'a' * 400
    long_source_name = 'b' * 400
    inventory.name = long_inventory_name
    inventory_source.name = long_source_name

    update = inventory_source.update()

    assert inventory_source.name != inventory.name
    assert update.name.startswith(inventory.name)
|
|
|
|
|
|
@pytest.mark.django_db
class TestHostManager:
    """Checks on host lookups through inventories and ``SmartFilter``."""

    def test_host_filter_not_smart(self, setup_ec2_gce, organization):
        """An unsaved inventory built with a host_filter exposes no hosts."""
        candidate = Inventory(
            name='smart',
            organization=organization,
            host_filter='inventory_sources__source=ec2',
        )
        found_hosts = candidate.hosts.all()
        assert len(found_hosts) == 0

    def test_host_distinctness(self, setup_inventory_groups, organization):
        """
        two criteria would both yield the same host, check that we only get 1 copy here
        """
        matches = SmartFilter.query_from_string('name=single_host or name__startswith=single_')
        only_host = Host.objects.get(name='single_host')
        assert list(matches) == [only_host]

    # Things we can not easily test due to SQLite backend:
    # 2 organizations with host of same name only has 1 entry in smart inventory
    # smart inventory in 1 organization does not include host from another
    # smart inventory correctly returns hosts in filter in same organization
|