mirror of
https://github.com/ansible/awx.git
synced 2026-04-14 14:39:26 -02:30
Merge pull request #5301 from ryanpetrello/fix-5276
fix several issues that are preventing usage of unicode as CTinT values
This commit is contained in:
@@ -6,6 +6,8 @@ import sys
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.conf import settings, UserSettingsHolder
|
from django.conf import settings, UserSettingsHolder
|
||||||
from django.core.cache import cache as django_cache
|
from django.core.cache import cache as django_cache
|
||||||
@@ -88,7 +90,17 @@ class EncryptedCacheProxy(object):
|
|||||||
|
|
||||||
def get(self, key, **kwargs):
|
def get(self, key, **kwargs):
|
||||||
value = self.cache.get(key, **kwargs)
|
value = self.cache.get(key, **kwargs)
|
||||||
return self._handle_encryption(self.decrypter, key, value)
|
value = self._handle_encryption(self.decrypter, key, value)
|
||||||
|
|
||||||
|
# python-memcached auto-encodes unicode on cache set in python2
|
||||||
|
# https://github.com/linsomniac/python-memcached/issues/79
|
||||||
|
# https://github.com/linsomniac/python-memcached/blob/288c159720eebcdf667727a859ef341f1e908308/memcache.py#L961
|
||||||
|
if six.PY2 and isinstance(value, six.binary_type):
|
||||||
|
try:
|
||||||
|
six.text_type(value)
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
value = value.decode('utf-8')
|
||||||
|
return value
|
||||||
|
|
||||||
def set(self, key, value, **kwargs):
|
def set(self, key, value, **kwargs):
|
||||||
self.cache.set(
|
self.cache.set(
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
# Copyright (c) 2017 Ansible, Inc.
|
# Copyright (c) 2017 Ansible, Inc.
|
||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
|
|
||||||
@@ -10,6 +12,7 @@ from django.core.cache.backends.locmem import LocMemCache
|
|||||||
from django.core.exceptions import ImproperlyConfigured
|
from django.core.exceptions import ImproperlyConfigured
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
import pytest
|
import pytest
|
||||||
|
import six
|
||||||
|
|
||||||
from awx.conf import models, fields
|
from awx.conf import models, fields
|
||||||
from awx.conf.settings import SettingsWrapper, EncryptedCacheProxy, SETTING_CACHE_NOTSET
|
from awx.conf.settings import SettingsWrapper, EncryptedCacheProxy, SETTING_CACHE_NOTSET
|
||||||
@@ -60,6 +63,15 @@ def test_unregistered_setting(settings):
|
|||||||
assert settings.cache.get('DEBUG') is None
|
assert settings.cache.get('DEBUG') is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_cached_settings_unicode_is_auto_decoded(settings):
|
||||||
|
# https://github.com/linsomniac/python-memcached/issues/79
|
||||||
|
# https://github.com/linsomniac/python-memcached/blob/288c159720eebcdf667727a859ef341f1e908308/memcache.py#L961
|
||||||
|
|
||||||
|
value = six.u('Iñtërnâtiônàlizætiøn').encode('utf-8') # this simulates what python-memcached does on cache.set()
|
||||||
|
settings.cache.set('DEBUG', value)
|
||||||
|
assert settings.cache.get('DEBUG') == six.u('Iñtërnâtiônàlizætiøn')
|
||||||
|
|
||||||
|
|
||||||
def test_read_only_setting(settings):
|
def test_read_only_setting(settings):
|
||||||
settings.registry.register(
|
settings.registry.register(
|
||||||
'AWX_READ_ONLY',
|
'AWX_READ_ONLY',
|
||||||
@@ -239,6 +251,31 @@ def test_setting_from_db(settings, mocker):
|
|||||||
assert settings.cache.get('AWX_SOME_SETTING') == 'FROM_DB'
|
assert settings.cache.get('AWX_SOME_SETTING') == 'FROM_DB'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('encrypted', (True, False))
|
||||||
|
def test_setting_from_db_with_unicode(settings, mocker, encrypted):
|
||||||
|
settings.registry.register(
|
||||||
|
'AWX_SOME_SETTING',
|
||||||
|
field_class=fields.CharField,
|
||||||
|
category=_('System'),
|
||||||
|
category_slug='system',
|
||||||
|
default='DEFAULT',
|
||||||
|
encrypted=encrypted
|
||||||
|
)
|
||||||
|
# this simulates a bug in python-memcached; see https://github.com/linsomniac/python-memcached/issues/79
|
||||||
|
value = six.u('Iñtërnâtiônàlizætiøn').encode('utf-8')
|
||||||
|
|
||||||
|
setting_from_db = mocker.Mock(key='AWX_SOME_SETTING', value=value)
|
||||||
|
mocks = mocker.Mock(**{
|
||||||
|
'order_by.return_value': mocker.Mock(**{
|
||||||
|
'__iter__': lambda self: iter([setting_from_db]),
|
||||||
|
'first.return_value': setting_from_db
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
with mocker.patch('awx.conf.models.Setting.objects.filter', return_value=mocks):
|
||||||
|
assert settings.AWX_SOME_SETTING == six.u('Iñtërnâtiônàlizætiøn')
|
||||||
|
assert settings.cache.get('AWX_SOME_SETTING') == six.u('Iñtërnâtiônàlizætiøn')
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.defined_in_file(AWX_SOME_SETTING='DEFAULT')
|
@pytest.mark.defined_in_file(AWX_SOME_SETTING='DEFAULT')
|
||||||
def test_read_only_setting_assignment(settings):
|
def test_read_only_setting_assignment(settings):
|
||||||
"read-only settings cannot be overwritten"
|
"read-only settings cannot be overwritten"
|
||||||
|
|||||||
@@ -258,7 +258,8 @@ register(
|
|||||||
register(
|
register(
|
||||||
'LOG_AGGREGATOR_USERNAME',
|
'LOG_AGGREGATOR_USERNAME',
|
||||||
field_class=fields.CharField,
|
field_class=fields.CharField,
|
||||||
allow_null=True,
|
allow_blank=True,
|
||||||
|
default='',
|
||||||
label=_('Logging Aggregator Username'),
|
label=_('Logging Aggregator Username'),
|
||||||
help_text=_('Username for external log aggregator (if required).'),
|
help_text=_('Username for external log aggregator (if required).'),
|
||||||
category=_('Logging'),
|
category=_('Logging'),
|
||||||
@@ -268,7 +269,8 @@ register(
|
|||||||
register(
|
register(
|
||||||
'LOG_AGGREGATOR_PASSWORD',
|
'LOG_AGGREGATOR_PASSWORD',
|
||||||
field_class=fields.CharField,
|
field_class=fields.CharField,
|
||||||
allow_null=True,
|
allow_blank=True,
|
||||||
|
default='',
|
||||||
encrypted=True,
|
encrypted=True,
|
||||||
label=_('Logging Aggregator Password/Token'),
|
label=_('Logging Aggregator Password/Token'),
|
||||||
help_text=_('Password or authentication token for external log aggregator (if required).'),
|
help_text=_('Password or authentication token for external log aggregator (if required).'),
|
||||||
|
|||||||
@@ -1,24 +1,37 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Copyright (c) 2017 Ansible, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
|
||||||
from awx.conf.models import Setting
|
from awx.conf.models import Setting
|
||||||
from awx.main.utils import common
|
from awx.main.utils import common
|
||||||
|
|
||||||
|
|
||||||
def test_encrypt_field():
|
def test_encrypt_field():
|
||||||
field = Setting(pk=123, value='ANSIBLE')
|
field = Setting(pk=123, value='ANSIBLE')
|
||||||
encrypted = common.encrypt_field(field, 'value')
|
encrypted = field.value = common.encrypt_field(field, 'value')
|
||||||
assert encrypted == '$encrypted$AES$Ey83gcmMuBBT1OEq2lepnw=='
|
assert encrypted == '$encrypted$AES$Ey83gcmMuBBT1OEq2lepnw=='
|
||||||
assert common.decrypt_field(field, 'value') == 'ANSIBLE'
|
assert common.decrypt_field(field, 'value') == 'ANSIBLE'
|
||||||
|
|
||||||
|
|
||||||
def test_encrypt_field_without_pk():
|
def test_encrypt_field_without_pk():
|
||||||
field = Setting(value='ANSIBLE')
|
field = Setting(value='ANSIBLE')
|
||||||
encrypted = common.encrypt_field(field, 'value')
|
encrypted = field.value = common.encrypt_field(field, 'value')
|
||||||
assert encrypted == '$encrypted$AES$8uIzEoGyY6QJwoTWbMFGhw=='
|
assert encrypted == '$encrypted$AES$8uIzEoGyY6QJwoTWbMFGhw=='
|
||||||
assert common.decrypt_field(field, 'value') == 'ANSIBLE'
|
assert common.decrypt_field(field, 'value') == 'ANSIBLE'
|
||||||
|
|
||||||
|
|
||||||
|
def test_encrypt_field_with_unicode_string():
|
||||||
|
value = u'Iñtërnâtiônàlizætiøn'
|
||||||
|
field = Setting(value=value)
|
||||||
|
encrypted = field.value = common.encrypt_field(field, 'value')
|
||||||
|
assert encrypted == '$encrypted$UTF8$AES$AESQbqOefpYcLC7x8yZ2aWG4FlXlS66JgavLbDp/DSM='
|
||||||
|
assert common.decrypt_field(field, 'value') == value
|
||||||
|
|
||||||
|
|
||||||
def test_encrypt_subfield():
|
def test_encrypt_subfield():
|
||||||
field = Setting(value={'name': 'ANSIBLE'})
|
field = Setting(value={'name': 'ANSIBLE'})
|
||||||
encrypted = common.encrypt_field(field, 'value', subfield='name')
|
encrypted = field.value = common.encrypt_field(field, 'value', subfield='name')
|
||||||
assert encrypted == '$encrypted$AES$8uIzEoGyY6QJwoTWbMFGhw=='
|
assert encrypted == '$encrypted$AES$8uIzEoGyY6QJwoTWbMFGhw=='
|
||||||
assert common.decrypt_field(field, 'value', subfield='name') == 'ANSIBLE'
|
assert common.decrypt_field(field, 'value', subfield='name') == 'ANSIBLE'
|
||||||
|
|
||||||
|
|||||||
@@ -21,6 +21,8 @@ import tempfile
|
|||||||
# Decorator
|
# Decorator
|
||||||
from decorator import decorator
|
from decorator import decorator
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
from django.db.models import ManyToManyField
|
from django.db.models import ManyToManyField
|
||||||
@@ -190,6 +192,7 @@ def encrypt_field(instance, field_name, ask=False, subfield=None):
|
|||||||
value = value[subfield]
|
value = value[subfield]
|
||||||
if not value or value.startswith('$encrypted$') or (ask and value == 'ASK'):
|
if not value or value.startswith('$encrypted$') or (ask and value == 'ASK'):
|
||||||
return value
|
return value
|
||||||
|
utf8 = type(value) == six.text_type
|
||||||
value = smart_str(value)
|
value = smart_str(value)
|
||||||
key = get_encryption_key(field_name, getattr(instance, 'pk', None))
|
key = get_encryption_key(field_name, getattr(instance, 'pk', None))
|
||||||
cipher = AES.new(key, AES.MODE_ECB)
|
cipher = AES.new(key, AES.MODE_ECB)
|
||||||
@@ -197,17 +200,31 @@ def encrypt_field(instance, field_name, ask=False, subfield=None):
|
|||||||
value += '\x00'
|
value += '\x00'
|
||||||
encrypted = cipher.encrypt(value)
|
encrypted = cipher.encrypt(value)
|
||||||
b64data = base64.b64encode(encrypted)
|
b64data = base64.b64encode(encrypted)
|
||||||
return '$encrypted$%s$%s' % ('AES', b64data)
|
tokens = ['$encrypted', 'AES', b64data]
|
||||||
|
if utf8:
|
||||||
|
# If the value to encrypt is utf-8, we need to add a marker so we
|
||||||
|
# know to decode the data when it's decrypted later
|
||||||
|
tokens.insert(1, 'UTF8')
|
||||||
|
return '$'.join(tokens)
|
||||||
|
|
||||||
|
|
||||||
def decrypt_value(encryption_key, value):
|
def decrypt_value(encryption_key, value):
|
||||||
algo, b64data = value[len('$encrypted$'):].split('$', 1)
|
raw_data = value[len('$encrypted$'):]
|
||||||
|
# If the encrypted string contains a UTF8 marker, discard it
|
||||||
|
utf8 = raw_data.startswith('UTF8$')
|
||||||
|
if utf8:
|
||||||
|
raw_data = raw_data[len('UTF8$'):]
|
||||||
|
algo, b64data = raw_data.split('$', 1)
|
||||||
if algo != 'AES':
|
if algo != 'AES':
|
||||||
raise ValueError('unsupported algorithm: %s' % algo)
|
raise ValueError('unsupported algorithm: %s' % algo)
|
||||||
encrypted = base64.b64decode(b64data)
|
encrypted = base64.b64decode(b64data)
|
||||||
cipher = AES.new(encryption_key, AES.MODE_ECB)
|
cipher = AES.new(encryption_key, AES.MODE_ECB)
|
||||||
value = cipher.decrypt(encrypted)
|
value = cipher.decrypt(encrypted)
|
||||||
return value.rstrip('\x00')
|
value = value.rstrip('\x00')
|
||||||
|
# If the encrypted string contained a UTF8 marker, decode the data
|
||||||
|
if utf8:
|
||||||
|
value = value.decode('utf-8')
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
def decrypt_field(instance, field_name, subfield=None):
|
def decrypt_field(instance, field_name, subfield=None):
|
||||||
|
|||||||
Reference in New Issue
Block a user