Introduce a new CredentialTemplate model

Credentials now have a required CredentialType, which defines inputs
(i.e., username, password) and injectors (i.e., assign the username to
SOME_ENV_VARIABLE at job runtime)

This commit only implements the model changes necessary to support the
new inputs model, and includes code for the credential serializer that
allows backwards-compatible support for /api/v1/credentials/; tasks.py
still needs to be updated to actually respect CredentialType injectors.

This change *will* break the UI for credentials (because it needs to be
updated to use the new v2 endpoint).

see: #5877
see: #5876
see: #5805
This commit is contained in:
Ryan Petrello
2017-03-30 14:47:48 -04:00
parent 4931bec1be
commit ba259e0ad4
30 changed files with 3103 additions and 467 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,205 @@
import json
import pytest
from awx.main.models.credential import CredentialType
from awx.api.versioning import reverse
@pytest.mark.django_db
def test_list_as_unauthorized_xfail(get):
response = get(reverse('api:credential_type_list'))
assert response.status_code == 401
@pytest.mark.django_db
def test_list_as_normal_user(get, alice):
response = get(reverse('api:credential_type_list'), alice)
assert response.status_code == 200
assert response.data['count'] == 0
@pytest.mark.django_db
def test_list_as_admin(get, admin):
response = get(reverse('api:credential_type_list'), admin)
assert response.status_code == 200
assert response.data['count'] == 0
@pytest.mark.django_db
def test_create_as_unauthorized_xfail(get, post):
response = post(reverse('api:credential_type_list'), {
'name': 'Custom Credential Type',
})
assert response.status_code == 401
@pytest.mark.django_db
def test_update_as_unauthorized_xfail(patch):
ssh = CredentialType.defaults['ssh']()
ssh.save()
response = patch(
reverse('api:credential_type_detail', kwargs={'pk': ssh.pk}),
{
'name': 'Some Other Name'
}
)
assert response.status_code == 401
@pytest.mark.django_db
def test_delete_as_unauthorized_xfail(delete):
ssh = CredentialType.defaults['ssh']()
ssh.save()
response = delete(
reverse('api:credential_type_detail', kwargs={'pk': ssh.pk}),
)
assert response.status_code == 401
@pytest.mark.django_db
def test_create_as_normal_user_xfail(get, post, alice):
response = post(reverse('api:credential_type_list'), {
'name': 'Custom Credential Type',
}, alice)
assert response.status_code == 403
assert get(reverse('api:credential_type_list'), alice).data['count'] == 0
@pytest.mark.django_db
def test_create_as_admin(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'Custom Credential Type',
'inputs': {},
'injectors': {}
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
assert response.data['results'][0]['name'] == 'Custom Credential Type'
assert response.data['results'][0]['inputs'] == {}
assert response.data['results'][0]['injectors'] == {}
assert response.data['results'][0]['managed_by_tower'] is False
@pytest.mark.django_db
def test_create_managed_by_tower_readonly(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'Custom Credential Type',
'inputs': {},
'injectors': {},
'managed_by_tower': True
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
assert response.data['results'][0]['managed_by_tower'] is False
@pytest.mark.django_db
def test_create_with_valid_inputs(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string',
'secret': True,
'ask_at_runtime': True
}]
},
'injectors': {}
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
fields = response.data['results'][0]['inputs']['fields']
assert len(fields) == 1
assert fields[0]['id'] == 'api_token'
assert fields[0]['label'] == 'API Token'
assert fields[0]['ask_at_runtime'] is True
assert fields[0]['secret'] is True
assert fields[0]['type'] == 'string'
@pytest.mark.django_db
def test_create_with_invalid_inputs_xfail(post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {'feeelds': {},},
'injectors': {}
}, admin)
assert response.status_code == 400
assert "'feeelds' was unexpected" in json.dumps(response.data)
@pytest.mark.django_db
def test_create_with_valid_injectors(get, post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string',
'secret': True,
'ask_at_runtime': True
}]
},
'injectors': {
'env': {
'ANSIBLE_MY_CLOUD_TOKEN': '{{api_token}}'
}
}
}, admin)
assert response.status_code == 201
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
injectors = response.data['results'][0]['injectors']
assert len(injectors) == 1
assert injectors['env'] == {
'ANSIBLE_MY_CLOUD_TOKEN': '{{api_token}}'
}
@pytest.mark.django_db
def test_create_with_invalid_injectors_xfail(post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {},
'injectors': {'nonsense': 123}
}, admin)
assert response.status_code == 400
@pytest.mark.django_db
def test_create_with_undefined_template_variable_xfail(post, admin):
response = post(reverse('api:credential_type_list'), {
'kind': 'cloud',
'name': 'MyCloud',
'inputs': {
'fields': [{
'id': 'api_token',
'label': 'API Token',
'type': 'string',
'secret': True,
'ask_at_runtime': True
}]
},
'injectors': {
'env': {'ANSIBLE_MY_CLOUD_TOKEN': '{{api_tolkien}}'}
}
}, admin)
assert response.status_code == 400
assert "'api_tolkien' is undefined" in json.dumps(response.data)

View File

@@ -10,8 +10,15 @@ from awx.api.versioning import reverse
@pytest.fixture
def runtime_data(organization):
cred_obj = Credential.objects.create(name='runtime-cred', kind='ssh', username='test_user2', password='pas4word2')
def runtime_data(organization, credentialtype_ssh):
cred_obj = Credential.objects.create(
name='runtime-cred',
credential_type=credentialtype_ssh,
inputs={
'username': 'test_user2',
'password': 'pas4word2'
}
)
inv_obj = organization.inventories.create(name="runtime-inv")
return dict(
extra_vars='{"job_launch_var": 4}',

View File

@@ -28,7 +28,7 @@ from rest_framework.test import (
force_authenticate,
)
from awx.main.models.credential import Credential
from awx.main.models.credential import CredentialType, Credential
from awx.main.models.jobs import JobTemplate
from awx.main.models.inventory import (
Group,
@@ -191,18 +191,43 @@ def organization(instance):
@pytest.fixture
def credential():
return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret')
def credentialtype_ssh():
ssh = CredentialType.defaults['ssh']()
ssh.save()
return ssh
@pytest.fixture
def machine_credential():
return Credential.objects.create(name='machine-cred', kind='ssh', username='test_user', password='pas4word')
def credentialtype_aws():
aws = CredentialType.defaults['aws']()
aws.save()
return aws
@pytest.fixture
def org_credential(organization):
return Credential.objects.create(kind='aws', name='test-cred', username='something', password='secret', organization=organization)
def credentialtype_net():
net = CredentialType.defaults['net']()
net.save()
return net
@pytest.fixture
def credential(credentialtype_aws):
return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
inputs={'username': 'something', 'password': 'secret'})
@pytest.fixture
def machine_credential(credentialtype_ssh):
return Credential.objects.create(credential_type=credentialtype_ssh, name='machine-cred',
inputs={'username': 'test_user', 'password': 'pas4word'})
@pytest.fixture
def org_credential(organization, credentialtype_aws):
return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
inputs={'username': 'something', 'password': 'secret'},
organization=organization)
@pytest.fixture

View File

@@ -0,0 +1,289 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.
import pytest
from django.core.exceptions import ValidationError
from awx.main.utils.common import decrypt_field
from awx.main.models import Credential, CredentialType
@pytest.mark.django_db
def test_default_cred_types():
assert sorted(CredentialType.defaults.keys()) == [
'aws',
'azure',
'azure_rm',
'cloudforms',
'gce',
'net',
'openstack',
'rackspace',
'satellite6',
'scm',
'ssh',
'vault',
'vmware',
]
for type_ in CredentialType.defaults.values():
assert type_().managed_by_tower is True
@pytest.mark.django_db
@pytest.mark.parametrize('kind', ['net', 'scm', 'ssh', 'vault'])
def test_cred_type_kind_uniqueness(kind):
"""
non-cloud credential types are exclusive_on_kind (you can only use *one* of
them at a time)
"""
assert CredentialType.defaults[kind]().unique_by_kind is True
@pytest.mark.django_db
def test_cloud_kind_uniqueness():
"""
you can specify more than one cloud credential type (as long as they have
different names so you don't e.g., use ec2 twice")
"""
assert CredentialType.defaults['aws']().unique_by_kind is False
@pytest.mark.django_db
@pytest.mark.parametrize('input_, valid', [
({}, True),
({'fields': []}, True),
({'fields': {}}, False),
({'fields': 123}, False),
({'fields': [{'id': 'username', 'label': 'Username', 'foo': 'bar'}]}, False),
({'fields': [{'id': 'username', 'label': 'Username', 'type': 'string'}]}, True),
({'fields': [{'id': 'username', 'label': 'Username', 'help_text': 1}]}, False),
({'fields': [{'id': 'username', 'label': 'Username', 'help_text': 'Help Text'}]}, True), # noqa
({'fields': [{'id': 'password', 'label': 'Password', 'type': 'number'}]}, True),
({'fields': [{'id': 'ssh_key', 'label': 'SSH Key', 'type': 'ssh_private_key'}]}, True), # noqa
({'fields': [{'id': 'other', 'label': 'Other', 'type': 'boolean'}]}, False),
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': True}]}, True),
({'fields': [{'id': 'certificate', 'label': 'Cert', 'multiline': 'bad'}]}, False), # noqa
({'fields': [{'id': 'token', 'label': 'Token', 'secret': True}]}, True),
({'fields': [{'id': 'token', 'label': 'Token', 'secret': 'bad'}]}, False),
({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': True}]}, True),
({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': 'bad'}]}, False), # noqa
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': 'not-a-list'}]}, False), # noqa
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': []}]}, False),
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': ['su', 'sudo']}]}, True), # noqa
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': ['dup', 'dup']}]}, False), # noqa
])
def test_cred_type_input_schema_validity(input_, valid):
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs=input_
)
if valid is False:
with pytest.raises(ValidationError):
type_.full_clean()
else:
type_.full_clean()
@pytest.mark.django_db
@pytest.mark.parametrize('injectors, valid', [
({}, True),
({'invalid-injector': {}}, False),
({'file': 123}, False),
({'file': {}}, False),
({'file': {'template': '{{username}}'}}, True),
({'file': {'foo': 'bar'}}, False),
({'ssh': 123}, False),
({'ssh': {}}, False),
({'ssh': {'public': 'PUB'}}, False),
({'ssh': {'private': 'PRIV'}}, False),
({'ssh': {'public': 'PUB', 'private': 'PRIV'}}, True),
({'ssh': {'public': 'PUB', 'private': 'PRIV', 'a': 'b'}}, False),
({'password': {}}, False),
({'password': {'key': 'Password:'}}, False),
({'password': {'value': '{{pass}}'}}, False),
({'password': {'key': 'Password:', 'value': '{{pass}}'}}, True),
({'password': {'key': 'Password:', 'value': '{{pass}}', 'a': 'b'}}, False),
({'env': 123}, False),
({'env': {}}, True),
({'env': {'AWX_SECRET': '{{awx_secret}}'}}, True),
({'env': {'AWX_SECRET_99': '{{awx_secret}}'}}, True),
({'env': {'99': '{{awx_secret}}'}}, False),
({'env': {'AWX_SECRET=': '{{awx_secret}}'}}, False),
({'extra_vars': 123}, False),
({'extra_vars': {}}, True),
({'extra_vars': {'hostname': '{{host}}'}}, True),
({'extra_vars': {'hostname_99': '{{host}}'}}, True),
({'extra_vars': {'99': '{{host}}'}}, False),
({'extra_vars': {'99=': '{{host}}'}}, False),
])
def test_cred_type_injectors_schema(injectors, valid):
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs={
'fields': [
{'id': 'username', 'type': 'string', 'label': '_'},
{'id': 'pass', 'type': 'string', 'label': '_'},
{'id': 'awx_secret', 'type': 'string', 'label': '_'},
{'id': 'host', 'type': 'string', 'label': '_'},
]
},
injectors=injectors
)
if valid is False:
with pytest.raises(ValidationError):
type_.full_clean()
else:
type_.full_clean()
@pytest.mark.django_db
def test_credential_creation(organization_factory):
org = organization_factory('test').organization
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs={
'fields': [{
'id': 'username',
'label': 'Username for SomeCloud',
'type': 'string'
}]
}
)
type_.save()
cred = Credential(credential_type=type_, name="Bob's Credential",
inputs={'username': 'bob'}, organization=org)
cred.save()
cred.full_clean()
assert isinstance(cred, Credential)
assert cred.name == "Bob's Credential"
assert cred.inputs['username'] == cred.username == 'bob'
@pytest.mark.django_db
def test_credential_creation_validation_failure(organization_factory):
org = organization_factory('test').organization
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed_by_tower=True,
inputs={
'fields': [{
'id': 'username',
'label': 'Username for SomeCloud',
'type': 'string'
}]
}
)
type_.save()
with pytest.raises(ValidationError):
cred = Credential(credential_type=type_, name="Bob's Credential",
inputs={'user': 'wrong-key'}, organization=org)
cred.save()
cred.full_clean()
@pytest.mark.django_db
def test_credential_encryption(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'testing123'
@pytest.mark.django_db
def test_credential_encryption_with_ask(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'ASK'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['password'] == 'ASK'
@pytest.mark.django_db
def test_credential_with_multiple_secrets(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'ssh_key_data': 'SOMEKEY', 'ssh_key_unlock': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['ssh_key_data'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_data') == 'SOMEKEY'
assert cred.inputs['ssh_key_unlock'].startswith('$encrypted$')
assert decrypt_field(cred, 'ssh_key_unlock') == 'testing123'
@pytest.mark.django_db
def test_credential_update(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
cred.inputs['password'] = 'newpassword'
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'newpassword'
@pytest.mark.django_db
def test_credential_update_with_prior(organization_factory, credentialtype_ssh):
org = organization_factory('test').organization
cred = Credential(
credential_type=credentialtype_ssh,
name="Bob's Credential",
inputs={'password': 'testing123'},
organization=org
)
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
cred.inputs['username'] = 'joe'
cred.inputs['password'] = '$encrypted$'
cred.save()
assert Credential.objects.count() == 1
cred = Credential.objects.all()[:1].get()
assert cred.inputs['username'] == 'joe'
assert cred.inputs['password'].startswith('$encrypted$')
assert decrypt_field(cred, 'password') == 'testing123'

View File

@@ -5,12 +5,12 @@ from awx.main.models import Credential
@pytest.mark.django_db
def test_cred_unique_org_name_kind(organization_factory):
def test_cred_unique_org_name_kind(organization_factory, credentialtype_ssh):
objects = organization_factory("test")
cred = Credential(name="test", kind="net", organization=objects.organization)
cred = Credential(name="test", credential_type=credentialtype_ssh, organization=objects.organization)
cred.save()
with pytest.raises(IntegrityError):
cred = Credential(name="test", kind="net", organization=objects.organization)
cred = Credential(name="test", credential_type=credentialtype_ssh, organization=objects.organization)
cred.save()

View File

@@ -21,12 +21,12 @@ def test_credential_migration_user(credential, user, permissions):
@pytest.mark.django_db
def test_two_teams_same_cred_name(organization_factory):
def test_two_teams_same_cred_name(organization_factory, credentialtype_net):
objects = organization_factory("test",
teams=["team1", "team2"])
cred1 = Credential.objects.create(name="test", kind="net", deprecated_team=objects.teams.team1)
cred2 = Credential.objects.create(name="test", kind="net", deprecated_team=objects.teams.team2)
cred1 = Credential.objects.create(name="test", credential_type=credentialtype_net, deprecated_team=objects.teams.team1)
cred2 = Credential.objects.create(name="test", credential_type=credentialtype_net, deprecated_team=objects.teams.team2)
rbac.migrate_credential(apps, None)
@@ -119,7 +119,7 @@ def test_credential_access_auditor(credential, organization_factory):
@pytest.mark.django_db
def test_credential_access_admin(user, team, credential):
def test_credential_access_admin(user, team, credential, credentialtype_aws):
u = user('org-admin', False)
team.organization.admin_role.members.add(u)
@@ -137,7 +137,7 @@ def test_credential_access_admin(user, team, credential):
credential.admin_role.parents.add(team.admin_role)
credential.save()
cred = Credential.objects.create(kind='aws', name='test-cred')
cred = Credential.objects.create(credential_type=credentialtype_aws, name='test-cred')
cred.deprecated_team = team
cred.save()

View File

@@ -51,13 +51,20 @@ class TestJobRelaunchAccess:
return jt.create_unified_job()
@pytest.fixture
def job_with_prompts(self, machine_credential, inventory, organization):
def job_with_prompts(self, machine_credential, inventory, organization, credentialtype_ssh):
jt = JobTemplate.objects.create(
name='test-job-template-prompts', credential=machine_credential, inventory=inventory,
ask_tags_on_launch=True, ask_variables_on_launch=True, ask_skip_tags_on_launch=True,
ask_limit_on_launch=True, ask_job_type_on_launch=True, ask_inventory_on_launch=True,
ask_credential_on_launch=True)
new_cred = Credential.objects.create(name='new-cred', kind='ssh', username='test_user', password='pas4word')
new_cred = Credential.objects.create(
name='new-cred',
credential_type=credentialtype_ssh,
inputs={
'username': 'test_user',
'password': 'pas4word'
}
)
new_inv = Inventory.objects.create(name='new-inv', organization=organization)
return jt.create_unified_job(credential=new_cred, inventory=new_inv)

View File

@@ -8,7 +8,7 @@ from awx.main.migrations import _old_access as old_access
@pytest.mark.django_db
def test_project_migration():
def test_project_migration(credentialtype_ssh):
'''
o1 o2 o3 with o1 -- i1 o2 -- i2
@@ -59,7 +59,7 @@ def test_project_migration():
o2 = Organization.objects.create(name='o2')
o3 = Organization.objects.create(name='o3')
c1 = Credential.objects.create(name='c1')
c1 = Credential.objects.create(name='c1', credential_type=credentialtype_ssh)
project_name = unicode("\xc3\xb4", "utf-8")
p1 = Project.objects.create(name=project_name, credential=c1)