Merge pull request #2882 from AlanCoding/just_credential2

[option2] move inventory source vault credential validation from view to model
This commit is contained in:
Alan Rominger 2018-08-21 13:24:50 -04:00 committed by GitHub
commit 598449c2ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 124 additions and 10 deletions

View File

@ -97,7 +97,7 @@ class DeprecatedCredentialField(serializers.IntegerField):
kwargs['allow_null'] = True
kwargs['default'] = None
kwargs['min_value'] = 1
kwargs['help_text'] = 'This resource has been deprecated and will be removed in a future release'
kwargs.setdefault('help_text', 'This resource has been deprecated and will be removed in a future release')
super(DeprecatedCredentialField, self).__init__(**kwargs)
def to_internal_value(self, pk):

View File

@ -1906,7 +1906,9 @@ class CustomInventoryScriptSerializer(BaseSerializer):
class InventorySourceOptionsSerializer(BaseSerializer):
credential = DeprecatedCredentialField()
credential = DeprecatedCredentialField(
help_text=_('Cloud credential to use for inventory updates.')
)
class Meta:
fields = ('*', 'source', 'source_path', 'source_script', 'source_vars', 'credential',

View File

@ -2883,17 +2883,14 @@ class InventorySourceCredentialsList(SubListAttachDetachAPIView):
relationship = 'credentials'
def is_valid_relation(self, parent, sub, created=False):
# Inventory source credentials are exclusive with all other credentials
# subject to change for https://github.com/ansible/awx/issues/277
# or https://github.com/ansible/awx/issues/223
if parent.credentials.exists():
return {'msg': _("Source already has credential assigned.")}
error = InventorySource.cloud_credential_validation(parent.source, sub)
if error:
return {'msg': error}
if sub.credential_type == 'vault':
# TODO: support this
return {"msg": _("Vault credentials are not yet supported for inventory sources.")}
else:
# Cloud credentials are exclusive with all other cloud credentials
cloud_cred_qs = parent.credentials.exclude(credential_type__kind='vault')
if cloud_cred_qs.exists():
return {'msg': _("Source already has cloud credential assigned.")}
return None

View File

@ -1262,6 +1262,11 @@ class InventorySourceOptions(BaseModel):
'Credentials of type machine, source control, insights and vault are '
'disallowed for custom inventory sources.'
)
elif source == 'scm' and cred and cred.credential_type.kind in ('insights', 'vault'):
return _(
'Credentials of type insights and vault are '
'disallowed for scm inventory sources.'
)
return None
def get_inventory_plugin_name(self):

View File

@ -365,6 +365,116 @@ def test_inventory_source_vars_prohibition(post, inventory, admin_user):
assert 'FOOBAR' in r.data['source_vars'][0]
@pytest.mark.django_db
class TestInventorySourceCredential:
def test_need_cloud_credential(self, inventory, admin_user, post):
"""Test that a cloud-based source requires credential"""
r = post(
url=reverse('api:inventory_source_list'),
data={'inventory': inventory.pk, 'name': 'foo', 'source': 'openstack'},
expect=400,
user=admin_user
)
assert 'Credential is required for a cloud source' in r.data['credential'][0]
def test_ec2_no_credential(self, inventory, admin_user, post):
"""Test that an ec2 inventory source can be added with no credential"""
post(
url=reverse('api:inventory_source_list'),
data={'inventory': inventory.pk, 'name': 'fobar', 'source': 'ec2'},
expect=201,
user=admin_user
)
def test_validating_credential_type(self, organization, inventory, admin_user, post):
"""Test that cloud sources must use their respective credential type"""
from awx.main.models.credential import Credential, CredentialType
openstack = CredentialType.defaults['openstack']()
openstack.save()
os_cred = Credential.objects.create(
credential_type=openstack, name='bar', organization=organization)
r = post(
url=reverse('api:inventory_source_list'),
data={
'inventory': inventory.pk, 'name': 'fobar', 'source': 'ec2',
'credential': os_cred.pk
},
expect=400,
user=admin_user
)
assert 'Cloud-based inventory sources (such as ec2)' in r.data['credential'][0]
assert 'require credentials for the matching cloud service' in r.data['credential'][0]
def test_vault_credential_not_allowed(self, project, inventory, vault_credential, admin_user, post):
"""Vault credentials cannot be associated via the deprecated field"""
# TODO: when feature is added, add tests to use the related credentials
# endpoint for multi-vault attachment
r = post(
url=reverse('api:inventory_source_list'),
data={
'inventory': inventory.pk, 'name': 'fobar', 'source': 'scm',
'source_project': project.pk, 'source_path': '',
'credential': vault_credential.pk
},
expect=400,
user=admin_user
)
assert 'Credentials of type insights and vault' in r.data['credential'][0]
assert 'disallowed for scm inventory sources' in r.data['credential'][0]
def test_vault_credential_not_allowed_via_related(
self, project, inventory, vault_credential, admin_user, post):
"""Vault credentials cannot be associated via related endpoint"""
inv_src = InventorySource.objects.create(
inventory=inventory, name='foobar', source='scm',
source_project=project, source_path=''
)
r = post(
url=reverse('api:inventory_source_credentials_list', kwargs={'pk': inv_src.pk}),
data={
'id': vault_credential.pk
},
expect=400,
user=admin_user
)
assert 'Credentials of type insights and vault' in r.data['msg']
assert 'disallowed for scm inventory sources' in r.data['msg']
def test_credentials_relationship_mapping(self, project, inventory, organization, admin_user, post, patch):
"""The credentials relationship is used to manage the cloud credential
this test checks that replacement works"""
from awx.main.models.credential import Credential, CredentialType
openstack = CredentialType.defaults['openstack']()
openstack.save()
os_cred = Credential.objects.create(
credential_type=openstack, name='bar', organization=organization)
r = post(
url=reverse('api:inventory_source_list'),
data={
'inventory': inventory.pk, 'name': 'fobar', 'source': 'scm',
'source_project': project.pk, 'source_path': '',
'credential': os_cred.pk
},
expect=201,
user=admin_user
)
aws = CredentialType.defaults['aws']()
aws.save()
aws_cred = Credential.objects.create(
credential_type=aws, name='bar2', organization=organization)
inv_src = InventorySource.objects.get(pk=r.data['id'])
assert list(inv_src.credentials.values_list('id', flat=True)) == [os_cred.pk]
patch(
url=inv_src.get_absolute_url(),
data={
'credential': aws_cred.pk
},
expect=200,
user=admin_user
)
assert list(inv_src.credentials.values_list('id', flat=True)) == [aws_cred.pk]
@pytest.mark.django_db
class TestControlledBySCM:
'''