mirror of
https://github.com/ansible/awx.git
synced 2026-01-13 02:50:02 -03:30
Merge pull request #105 from ryanpetrello/fix-7323
add validation errors for certain dependent credential fields
This commit is contained in:
commit
25f16f9409
@ -4,6 +4,7 @@
|
||||
# Python
|
||||
import copy
|
||||
import json
|
||||
import re
|
||||
import six
|
||||
|
||||
from jinja2 import Environment, StrictUndefined
|
||||
@ -467,6 +468,7 @@ class CredentialInputField(JSONSchemaField):
|
||||
return {
|
||||
'type': 'object',
|
||||
'properties': properties,
|
||||
'dependencies': model_instance.credential_type.inputs.get('dependencies', {}),
|
||||
'additionalProperties': False,
|
||||
}
|
||||
|
||||
@ -504,6 +506,28 @@ class CredentialInputField(JSONSchemaField):
|
||||
).iter_errors(decrypted_values):
|
||||
if error.validator == 'pattern' and 'error' in error.schema:
|
||||
error.message = error.schema['error'] % error.instance
|
||||
if error.validator == 'dependencies':
|
||||
# replace the default error messaging w/ a better i18n string
|
||||
# I wish there was a better way to determine the parameters of
|
||||
# this validation failure, but the exception jsonschema raises
|
||||
# doesn't include them as attributes (just a hard-coded error
|
||||
# string)
|
||||
match = re.search(
|
||||
# 'foo' is a dependency of 'bar'
|
||||
"'" # apostrophe
|
||||
"([^']+)" # one or more non-apostrophes (first group)
|
||||
"'[\w ]+'" # one or more words/spaces
|
||||
"([^']+)", # second group
|
||||
error.message,
|
||||
)
|
||||
if match:
|
||||
label, extraneous = match.groups()
|
||||
if error.schema['properties'].get(label):
|
||||
label = error.schema['properties'][label]['label']
|
||||
errors[extraneous] = [
|
||||
_('cannot be set unless "%s" is set') % label
|
||||
]
|
||||
continue
|
||||
if 'id' not in error.schema:
|
||||
# If the error is not for a specific field, it's specific to
|
||||
# `inputs` in general
|
||||
@ -542,11 +566,12 @@ class CredentialInputField(JSONSchemaField):
|
||||
|
||||
if model_instance.has_encrypted_ssh_key_data and not value.get('ssh_key_unlock'):
|
||||
errors['ssh_key_unlock'] = [_('must be set when SSH key is encrypted.')]
|
||||
if value.get('ssh_key_unlock'):
|
||||
if not model_instance.ssh_key_data:
|
||||
errors['ssh_key_unlock'] = [_('should not be set when SSH key is empty.')]
|
||||
elif not model_instance.has_encrypted_ssh_key_data:
|
||||
errors['ssh_key_unlock'] = [_('should not be set when SSH key is not encrypted.')]
|
||||
if all([
|
||||
model_instance.ssh_key_data,
|
||||
value.get('ssh_key_unlock'),
|
||||
not model_instance.has_encrypted_ssh_key_data
|
||||
]):
|
||||
errors['ssh_key_unlock'] = [_('should not be set when SSH key is not encrypted.')]
|
||||
|
||||
if errors:
|
||||
raise serializers.ValidationError({
|
||||
@ -601,6 +626,14 @@ class CredentialTypeInputField(JSONSchemaField):
|
||||
}
|
||||
|
||||
def validate(self, value, model_instance):
|
||||
if isinstance(value, dict) and 'dependencies' in value and \
|
||||
not model_instance.managed_by_tower:
|
||||
raise django_exceptions.ValidationError(
|
||||
_("'dependencies' is not supported for custom credentials."),
|
||||
code='invalid',
|
||||
params={'value': value},
|
||||
)
|
||||
|
||||
super(CredentialTypeInputField, self).validate(
|
||||
value, model_instance
|
||||
)
|
||||
|
||||
@ -632,7 +632,10 @@ def ssh(cls):
|
||||
'type': 'string',
|
||||
'secret': True,
|
||||
'ask_at_runtime': True
|
||||
}]
|
||||
}],
|
||||
'dependencies': {
|
||||
'ssh_key_unlock': ['ssh_key_data'],
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
@ -665,7 +668,10 @@ def scm(cls):
|
||||
'label': 'Private Key Passphrase',
|
||||
'type': 'string',
|
||||
'secret': True
|
||||
}]
|
||||
}],
|
||||
'dependencies': {
|
||||
'ssh_key_unlock': ['ssh_key_data'],
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
@ -725,7 +731,11 @@ def net(cls):
|
||||
'label': 'Authorize Password',
|
||||
'type': 'string',
|
||||
'secret': True,
|
||||
}]
|
||||
}],
|
||||
'dependencies': {
|
||||
'ssh_key_unlock': ['ssh_key_data'],
|
||||
'authorize_password': ['authorize'],
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import itertools
|
||||
import re
|
||||
|
||||
import mock # noqa
|
||||
import pytest
|
||||
@ -711,7 +712,7 @@ def test_inputs_cannot_contain_extra_fields(get, post, organization, admin, cred
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('field_name, field_value', itertools.product(
|
||||
['username', 'password', 'ssh_key_data', 'ssh_key_unlock', 'become_method', 'become_username', 'become_password'], # noqa
|
||||
['username', 'password', 'ssh_key_data', 'become_method', 'become_username', 'become_password'], # noqa
|
||||
['', None]
|
||||
))
|
||||
def test_nullish_field_data(get, post, organization, admin, field_name, field_value):
|
||||
@ -762,6 +763,33 @@ def test_falsey_field_data(get, post, organization, admin, field_value):
|
||||
assert cred.authorize is False
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('kind, extraneous', [
|
||||
['ssh', 'ssh_key_unlock'],
|
||||
['scm', 'ssh_key_unlock'],
|
||||
['net', 'ssh_key_unlock'],
|
||||
['net', 'authorize_password'],
|
||||
])
|
||||
def test_field_dependencies(get, post, organization, admin, kind, extraneous):
|
||||
_type = CredentialType.defaults[kind]()
|
||||
_type.save()
|
||||
params = {
|
||||
'name': 'Best credential ever',
|
||||
'credential_type': _type.pk,
|
||||
'organization': organization.id,
|
||||
'inputs': {extraneous: 'not needed'}
|
||||
}
|
||||
response = post(
|
||||
reverse('api:credential_list', kwargs={'version': 'v2'}),
|
||||
params,
|
||||
admin
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert re.search('cannot be set unless .+ is set.', response.content)
|
||||
|
||||
assert Credential.objects.count() == 0
|
||||
|
||||
|
||||
#
|
||||
# SCM Credentials
|
||||
#
|
||||
|
||||
@ -80,16 +80,15 @@ def test_update_managed_by_tower_xfail(patch, delete, admin):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_update_credential_type_in_use_xfail(patch, delete, admin):
|
||||
ssh = CredentialType.defaults['ssh']()
|
||||
ssh.managed_by_tower = False
|
||||
ssh.save()
|
||||
Credential(credential_type=ssh, name='My SSH Key').save()
|
||||
_type = CredentialType(kind='cloud', inputs={'fields': []})
|
||||
_type.save()
|
||||
Credential(credential_type=_type, name='My Custom Cred').save()
|
||||
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': ssh.pk})
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': _type.pk})
|
||||
response = patch(url, {'name': 'Some Other Name'}, admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': ssh.pk})
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': _type.pk})
|
||||
response = patch(url, {'inputs': {}}, admin)
|
||||
assert response.status_code == 403
|
||||
|
||||
@ -98,11 +97,10 @@ def test_update_credential_type_in_use_xfail(patch, delete, admin):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_update_credential_type_success(get, patch, delete, admin):
|
||||
ssh = CredentialType.defaults['ssh']()
|
||||
ssh.managed_by_tower = False
|
||||
ssh.save()
|
||||
_type = CredentialType(kind='cloud')
|
||||
_type.save()
|
||||
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': ssh.pk})
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': _type.pk})
|
||||
response = patch(url, {'name': 'Some Other Name'}, admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
@ -163,6 +161,21 @@ def test_create_managed_by_tower_readonly(get, post, admin):
|
||||
assert response.data['results'][0]['managed_by_tower'] is False
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_dependencies_not_supported(get, post, admin):
|
||||
response = post(reverse('api:credential_type_list'), {
|
||||
'kind': 'cloud',
|
||||
'name': 'Custom Credential Type',
|
||||
'inputs': {'dependencies': {'foo': ['bar']}},
|
||||
'injectors': {},
|
||||
}, admin)
|
||||
assert response.status_code == 400
|
||||
assert response.data['inputs'] == ["'dependencies' is not supported for custom credentials."]
|
||||
|
||||
response = get(reverse('api:credential_type_list'), admin)
|
||||
assert response.data['count'] == 0
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('kind', ['cloud', 'net'])
|
||||
def test_create_valid_kind(kind, get, post, admin):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user