Set the GCE_ZONE env variable to source_regions.

Fix when an empty key is provided and add tests for validate_ssh_private_key
This commit is contained in:
Wayne Witzel III
2016-01-25 16:44:12 -05:00
parent 3982f34392
commit 58499175d6
4 changed files with 198 additions and 126 deletions

View File

@@ -165,7 +165,7 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique):
else:
ssh_key_data = self.ssh_key_data
try:
key_data = self._validate_ssh_private_key(ssh_key_data)
key_data = validate_ssh_private_key(ssh_key_data)
except ValidationError:
return False
else:
@@ -238,128 +238,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique):
raise ValidationError('Project name required for OpenStack credential.')
return project
def _validate_ssh_private_key(self, data):
"""Validate that the given SSH private key or certificate is,
in fact, valid.
"""
# Map the X in BEGIN X PRIVATE KEY to the key type (ssh-keygen -t).
# Tower jobs using OPENSSH format private keys may still fail if the
# system SSH implementation lacks support for this format.
key_types = {
'RSA': 'rsa',
'DSA': 'dsa',
'EC': 'ecdsa',
'OPENSSH': 'ed25519',
'': 'rsa1',
}
# Key properties to return if valid.
key_data = {
'key_type': None, # Key type (from above mapping).
'key_seg': '', # Key segment (all text including begin/end).
'key_b64': '', # Key data as base64.
'key_bin': '', # Key data as binary.
'key_enc': None, # Boolean, whether key is encrypted.
'cert_seg': '', # Cert segment (all text including begin/end).
'cert_b64': '', # Cert data as base64.
'cert_bin': '', # Cert data as binary.
}
data = data.strip()
validation_error = ValidationError('Invalid private key')
# Sanity check: We may potentially receive a full PEM certificate,
# and we want to accept these.
cert_begin_re = r'(-{4,})\s*BEGIN\s+CERTIFICATE\s*(-{4,})'
cert_end_re = r'(-{4,})\s*END\s+CERTIFICATE\s*(-{4,})'
cert_begin_match = re.search(cert_begin_re, data)
cert_end_match = re.search(cert_end_re, data)
if cert_begin_match and not cert_end_match:
raise validation_error
elif not cert_begin_match and cert_end_match:
raise validation_error
elif cert_begin_match and cert_end_match:
cert_dashes = set([cert_begin_match.groups()[0], cert_begin_match.groups()[1],
cert_end_match.groups()[0], cert_end_match.groups()[1]])
if len(cert_dashes) != 1:
raise validation_error
key_data['cert_seg'] = data[cert_begin_match.start():cert_end_match.end()]
# Find the private key, and also ensure that it internally matches
# itself.
# Set up the valid private key header and footer.
begin_re = r'(-{4,})\s*BEGIN\s+([A-Z0-9]+)?\s*PRIVATE\sKEY\s*(-{4,})'
end_re = r'(-{4,})\s*END\s+([A-Z0-9]+)?\s*PRIVATE\sKEY\s*(-{4,})'
begin_match = re.search(begin_re, data)
end_match = re.search(end_re, data)
if not begin_match or not end_match:
raise validation_error
# Ensure that everything, such as dash counts and key type, lines up,
# and raise an error if it does not.
dashes = set([begin_match.groups()[0], begin_match.groups()[2],
end_match.groups()[0], end_match.groups()[2]])
if len(dashes) != 1:
raise validation_error
if begin_match.groups()[1] != end_match.groups()[1]:
raise validation_error
key_type = begin_match.groups()[1]
try:
key_data['key_type'] = key_types[key_type]
except KeyError:
raise ValidationError('Invalid private key: unsupported type %s' % key_type)
# The private key data begins and ends with the private key.
key_data['key_seg'] = data[begin_match.start():end_match.end()]
# Establish that we are able to base64 decode the private key;
# if we can't, then it's not a valid key.
#
# If we got a certificate, validate that also, in the same way.
header_re = re.compile(r'^(.+?):\s*?(.+?)(\\??)$')
for segment_name in ('cert', 'key'):
segment_to_validate = key_data['%s_seg' % segment_name]
# If we have nothing; skip this one.
# We've already validated that we have a private key above,
# so we don't need to do it again.
if not segment_to_validate:
continue
# Ensure that this segment is valid base64 data.
base64_data = ''
line_continues = False
lines = segment_to_validate.splitlines()
for line in lines[1:-1]:
line = line.strip()
if not line:
continue
if line_continues:
line_continues = line.endswith('\\')
continue
line_match = header_re.match(line)
if line_match:
line_continues = line.endswith('\\')
continue
base64_data += line
try:
decoded_data = base64.b64decode(base64_data)
if not decoded_data:
raise validation_error
key_data['%s_b64' % segment_name] = base64_data
key_data['%s_bin' % segment_name] = decoded_data
except TypeError:
raise validation_error
# Determine if key is encrypted.
if key_data['key_type'] == 'ed25519':
# See https://github.com/openssh/openssh-portable/blob/master/sshkey.c#L3218
# Decoded key data starts with magic string (null-terminated), four byte
# length field, followed by the ciphername -- if ciphername is anything
# other than 'none' the key is encrypted.
key_data['key_enc'] = not bool(key_data['key_bin'].startswith('openssh-key-v1\x00\x00\x00\x00\x04none'))
else:
key_data['key_enc'] = bool('ENCRYPTED' in key_data['key_seg'])
return key_data
def clean_ssh_key_data(self):
if self.pk:
ssh_key_data = decrypt_field(self, 'ssh_key_data')
@@ -379,7 +257,7 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique):
# Validate the private key to ensure that it looks like something
# that we can accept.
self._validate_ssh_private_key(ssh_key_data)
validate_ssh_private_key(ssh_key_data)
return self.ssh_key_data # No need to return decrypted version here.
def clean_ssh_key_unlock(self):
@@ -471,3 +349,124 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique):
update_fields.append('cloud')
super(Credential, self).save(*args, **kwargs)
def validate_ssh_private_key(data):
"""Validate that the given SSH private key or certificate is,
in fact, valid.
"""
# Map the X in BEGIN X PRIVATE KEY to the key type (ssh-keygen -t).
# Tower jobs using OPENSSH format private keys may still fail if the
# system SSH implementation lacks support for this format.
key_types = {
'RSA': 'rsa',
'DSA': 'dsa',
'EC': 'ecdsa',
'OPENSSH': 'ed25519',
'': 'rsa1',
}
# Key properties to return if valid.
key_data = {
'key_type': None, # Key type (from above mapping).
'key_seg': '', # Key segment (all text including begin/end).
'key_b64': '', # Key data as base64.
'key_bin': '', # Key data as binary.
'key_enc': None, # Boolean, whether key is encrypted.
'cert_seg': '', # Cert segment (all text including begin/end).
'cert_b64': '', # Cert data as base64.
'cert_bin': '', # Cert data as binary.
}
data = data.strip()
validation_error = ValidationError('Invalid private key')
# Sanity check: We may potentially receive a full PEM certificate,
# and we want to accept these.
cert_begin_re = r'(-{4,})\s*BEGIN\s+CERTIFICATE\s*(-{4,})'
cert_end_re = r'(-{4,})\s*END\s+CERTIFICATE\s*(-{4,})'
cert_begin_match = re.search(cert_begin_re, data)
cert_end_match = re.search(cert_end_re, data)
if cert_begin_match and not cert_end_match:
raise validation_error
elif not cert_begin_match and cert_end_match:
raise validation_error
elif cert_begin_match and cert_end_match:
cert_dashes = set([cert_begin_match.groups()[0], cert_begin_match.groups()[1],
cert_end_match.groups()[0], cert_end_match.groups()[1]])
if len(cert_dashes) != 1:
raise validation_error
key_data['cert_seg'] = data[cert_begin_match.start():cert_end_match.end()]
# Find the private key, and also ensure that it internally matches
# itself.
# Set up the valid private key header and footer.
begin_re = r'(-{4,})\s*BEGIN\s+([A-Z0-9]+)?\s*PRIVATE\sKEY\s*(-{4,})'
end_re = r'(-{4,})\s*END\s+([A-Z0-9]+)?\s*PRIVATE\sKEY\s*(-{4,})'
begin_match = re.search(begin_re, data)
end_match = re.search(end_re, data)
if not begin_match or not end_match:
raise validation_error
# Ensure that everything, such as dash counts and key type, lines up,
# and raise an error if it does not.
dashes = set([begin_match.groups()[0], begin_match.groups()[2],
end_match.groups()[0], end_match.groups()[2]])
if len(dashes) != 1:
raise validation_error
if begin_match.groups()[1] != end_match.groups()[1]:
raise validation_error
key_type = begin_match.groups()[1] or ''
try:
key_data['key_type'] = key_types[key_type]
except KeyError:
raise ValidationError('Invalid private key: unsupported type %s' % key_type)
# The private key data begins and ends with the private key.
key_data['key_seg'] = data[begin_match.start():end_match.end()]
# Establish that we are able to base64 decode the private key;
# if we can't, then it's not a valid key.
#
# If we got a certificate, validate that also, in the same way.
header_re = re.compile(r'^(.+?):\s*?(.+?)(\\??)$')
for segment_name in ('cert', 'key'):
segment_to_validate = key_data['%s_seg' % segment_name]
# If we have nothing; skip this one.
# We've already validated that we have a private key above,
# so we don't need to do it again.
if not segment_to_validate:
continue
# Ensure that this segment is valid base64 data.
base64_data = ''
line_continues = False
lines = segment_to_validate.splitlines()
for line in lines[1:-1]:
line = line.strip()
if not line:
continue
if line_continues:
line_continues = line.endswith('\\')
continue
line_match = header_re.match(line)
if line_match:
line_continues = line.endswith('\\')
continue
base64_data += line
try:
decoded_data = base64.b64decode(base64_data)
if not decoded_data:
raise validation_error
key_data['%s_b64' % segment_name] = base64_data
key_data['%s_bin' % segment_name] = decoded_data
except TypeError:
raise validation_error
# Determine if key is encrypted.
if key_data['key_type'] == 'ed25519':
# See https://github.com/openssh/openssh-portable/blob/master/sshkey.c#L3218
# Decoded key data starts with magic string (null-terminated), four byte
# length field, followed by the ciphername -- if ciphername is anything
# other than 'none' the key is encrypted.
key_data['key_enc'] = not bool(key_data['key_bin'].startswith('openssh-key-v1\x00\x00\x00\x00\x04none'))
else:
key_data['key_enc'] = bool('ENCRYPTED' in key_data['key_seg'])
return key_data