mirror of
https://github.com/ansible/awx.git
synced 2026-04-06 18:49:21 -02:30
add credential input access methods
This commit is contained in:
@@ -385,6 +385,7 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
def encrypt_field(self, field, ask):
|
def encrypt_field(self, field, ask):
|
||||||
if not hasattr(self, field):
|
if not hasattr(self, field):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
encrypted = encrypt_field(self, field, ask=ask)
|
encrypted = encrypt_field(self, field, ask=ask)
|
||||||
if encrypted:
|
if encrypted:
|
||||||
self.inputs[field] = encrypted
|
self.inputs[field] = encrypted
|
||||||
@@ -415,12 +416,12 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
type_alias = self.credential_type.name
|
type_alias = self.credential_type.name
|
||||||
else:
|
else:
|
||||||
type_alias = self.credential_type_id
|
type_alias = self.credential_type_id
|
||||||
if self.kind == 'vault' and self.inputs.get('vault_id', None):
|
if self.kind == 'vault' and self.has_input('vault_id'):
|
||||||
if display:
|
if display:
|
||||||
fmt_str = six.text_type('{} (id={})')
|
fmt_str = six.text_type('{} (id={})')
|
||||||
else:
|
else:
|
||||||
fmt_str = six.text_type('{}_{}')
|
fmt_str = six.text_type('{}_{}')
|
||||||
return fmt_str.format(type_alias, self.inputs.get('vault_id'))
|
return fmt_str.format(type_alias, self.get_input('vault_id'))
|
||||||
return six.text_type(type_alias)
|
return six.text_type(type_alias)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@@ -430,6 +431,29 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
ret[cred.unique_hash()] = cred
|
ret[cred.unique_hash()] = cred
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
def get_input(self, field_name, **kwargs):
|
||||||
|
"""
|
||||||
|
Get an injectable and decrypted value for an input field.
|
||||||
|
|
||||||
|
Retrieves the value for a given credential input field name. Return
|
||||||
|
values for secret input fields are decrypted. If the credential doesn't
|
||||||
|
have an input value defined for the given field name, an AttributeError
|
||||||
|
is raised unless a default value is provided.
|
||||||
|
|
||||||
|
:param field_name(str): The name of the input field.
|
||||||
|
:param default(optional[str]): A default return value to use.
|
||||||
|
"""
|
||||||
|
if field_name in self.credential_type.secret_fields:
|
||||||
|
return decrypt_field(self, field_name)
|
||||||
|
if field_name in self.inputs:
|
||||||
|
return self.inputs[field_name]
|
||||||
|
if 'default' in kwargs:
|
||||||
|
return kwargs['default']
|
||||||
|
raise AttributeError(field_name)
|
||||||
|
|
||||||
|
def has_input(self, field_name):
|
||||||
|
return field_name in self.inputs and self.inputs[field_name] not in ('', None)
|
||||||
|
|
||||||
|
|
||||||
class CredentialType(CommonModelNameNotUnique):
|
class CredentialType(CommonModelNameNotUnique):
|
||||||
'''
|
'''
|
||||||
@@ -611,8 +635,9 @@ class CredentialType(CommonModelNameNotUnique):
|
|||||||
safe_namespace[field_name] = namespace[field_name] = value
|
safe_namespace[field_name] = namespace[field_name] = value
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
value = credential.get_input(field_name)
|
||||||
|
|
||||||
if field_name in self.secret_fields:
|
if field_name in self.secret_fields:
|
||||||
value = decrypt_field(credential, field_name)
|
|
||||||
safe_namespace[field_name] = '**********'
|
safe_namespace[field_name] = '**********'
|
||||||
elif len(value):
|
elif len(value):
|
||||||
safe_namespace[field_name] = value
|
safe_namespace[field_name] = value
|
||||||
|
|||||||
@@ -327,3 +327,51 @@ def test_credential_update_with_prior(organization_factory, credentialtype_ssh):
|
|||||||
assert cred.inputs['username'] == 'joe'
|
assert cred.inputs['username'] == 'joe'
|
||||||
assert cred.inputs['password'].startswith('$encrypted$')
|
assert cred.inputs['password'].startswith('$encrypted$')
|
||||||
assert decrypt_field(cred, 'password') == 'testing123'
|
assert decrypt_field(cred, 'password') == 'testing123'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_credential_get_input(organization_factory):
|
||||||
|
organization = organization_factory('test').organization
|
||||||
|
type_ = CredentialType(
|
||||||
|
kind='vault',
|
||||||
|
name='somevault',
|
||||||
|
managed_by_tower=True,
|
||||||
|
inputs={
|
||||||
|
'fields': [{
|
||||||
|
'id': 'vault_password',
|
||||||
|
'type': 'string',
|
||||||
|
'secret': True,
|
||||||
|
}, {
|
||||||
|
'id': 'vault_id',
|
||||||
|
'type': 'string',
|
||||||
|
'secret': False
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
)
|
||||||
|
type_.save()
|
||||||
|
|
||||||
|
cred = Credential(
|
||||||
|
organization=organization,
|
||||||
|
credential_type=type_,
|
||||||
|
name="Bob's Credential",
|
||||||
|
inputs={'vault_password': 'testing321'}
|
||||||
|
)
|
||||||
|
cred.save()
|
||||||
|
cred.full_clean()
|
||||||
|
|
||||||
|
assert isinstance(cred, Credential)
|
||||||
|
# verify expected exception is raised when attempting to access an unset
|
||||||
|
# input without providing a default
|
||||||
|
with pytest.raises(AttributeError):
|
||||||
|
cred.get_input('vault_id')
|
||||||
|
# verify that the provided default is used for unset inputs
|
||||||
|
assert cred.get_input('vault_id', default='foo') == 'foo'
|
||||||
|
# verify expected exception is raised when attempting to access an undefined
|
||||||
|
# input without providing a default
|
||||||
|
with pytest.raises(AttributeError):
|
||||||
|
cred.get_input('field_not_on_credential_type')
|
||||||
|
# verify that the provided default is used for undefined inputs
|
||||||
|
assert cred.get_input('field_not_on_credential_type', default='bar') == 'bar'
|
||||||
|
# verify return values for encrypted secret fields are decrypted
|
||||||
|
assert cred.inputs['vault_password'].startswith('$encrypted$')
|
||||||
|
assert cred.get_input('vault_password') == 'testing321'
|
||||||
|
|||||||
Reference in New Issue
Block a user