mirror of
https://github.com/ansible/awx.git
synced 2026-03-04 02:01:01 -03:30
@@ -409,6 +409,8 @@ def format_ssh_private_key(value):
|
|||||||
# These end in a unicode-encoded final character that gets double
|
# These end in a unicode-encoded final character that gets double
|
||||||
# escaped due to being in a Python 2 bytestring, and that causes
|
# escaped due to being in a Python 2 bytestring, and that causes
|
||||||
# Python's key parsing to barf. Detect this issue and correct it.
|
# Python's key parsing to barf. Detect this issue and correct it.
|
||||||
|
if value == '$encrypted$':
|
||||||
|
return value
|
||||||
if r'\u003d' in value:
|
if r'\u003d' in value:
|
||||||
value = value.replace(r'\u003d', '=')
|
value = value.replace(r'\u003d', '=')
|
||||||
try:
|
try:
|
||||||
@@ -505,6 +507,19 @@ class CredentialInputField(JSONSchemaField):
|
|||||||
model_instance.credential_type.managed_by_tower is True and
|
model_instance.credential_type.managed_by_tower is True and
|
||||||
'ssh_key_unlock' in model_instance.credential_type.defined_fields
|
'ssh_key_unlock' in model_instance.credential_type.defined_fields
|
||||||
):
|
):
|
||||||
|
|
||||||
|
# in order to properly test the necessity of `ssh_key_unlock`, we
|
||||||
|
# need to know the real value of `ssh_key_data`; for a payload like:
|
||||||
|
# {
|
||||||
|
# 'ssh_key_data': '$encrypted$',
|
||||||
|
# 'ssh_key_unlock': 'do-you-need-me?',
|
||||||
|
# }
|
||||||
|
# ...we have to fetch the actual key value from the database
|
||||||
|
if model_instance.pk and model_instance.ssh_key_data == '$encrypted$':
|
||||||
|
model_instance.ssh_key_data = model_instance.__class__.objects.get(
|
||||||
|
pk=model_instance.pk
|
||||||
|
).ssh_key_data
|
||||||
|
|
||||||
if model_instance.has_encrypted_ssh_key_data and not value.get('ssh_key_unlock'):
|
if model_instance.has_encrypted_ssh_key_data and not value.get('ssh_key_unlock'):
|
||||||
errors['ssh_key_unlock'] = [_('must be set when SSH key is encrypted.')]
|
errors['ssh_key_unlock'] = [_('must be set when SSH key is encrypted.')]
|
||||||
if not model_instance.has_encrypted_ssh_key_data and value.get('ssh_key_unlock'):
|
if not model_instance.has_encrypted_ssh_key_data and value.get('ssh_key_unlock'):
|
||||||
|
|||||||
@@ -1293,6 +1293,131 @@ def test_field_removal(put, organization, admin, credentialtype_ssh, version, pa
|
|||||||
assert 'password' not in cred.inputs
|
assert 'password' not in cred.inputs
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('version, params', [
|
||||||
|
['v1', {
|
||||||
|
'name': 'Best credential ever',
|
||||||
|
'kind': 'ssh',
|
||||||
|
'username': 'joe',
|
||||||
|
'ssh_key_data': '$encrypted$',
|
||||||
|
}],
|
||||||
|
['v2', {
|
||||||
|
'name': 'Best credential ever',
|
||||||
|
'credential_type': 1,
|
||||||
|
'inputs': {
|
||||||
|
'username': 'joe',
|
||||||
|
'ssh_key_data': '$encrypted$',
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
])
|
||||||
|
def test_ssh_unlock_needed(put, organization, admin, credentialtype_ssh, version, params):
|
||||||
|
cred = Credential(
|
||||||
|
credential_type=credentialtype_ssh,
|
||||||
|
name='Best credential ever',
|
||||||
|
organization=organization,
|
||||||
|
inputs={
|
||||||
|
'username': u'joe',
|
||||||
|
'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY,
|
||||||
|
'ssh_key_unlock': 'unlock'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
cred.save()
|
||||||
|
|
||||||
|
params['organization'] = organization.id
|
||||||
|
response = put(
|
||||||
|
reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}),
|
||||||
|
params,
|
||||||
|
admin
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.data['inputs']['ssh_key_unlock'] == ['must be set when SSH key is encrypted.']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('version, params', [
|
||||||
|
['v1', {
|
||||||
|
'name': 'Best credential ever',
|
||||||
|
'kind': 'ssh',
|
||||||
|
'username': 'joe',
|
||||||
|
'ssh_key_data': '$encrypted$',
|
||||||
|
'ssh_key_unlock': 'superfluous-key-unlock',
|
||||||
|
}],
|
||||||
|
['v2', {
|
||||||
|
'name': 'Best credential ever',
|
||||||
|
'credential_type': 1,
|
||||||
|
'inputs': {
|
||||||
|
'username': 'joe',
|
||||||
|
'ssh_key_data': '$encrypted$',
|
||||||
|
'ssh_key_unlock': 'superfluous-key-unlock',
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
])
|
||||||
|
def test_ssh_unlock_not_needed(put, organization, admin, credentialtype_ssh, version, params):
|
||||||
|
cred = Credential(
|
||||||
|
credential_type=credentialtype_ssh,
|
||||||
|
name='Best credential ever',
|
||||||
|
organization=organization,
|
||||||
|
inputs={
|
||||||
|
'username': u'joe',
|
||||||
|
'ssh_key_data': EXAMPLE_PRIVATE_KEY,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
cred.save()
|
||||||
|
|
||||||
|
params['organization'] = organization.id
|
||||||
|
response = put(
|
||||||
|
reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}),
|
||||||
|
params,
|
||||||
|
admin
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert response.data['inputs']['ssh_key_unlock'] == ['should not be set when SSH key is not encrypted.']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('version, params', [
|
||||||
|
['v1', {
|
||||||
|
'name': 'Best credential ever',
|
||||||
|
'kind': 'ssh',
|
||||||
|
'username': 'joe',
|
||||||
|
'ssh_key_data': '$encrypted$',
|
||||||
|
'ssh_key_unlock': 'new-unlock',
|
||||||
|
}],
|
||||||
|
['v2', {
|
||||||
|
'name': 'Best credential ever',
|
||||||
|
'credential_type': 1,
|
||||||
|
'inputs': {
|
||||||
|
'username': 'joe',
|
||||||
|
'ssh_key_data': '$encrypted$',
|
||||||
|
'ssh_key_unlock': 'new-unlock',
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
])
|
||||||
|
def test_ssh_unlock_with_prior_value(put, organization, admin, credentialtype_ssh, version, params):
|
||||||
|
cred = Credential(
|
||||||
|
credential_type=credentialtype_ssh,
|
||||||
|
name='Best credential ever',
|
||||||
|
organization=organization,
|
||||||
|
inputs={
|
||||||
|
'username': u'joe',
|
||||||
|
'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY,
|
||||||
|
'ssh_key_unlock': 'old-unlock'
|
||||||
|
}
|
||||||
|
)
|
||||||
|
cred.save()
|
||||||
|
|
||||||
|
params['organization'] = organization.id
|
||||||
|
response = put(
|
||||||
|
reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}),
|
||||||
|
params,
|
||||||
|
admin
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
cred = Credential.objects.all()[:1].get()
|
||||||
|
assert decrypt_field(cred, 'ssh_key_unlock') == 'new-unlock'
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# test secret encryption/decryption
|
# test secret encryption/decryption
|
||||||
#
|
#
|
||||||
|
|||||||
Reference in New Issue
Block a user