From 240d629128fc2cc40f64151e32826695db2d44dc Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Mon, 12 Jun 2017 10:28:38 -0400 Subject: [PATCH] fix a bug in ssh key unlock validation see: #6553 --- awx/main/fields.py | 15 +++ .../tests/functional/api/test_credential.py | 125 ++++++++++++++++++ 2 files changed, 140 insertions(+) diff --git a/awx/main/fields.py b/awx/main/fields.py index 5c06ba35d9..ab032ced84 100644 --- a/awx/main/fields.py +++ b/awx/main/fields.py @@ -409,6 +409,8 @@ def format_ssh_private_key(value): # These end in a unicode-encoded final character that gets double # escaped due to being in a Python 2 bytestring, and that causes # Python's key parsing to barf. Detect this issue and correct it. + if value == '$encrypted$': + return value if r'\u003d' in value: value = value.replace(r'\u003d', '=') try: @@ -505,6 +507,19 @@ class CredentialInputField(JSONSchemaField): model_instance.credential_type.managed_by_tower is True and 'ssh_key_unlock' in model_instance.credential_type.defined_fields ): + + # in order to properly test the necessity of `ssh_key_unlock`, we + # need to know the real value of `ssh_key_data`; for a payload like: + # { + # 'ssh_key_data': '$encrypted$', + # 'ssh_key_unlock': 'do-you-need-me?', + # } + # ...we have to fetch the actual key value from the database + if model_instance.pk and model_instance.ssh_key_data == '$encrypted$': + model_instance.ssh_key_data = model_instance.__class__.objects.get( + pk=model_instance.pk + ).ssh_key_data + if model_instance.has_encrypted_ssh_key_data and not value.get('ssh_key_unlock'): errors['ssh_key_unlock'] = [_('must be set when SSH key is encrypted.')] if not model_instance.has_encrypted_ssh_key_data and value.get('ssh_key_unlock'): diff --git a/awx/main/tests/functional/api/test_credential.py b/awx/main/tests/functional/api/test_credential.py index c552763772..be7521c4fd 100644 --- a/awx/main/tests/functional/api/test_credential.py +++ b/awx/main/tests/functional/api/test_credential.py @@ -1293,6 +1293,131 @@ def test_field_removal(put, organization, admin, credentialtype_ssh, version, pa assert 'password' not in cred.inputs +@pytest.mark.django_db +@pytest.mark.parametrize('version, params', [ + ['v1', { + 'name': 'Best credential ever', + 'kind': 'ssh', + 'username': 'joe', + 'ssh_key_data': '$encrypted$', + }], + ['v2', { + 'name': 'Best credential ever', + 'credential_type': 1, + 'inputs': { + 'username': 'joe', + 'ssh_key_data': '$encrypted$', + } + }] +]) +def test_ssh_unlock_needed(put, organization, admin, credentialtype_ssh, version, params): + cred = Credential( + credential_type=credentialtype_ssh, + name='Best credential ever', + organization=organization, + inputs={ + 'username': u'joe', + 'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY, + 'ssh_key_unlock': 'unlock' + } + ) + cred.save() + + params['organization'] = organization.id + response = put( + reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}), + params, + admin + ) + assert response.status_code == 400 + assert response.data['inputs']['ssh_key_unlock'] == ['must be set when SSH key is encrypted.'] + + +@pytest.mark.django_db +@pytest.mark.parametrize('version, params', [ + ['v1', { + 'name': 'Best credential ever', + 'kind': 'ssh', + 'username': 'joe', + 'ssh_key_data': '$encrypted$', + 'ssh_key_unlock': 'superfluous-key-unlock', + }], + ['v2', { + 'name': 'Best credential ever', + 'credential_type': 1, + 'inputs': { + 'username': 'joe', + 'ssh_key_data': '$encrypted$', + 'ssh_key_unlock': 'superfluous-key-unlock', + } + }] +]) +def test_ssh_unlock_not_needed(put, organization, admin, credentialtype_ssh, version, params): + cred = Credential( + credential_type=credentialtype_ssh, + name='Best credential ever', + organization=organization, + inputs={ + 'username': u'joe', + 'ssh_key_data': EXAMPLE_PRIVATE_KEY, + } + ) + cred.save() + + params['organization'] = organization.id + response = put( + reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}), + params, + admin + ) + assert response.status_code == 400 + assert response.data['inputs']['ssh_key_unlock'] == ['should not be set when SSH key is not encrypted.'] + + +@pytest.mark.django_db +@pytest.mark.parametrize('version, params', [ + ['v1', { + 'name': 'Best credential ever', + 'kind': 'ssh', + 'username': 'joe', + 'ssh_key_data': '$encrypted$', + 'ssh_key_unlock': 'new-unlock', + }], + ['v2', { + 'name': 'Best credential ever', + 'credential_type': 1, + 'inputs': { + 'username': 'joe', + 'ssh_key_data': '$encrypted$', + 'ssh_key_unlock': 'new-unlock', + } + }] +]) +def test_ssh_unlock_with_prior_value(put, organization, admin, credentialtype_ssh, version, params): + cred = Credential( + credential_type=credentialtype_ssh, + name='Best credential ever', + organization=organization, + inputs={ + 'username': u'joe', + 'ssh_key_data': EXAMPLE_ENCRYPTED_PRIVATE_KEY, + 'ssh_key_unlock': 'old-unlock' + } + ) + cred.save() + + params['organization'] = organization.id + response = put( + reverse('api:credential_detail', kwargs={'version': version, 'pk': cred.pk}), + params, + admin + ) + assert response.status_code == 200 + + cred = Credential.objects.all()[:1].get() + assert decrypt_field(cred, 'ssh_key_unlock') == 'new-unlock' + + # # test secret encryption/decryption #