mirror of
https://github.com/ansible/awx.git
synced 2026-02-25 23:16:01 -03:30
refactor and fix ssh_private_key and ssh_key_unlock validation
`clean_ssh_key_data` and `clean_ssh_key_unlock` no longer work because they're not actual fields on `model.Credential` anymore. This change refactors/moves their validation to a place that works (and makes more sense).
This commit is contained in:
@@ -28,7 +28,8 @@ from django.utils.encoding import smart_text
|
|||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
|
|
||||||
# jsonschema
|
# jsonschema
|
||||||
from jsonschema import Draft4Validator
|
from jsonschema import Draft4Validator, FormatChecker
|
||||||
|
import jsonschema.exceptions
|
||||||
|
|
||||||
# Django-JSONField
|
# Django-JSONField
|
||||||
from jsonfield import JSONField as upstream_JSONField
|
from jsonfield import JSONField as upstream_JSONField
|
||||||
@@ -36,6 +37,7 @@ from jsonbfield.fields import JSONField as upstream_JSONBField
|
|||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.utils.filters import DynamicFilter
|
from awx.main.utils.filters import DynamicFilter
|
||||||
|
from awx.main.validators import validate_ssh_private_key
|
||||||
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
|
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
|
||||||
from awx.main import utils
|
from awx.main import utils
|
||||||
|
|
||||||
@@ -349,6 +351,8 @@ class JSONSchemaField(JSONBField):
|
|||||||
defining `self.schema`.
|
defining `self.schema`.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
format_checker = FormatChecker()
|
||||||
|
|
||||||
# If an empty {} is provided, we still want to perform this schema
|
# If an empty {} is provided, we still want to perform this schema
|
||||||
# validation
|
# validation
|
||||||
empty_values=(None, '')
|
empty_values=(None, '')
|
||||||
@@ -362,7 +366,10 @@ class JSONSchemaField(JSONBField):
|
|||||||
def validate(self, value, model_instance):
|
def validate(self, value, model_instance):
|
||||||
super(JSONSchemaField, self).validate(value, model_instance)
|
super(JSONSchemaField, self).validate(value, model_instance)
|
||||||
errors = []
|
errors = []
|
||||||
for error in Draft4Validator(self.schema(model_instance)).iter_errors(value):
|
for error in Draft4Validator(
|
||||||
|
self.schema(model_instance),
|
||||||
|
format_checker=self.format_checker
|
||||||
|
).iter_errors(value):
|
||||||
if error.validator == 'pattern' and 'error' in error.schema:
|
if error.validator == 'pattern' and 'error' in error.schema:
|
||||||
error.message = error.schema['error'] % error.instance
|
error.message = error.schema['error'] % error.instance
|
||||||
errors.append(error)
|
errors.append(error)
|
||||||
@@ -390,6 +397,24 @@ class JSONSchemaField(JSONBField):
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
@JSONSchemaField.format_checker.checks('ssh_private_key')
|
||||||
|
def format_ssh_private_key(value):
|
||||||
|
# Sanity check: GCE, in particular, provides JSON-encoded private
|
||||||
|
# keys, which developers will be tempted to copy and paste rather
|
||||||
|
# than JSON decode.
|
||||||
|
#
|
||||||
|
# These end in a unicode-encoded final character that gets double
|
||||||
|
# escaped due to being in a Python 2 bytestring, and that causes
|
||||||
|
# Python's key parsing to barf. Detect this issue and correct it.
|
||||||
|
if r'\u003d' in value:
|
||||||
|
value = value.replace(r'\u003d', '=')
|
||||||
|
try:
|
||||||
|
validate_ssh_private_key(value)
|
||||||
|
except jsonschema.exceptions.ValidationError as e:
|
||||||
|
raise jsonschema.exceptions.FormatError(e.message)
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
class CredentialInputField(JSONSchemaField):
|
class CredentialInputField(JSONSchemaField):
|
||||||
"""
|
"""
|
||||||
Used to validate JSON for
|
Used to validate JSON for
|
||||||
@@ -428,8 +453,21 @@ class CredentialInputField(JSONSchemaField):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def validate(self, value, model_instance):
|
def validate(self, value, model_instance):
|
||||||
|
# decrypt secret values so we can validate their contents (i.e.,
|
||||||
|
# ssh_key_data format)
|
||||||
|
decrypted_values = {}
|
||||||
|
for k, v in value.items():
|
||||||
|
if all([
|
||||||
|
k in model_instance.credential_type.secret_fields,
|
||||||
|
v != '$encrypted$',
|
||||||
|
model_instance.pk
|
||||||
|
]):
|
||||||
|
decrypted_values[k] = utils.common.decrypt_field(model_instance, k)
|
||||||
|
else:
|
||||||
|
decrypted_values[k] = v
|
||||||
|
|
||||||
super(CredentialInputField, self).validate(
|
super(CredentialInputField, self).validate(
|
||||||
value, model_instance
|
decrypted_values, model_instance
|
||||||
)
|
)
|
||||||
|
|
||||||
errors = []
|
errors = []
|
||||||
@@ -442,6 +480,18 @@ class CredentialInputField(JSONSchemaField):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# `ssh_key_unlock` requirements are very specific and can't be
|
||||||
|
# represented without complicated JSON schema
|
||||||
|
if model_instance.credential_type.kind == 'ssh':
|
||||||
|
if model_instance.has_encrypted_ssh_key_data and not value.get('ssh_key_unlock'):
|
||||||
|
errors.append(
|
||||||
|
_('SSH key unlock must be set when SSH key is encrypted.')
|
||||||
|
)
|
||||||
|
if not model_instance.has_encrypted_ssh_key_data and value.get('ssh_key_unlock'):
|
||||||
|
errors.append(
|
||||||
|
_('SSH key unlock should not be set when SSH key is not encrypted.')
|
||||||
|
)
|
||||||
|
|
||||||
if errors:
|
if errors:
|
||||||
raise django_exceptions.ValidationError(
|
raise django_exceptions.ValidationError(
|
||||||
errors,
|
errors,
|
||||||
@@ -466,7 +516,8 @@ class CredentialTypeInputField(JSONSchemaField):
|
|||||||
'items': {
|
'items': {
|
||||||
'type': 'object',
|
'type': 'object',
|
||||||
'properties': {
|
'properties': {
|
||||||
'type': {'enum': ['string', 'number', 'ssh_private_key']},
|
'type': {'enum': ['string', 'number']},
|
||||||
|
'format': {'enum': ['ssh_private_key']},
|
||||||
'choices': {
|
'choices': {
|
||||||
'type': 'array',
|
'type': 'array',
|
||||||
'minItems': 1,
|
'minItems': 1,
|
||||||
|
|||||||
@@ -275,8 +275,8 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def kind(self):
|
def kind(self):
|
||||||
# TODO: remove the need for this helper property by removing its usage
|
# TODO 3.3: remove the need for this helper property by removing its
|
||||||
# throughout the codebase
|
# usage throughout the codebase
|
||||||
type_ = self.credential_type
|
type_ = self.credential_type
|
||||||
if type_.kind != 'cloud':
|
if type_.kind != 'cloud':
|
||||||
return type_.kind
|
return type_.kind
|
||||||
@@ -300,7 +300,7 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
#
|
#
|
||||||
@property
|
@property
|
||||||
def needs_ssh_password(self):
|
def needs_ssh_password(self):
|
||||||
return self.kind == 'ssh' and self.password == 'ASK'
|
return self.credential_type.kind == 'ssh' and self.password == 'ASK'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def has_encrypted_ssh_key_data(self):
|
def has_encrypted_ssh_key_data(self):
|
||||||
@@ -319,17 +319,17 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def needs_ssh_key_unlock(self):
|
def needs_ssh_key_unlock(self):
|
||||||
if self.kind == 'ssh' and self.ssh_key_unlock in ('ASK', ''):
|
if self.credential_type.kind == 'ssh' and self.ssh_key_unlock in ('ASK', ''):
|
||||||
return self.has_encrypted_ssh_key_data
|
return self.has_encrypted_ssh_key_data
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def needs_become_password(self):
|
def needs_become_password(self):
|
||||||
return self.kind == 'ssh' and self.become_password == 'ASK'
|
return self.credential_type.kind == 'ssh' and self.become_password == 'ASK'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def needs_vault_password(self):
|
def needs_vault_password(self):
|
||||||
return self.kind == 'vault' and self.vault_password == 'ASK'
|
return self.credential_type.kind == 'vault' and self.vault_password == 'ASK'
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def passwords_needed(self):
|
def passwords_needed(self):
|
||||||
@@ -339,41 +339,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
needed.append(field)
|
needed.append(field)
|
||||||
return needed
|
return needed
|
||||||
|
|
||||||
#
|
|
||||||
# TODO: all of these required fields should be captured in schema
|
|
||||||
# definitions and these clean_ methods should be removed
|
|
||||||
#
|
|
||||||
def clean_ssh_key_data(self):
|
|
||||||
if self.pk:
|
|
||||||
ssh_key_data = decrypt_field(self, 'ssh_key_data')
|
|
||||||
else:
|
|
||||||
ssh_key_data = self.ssh_key_data
|
|
||||||
if ssh_key_data:
|
|
||||||
# Sanity check: GCE, in particular, provides JSON-encoded private
|
|
||||||
# keys, which developers will be tempted to copy and paste rather
|
|
||||||
# than JSON decode.
|
|
||||||
#
|
|
||||||
# These end in a unicode-encoded final character that gets double
|
|
||||||
# escaped due to being in a Python 2 bytestring, and that causes
|
|
||||||
# Python's key parsing to barf. Detect this issue and correct it.
|
|
||||||
if r'\u003d' in ssh_key_data:
|
|
||||||
ssh_key_data = ssh_key_data.replace(r'\u003d', '=')
|
|
||||||
self.ssh_key_data = ssh_key_data
|
|
||||||
|
|
||||||
# Validate the private key to ensure that it looks like something
|
|
||||||
# that we can accept.
|
|
||||||
validate_ssh_private_key(ssh_key_data)
|
|
||||||
return self.ssh_key_data # No need to return decrypted version here.
|
|
||||||
|
|
||||||
def clean_ssh_key_unlock(self):
|
|
||||||
if self.has_encrypted_ssh_key_data and not self.ssh_key_unlock:
|
|
||||||
raise ValidationError(_('SSH key unlock must be set when SSH key '
|
|
||||||
'is encrypted.'))
|
|
||||||
if not self.has_encrypted_ssh_key_data and self.ssh_key_unlock:
|
|
||||||
raise ValidationError(_('SSH key unlock should not be set when '
|
|
||||||
'SSH key is not encrypted.'))
|
|
||||||
return self.ssh_key_unlock
|
|
||||||
|
|
||||||
def _password_field_allows_ask(self, field):
|
def _password_field_allows_ask(self, field):
|
||||||
return field in self.credential_type.askable_fields
|
return field in self.credential_type.askable_fields
|
||||||
|
|
||||||
@@ -621,6 +586,7 @@ def ssh(cls):
|
|||||||
'id': 'ssh_key_data',
|
'id': 'ssh_key_data',
|
||||||
'label': 'SSH Private Key',
|
'label': 'SSH Private Key',
|
||||||
'type': 'string',
|
'type': 'string',
|
||||||
|
'format': 'ssh_private_key',
|
||||||
'secret': True,
|
'secret': True,
|
||||||
'multiline': True
|
'multiline': True
|
||||||
}, {
|
}, {
|
||||||
|
|||||||
@@ -7,6 +7,9 @@ from django.core.exceptions import ValidationError
|
|||||||
from awx.main.utils.common import decrypt_field
|
from awx.main.utils.common import decrypt_field
|
||||||
from awx.main.models import Credential, CredentialType
|
from awx.main.models import Credential, CredentialType
|
||||||
|
|
||||||
|
EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'
|
||||||
|
EXAMPLE_ENCRYPTED_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nProc-Type: 4,ENCRYPTED\nxyz==\n-----END PRIVATE KEY-----'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_default_cred_types():
|
def test_default_cred_types():
|
||||||
@@ -60,7 +63,7 @@ def test_cloud_kind_uniqueness():
|
|||||||
({'fields': [{'id': 'username', 'label': 'Username'}, {'id': 'username', 'label': 'Username 2'}]}, False), # noqa
|
({'fields': [{'id': 'username', 'label': 'Username'}, {'id': 'username', 'label': 'Username 2'}]}, False), # noqa
|
||||||
({'fields': [{'id': '$invalid$', 'label': 'Invalid'}]}, False), # noqa
|
({'fields': [{'id': '$invalid$', 'label': 'Invalid'}]}, False), # noqa
|
||||||
({'fields': [{'id': 'password', 'label': 'Password', 'type': 'number'}]}, True),
|
({'fields': [{'id': 'password', 'label': 'Password', 'type': 'number'}]}, True),
|
||||||
({'fields': [{'id': 'ssh_key', 'label': 'SSH Key', 'type': 'ssh_private_key'}]}, True), # noqa
|
({'fields': [{'id': 'ssh_key', 'label': 'SSH Key', 'type': 'string', 'format': 'ssh_private_key'}]}, True), # noqa
|
||||||
({'fields': [{'id': 'other', 'label': 'Other', 'type': 'boolean'}]}, False),
|
({'fields': [{'id': 'other', 'label': 'Other', 'type': 'boolean'}]}, False),
|
||||||
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': True}]}, True),
|
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': True}]}, True),
|
||||||
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': 'bad'}]}, False), # noqa
|
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': 'bad'}]}, False), # noqa
|
||||||
@@ -181,6 +184,37 @@ def test_credential_creation_validation_failure(organization_factory):
|
|||||||
cred.full_clean()
|
cred.full_clean()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('ssh_key_data, ssh_key_unlock, valid', [
|
||||||
|
[EXAMPLE_PRIVATE_KEY, None, True], # unencrypted key, no unlock pass
|
||||||
|
[EXAMPLE_PRIVATE_KEY, 'super-secret', False], # unencrypted key, unlock pass
|
||||||
|
[EXAMPLE_ENCRYPTED_PRIVATE_KEY, 'super-secret', True], # encrypted key, unlock pass
|
||||||
|
[EXAMPLE_ENCRYPTED_PRIVATE_KEY, None, False], # encrypted key, no unlock pass
|
||||||
|
[None, None, True], # no key, no unlock pass
|
||||||
|
[None, 'super-secret', False], # no key, unlock pass
|
||||||
|
['INVALID-KEY-DATA', None, False], # invalid key data
|
||||||
|
[EXAMPLE_PRIVATE_KEY.replace('=', '\u003d'), None, True], # automatically fix JSON-encoded GCE keys
|
||||||
|
])
|
||||||
|
def test_ssh_key_data_validation(credentialtype_ssh, organization, ssh_key_data, ssh_key_unlock, valid):
|
||||||
|
inputs = {}
|
||||||
|
if ssh_key_data:
|
||||||
|
inputs['ssh_key_data'] = ssh_key_data
|
||||||
|
if ssh_key_unlock:
|
||||||
|
inputs['ssh_key_unlock'] = ssh_key_unlock
|
||||||
|
cred = Credential(
|
||||||
|
credential_type=credentialtype_ssh,
|
||||||
|
name="Best credential ever",
|
||||||
|
inputs=inputs,
|
||||||
|
organization=organization
|
||||||
|
)
|
||||||
|
cred.save()
|
||||||
|
if valid:
|
||||||
|
cred.full_clean()
|
||||||
|
else:
|
||||||
|
with pytest.raises(ValidationError):
|
||||||
|
cred.full_clean()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_credential_encryption(organization_factory, credentialtype_ssh):
|
def test_credential_encryption(organization_factory, credentialtype_ssh):
|
||||||
org = organization_factory('test').organization
|
org = organization_factory('test').organization
|
||||||
|
|||||||
@@ -126,7 +126,11 @@ ordered fields for that type:
|
|||||||
|
|
||||||
"help_text": "User-facing short text describing the field.",
|
"help_text": "User-facing short text describing the field.",
|
||||||
|
|
||||||
"type": ("string" | "number" | "ssh_private_key") # required,
|
"type": ("string" | "number") # required,
|
||||||
|
|
||||||
|
"format": "ssh_private_key" # optional, can be used to enforce data
|
||||||
|
# format validity for SSH private keys
|
||||||
|
# data
|
||||||
|
|
||||||
"secret": true, # if true, the field will be treated
|
"secret": true, # if true, the field will be treated
|
||||||
# as sensitive and stored encrypted
|
# as sensitive and stored encrypted
|
||||||
|
|||||||
Reference in New Issue
Block a user