Merge pull request #6541 from wwitzel3/issue-826

Re-Encrypt all of our existing encrypted fields.
This commit is contained in:
Ryan Petrello
2017-06-13 09:42:56 -04:00
committed by GitHub
22 changed files with 493 additions and 204 deletions

View File

@@ -2,7 +2,7 @@ import mock # noqa
import pytest
from awx.main.models.credential import Credential, CredentialType
from awx.main.utils.common import decrypt_field
from awx.main.utils import decrypt_field
from awx.api.versioning import reverse
EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'

View File

@@ -4,7 +4,7 @@
import pytest
from django.core.exceptions import ValidationError
from awx.main.utils.common import decrypt_field
from awx.main.utils import decrypt_field
from awx.main.models import Credential, CredentialType
from rest_framework import serializers

View File

@@ -7,7 +7,7 @@ from django.apps import apps
from awx.main.models import Credential, CredentialType
from awx.main.migrations._credentialtypes import migrate_to_v2_credentials
from awx.main.utils.common import decrypt_field
from awx.main.utils import decrypt_field
from awx.main.migrations._credentialtypes import _disassociate_non_insights_projects
EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'
@@ -319,7 +319,7 @@ def test_insights_migration():
'username': 'bob',
'password': 'some-password',
})
assert cred.credential_type.name == 'Insights Basic Auth'
assert cred.inputs['username'] == 'bob'
assert cred.inputs['password'].startswith('$encrypted$')

View File

@@ -0,0 +1,82 @@
import json
import pytest
import mock
from django.apps import apps
from awx.main.models import (
UnifiedJob,
NotificationTemplate,
Credential,
)
from awx.main.models.credential import ssh
from awx.conf.migrations._reencrypt import encrypt_field
from awx.main.migrations._reencrypt import (
_notification_templates,
_credentials,
_unified_jobs,
)
from awx.main.utils import decrypt_field
@pytest.mark.django_db
def test_notification_template_migration():
with mock.patch('awx.main.models.notifications.encrypt_field', encrypt_field):
nt = NotificationTemplate.objects.create(notification_type='slack', notification_configuration=dict(token='test'))
assert nt.notification_configuration['token'].startswith('$encrypted$AES$')
_notification_templates(apps)
nt.refresh_from_db()
assert nt.notification_configuration['token'].startswith('$encrypted$AESCBC$')
assert decrypt_field(nt, 'notification_configuration', subfield='token') == 'test'
# This is here for a side-effect.
# Exception if the encryption type of AESCBC is not properly skipped, ensures
# our `startswith` calls don't have typos
_notification_templates(apps)
@pytest.mark.django_db
def test_credential_migration():
with mock.patch('awx.main.models.credential.encrypt_field', encrypt_field):
cred_type = ssh()
cred_type.save()
cred = Credential.objects.create(credential_type=cred_type, inputs=dict(password='test'))
assert cred.password.startswith('$encrypted$AES$')
_credentials(apps)
cred.refresh_from_db()
assert cred.password.startswith('$encrypted$AESCBC$')
assert decrypt_field(cred, 'password') == 'test'
# This is here for a side-effect.
# Exception if the encryption type of AESCBC is not properly skipped, ensures
# our `startswith` calls don't have typos
_credentials(apps)
@pytest.mark.django_db
def test_unified_job_migration():
with mock.patch('awx.main.models.base.encrypt_field', encrypt_field):
uj = UnifiedJob.objects.create(launch_type='manual', start_args=json.dumps({'test':'value'}))
assert uj.start_args.startswith('$encrypted$AES$')
_unified_jobs(apps)
uj.refresh_from_db()
assert uj.start_args.startswith('$encrypted$AESCBC$')
assert json.loads(decrypt_field(uj, 'start_args')) == {'test':'value'}
# This is here for a side-effect.
# Exception if the encryption type of AESCBC is not properly skipped, ensures
# our `startswith` calls don't have typos
_unified_jobs(apps)

View File

@@ -26,7 +26,7 @@ from awx.main.models import (
from awx.main import tasks
from awx.main.task_engine import TaskEnhancer
from awx.main.utils.common import encrypt_field
from awx.main.utils import encrypt_field
@contextmanager

View File

@@ -1,65 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Ansible, Inc.
# All Rights Reserved.
import pytest
from awx.conf.models import Setting
from awx.main.utils import common
def test_encrypt_field():
field = Setting(pk=123, value='ANSIBLE')
encrypted = field.value = common.encrypt_field(field, 'value')
assert encrypted == '$encrypted$AES$Ey83gcmMuBBT1OEq2lepnw=='
assert common.decrypt_field(field, 'value') == 'ANSIBLE'
def test_encrypt_field_without_pk():
field = Setting(value='ANSIBLE')
encrypted = field.value = common.encrypt_field(field, 'value')
assert encrypted == '$encrypted$AES$8uIzEoGyY6QJwoTWbMFGhw=='
assert common.decrypt_field(field, 'value') == 'ANSIBLE'
def test_encrypt_field_with_unicode_string():
value = u'Iñtërnâtiônàlizætiøn'
field = Setting(value=value)
encrypted = field.value = common.encrypt_field(field, 'value')
assert encrypted == '$encrypted$UTF8$AES$AESQbqOefpYcLC7x8yZ2aWG4FlXlS66JgavLbDp/DSM='
assert common.decrypt_field(field, 'value') == value
def test_encrypt_field_force_disable_unicode():
value = u"NothingSpecial"
field = Setting(value=value)
encrypted = field.value = common.encrypt_field(field, 'value', skip_utf8=True)
assert "UTF8" not in encrypted
assert common.decrypt_field(field, 'value') == value
def test_encrypt_subfield():
field = Setting(value={'name': 'ANSIBLE'})
encrypted = field.value = common.encrypt_field(field, 'value', subfield='name')
assert encrypted == '$encrypted$AES$8uIzEoGyY6QJwoTWbMFGhw=='
assert common.decrypt_field(field, 'value', subfield='name') == 'ANSIBLE'
def test_encrypt_field_with_ask():
encrypted = common.encrypt_field(Setting(value='ASK'), 'value', ask=True)
assert encrypted == 'ASK'
def test_encrypt_field_with_empty_value():
encrypted = common.encrypt_field(Setting(value=None), 'value')
assert encrypted is None
@pytest.mark.parametrize('input_, output', [
({"foo": "bar"}, {"foo": "bar"}),
('{"foo": "bar"}', {"foo": "bar"}),
('---\nfoo: bar', {"foo": "bar"}),
(4399, {}),
])
def test_parse_yaml_or_json(input_, output):
assert common.parse_yaml_or_json(input_) == output

View File

@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Ansible, Inc.
# All Rights Reserved.
import pytest
from awx.main.utils import common
@pytest.mark.parametrize('input_, output', [
({"foo": "bar"}, {"foo": "bar"}),
('{"foo": "bar"}', {"foo": "bar"}),
('---\nfoo: bar', {"foo": "bar"}),
(4399, {}),
])
def test_parse_yaml_or_json(input_, output):
assert common.parse_yaml_or_json(input_) == output

View File

@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Ansible, Inc.
# All Rights Reserved.
from awx.conf.models import Setting
from awx.main.utils import encryption
def test_encrypt_field():
field = Setting(pk=123, value='ANSIBLE')
encrypted = field.value = encryption.encrypt_field(field, 'value')
assert encryption.decrypt_field(field, 'value') == 'ANSIBLE'
assert encrypted.startswith('$encrypted$AESCBC$')
def test_encrypt_field_without_pk():
field = Setting(value='ANSIBLE')
encrypted = field.value = encryption.encrypt_field(field, 'value')
assert encryption.decrypt_field(field, 'value') == 'ANSIBLE'
assert encrypted.startswith('$encrypted$AESCBC$')
def test_encrypt_field_with_unicode_string():
value = u'Iñtërnâtiônàlizætiøn'
field = Setting(value=value)
encrypted = field.value = encryption.encrypt_field(field, 'value')
assert encryption.decrypt_field(field, 'value') == value
assert encrypted.startswith('$encrypted$UTF8$AESCBC$')
def test_encrypt_field_force_disable_unicode():
value = u"NothingSpecial"
field = Setting(value=value)
encrypted = field.value = encryption.encrypt_field(field, 'value', skip_utf8=True)
assert "UTF8" not in encrypted
assert encryption.decrypt_field(field, 'value') == value
def test_encrypt_subfield():
field = Setting(value={'name': 'ANSIBLE'})
encrypted = field.value = encryption.encrypt_field(field, 'value', subfield='name')
assert encryption.decrypt_field(field, 'value', subfield='name') == 'ANSIBLE'
assert encrypted.startswith('$encrypted$AESCBC$')
def test_encrypt_field_with_ask():
encrypted = encryption.encrypt_field(Setting(value='ASK'), 'value', ask=True)
assert encrypted == 'ASK'
def test_encrypt_field_with_empty_value():
encrypted = encryption.encrypt_field(Setting(value=None), 'value')
assert encrypted is None