mirror of
https://github.com/ansible/awx.git
synced 2026-02-21 05:00:07 -03:30
Merge pull request #6622 from wwitzel3/devel
Implement Fernet256 using AES-256-CBC
This commit is contained in:
@@ -4,7 +4,7 @@ import logging
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
from cryptography.fernet import Fernet, InvalidToken
|
from cryptography.fernet import Fernet, InvalidToken
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
from django.utils.encoding import smart_str
|
from django.utils.encoding import smart_str
|
||||||
|
|
||||||
|
|
||||||
@@ -13,6 +13,25 @@ __all__ = ['get_encryption_key', 'encrypt_field', 'decrypt_field', 'decrypt_valu
|
|||||||
logger = logging.getLogger('awx.main.utils.encryption')
|
logger = logging.getLogger('awx.main.utils.encryption')
|
||||||
|
|
||||||
|
|
||||||
|
class Fernet256(Fernet):
|
||||||
|
'''Not techincally Fernet, but uses the base of the Fernet spec and uses AES-256-CBC
|
||||||
|
instead of AES-128-CBC. All other functionality remain identical.
|
||||||
|
'''
|
||||||
|
def __init__(self, key, backend=None):
|
||||||
|
if backend is None:
|
||||||
|
backend = default_backend()
|
||||||
|
|
||||||
|
key = base64.urlsafe_b64decode(key)
|
||||||
|
if len(key) != 64:
|
||||||
|
raise ValueError(
|
||||||
|
"Fernet key must be 64 url-safe base64-encoded bytes."
|
||||||
|
)
|
||||||
|
|
||||||
|
self._signing_key = key[:32]
|
||||||
|
self._encryption_key = key[32:]
|
||||||
|
self._backend = backend
|
||||||
|
|
||||||
|
|
||||||
def get_encryption_key(field_name, pk=None):
|
def get_encryption_key(field_name, pk=None):
|
||||||
'''
|
'''
|
||||||
Generate key for encrypted password based on field name,
|
Generate key for encrypted password based on field name,
|
||||||
@@ -23,12 +42,12 @@ def get_encryption_key(field_name, pk=None):
|
|||||||
that is not database-persistent (like a read-only setting)
|
that is not database-persistent (like a read-only setting)
|
||||||
'''
|
'''
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
h = hashlib.sha256()
|
h = hashlib.sha512()
|
||||||
h.update(settings.SECRET_KEY)
|
h.update(settings.SECRET_KEY)
|
||||||
if pk is not None:
|
if pk is not None:
|
||||||
h.update(str(pk))
|
h.update(str(pk))
|
||||||
h.update(field_name)
|
h.update(field_name)
|
||||||
return base64.b64encode(h.digest())
|
return base64.urlsafe_b64encode(h.digest())
|
||||||
|
|
||||||
|
|
||||||
def encrypt_field(instance, field_name, ask=False, subfield=None, skip_utf8=False):
|
def encrypt_field(instance, field_name, ask=False, subfield=None, skip_utf8=False):
|
||||||
@@ -46,7 +65,7 @@ def encrypt_field(instance, field_name, ask=False, subfield=None, skip_utf8=Fals
|
|||||||
utf8 = type(value) == six.text_type
|
utf8 = type(value) == six.text_type
|
||||||
value = smart_str(value)
|
value = smart_str(value)
|
||||||
key = get_encryption_key(field_name, getattr(instance, 'pk', None))
|
key = get_encryption_key(field_name, getattr(instance, 'pk', None))
|
||||||
f = Fernet(key)
|
f = Fernet256(key)
|
||||||
encrypted = f.encrypt(value)
|
encrypted = f.encrypt(value)
|
||||||
b64data = base64.b64encode(encrypted)
|
b64data = base64.b64encode(encrypted)
|
||||||
tokens = ['$encrypted', 'AESCBC', b64data]
|
tokens = ['$encrypted', 'AESCBC', b64data]
|
||||||
@@ -67,7 +86,7 @@ def decrypt_value(encryption_key, value):
|
|||||||
if algo != 'AESCBC':
|
if algo != 'AESCBC':
|
||||||
raise ValueError('unsupported algorithm: %s' % algo)
|
raise ValueError('unsupported algorithm: %s' % algo)
|
||||||
encrypted = base64.b64decode(b64data)
|
encrypted = base64.b64decode(b64data)
|
||||||
f = Fernet(encryption_key)
|
f = Fernet256(encryption_key)
|
||||||
value = f.decrypt(encrypted)
|
value = f.decrypt(encrypted)
|
||||||
# If the encrypted string contained a UTF8 marker, decode the data
|
# If the encrypted string contained a UTF8 marker, decode the data
|
||||||
if utf8:
|
if utf8:
|
||||||
|
|||||||
Reference in New Issue
Block a user