mirror of
https://github.com/ansible/awx.git
synced 2026-05-22 08:17:39 -02:30
add api for managing credential input sources
This commit is contained in:
@@ -30,8 +30,8 @@ from awx.main.utils import (
|
||||
)
|
||||
from awx.main.models import (
|
||||
ActivityStream, AdHocCommand, AdHocCommandEvent, Credential, CredentialType,
|
||||
CustomInventoryScript, Group, Host, Instance, InstanceGroup, Inventory,
|
||||
InventorySource, InventoryUpdate, InventoryUpdateEvent, Job, JobEvent,
|
||||
CredentialInputSource, CustomInventoryScript, Group, Host, Instance, InstanceGroup,
|
||||
Inventory, InventorySource, InventoryUpdate, InventoryUpdateEvent, Job, JobEvent,
|
||||
JobHostSummary, JobLaunchConfig, JobTemplate, Label, Notification,
|
||||
NotificationTemplate, Organization, Project, ProjectUpdate,
|
||||
ProjectUpdateEvent, Role, Schedule, SystemJob, SystemJobEvent,
|
||||
@@ -1163,6 +1163,19 @@ class CredentialAccess(BaseAccess):
|
||||
return self.can_change(obj, None)
|
||||
|
||||
|
||||
class CredentialInputSourceAccess(BaseAccess):
|
||||
'''
|
||||
I can see credential input sources when:
|
||||
- I'm a superuser (TODO: Update)
|
||||
I can create credential input sources when:
|
||||
- I'm a superuser (TODO: Update)
|
||||
I can delete credential input sources when:
|
||||
- I'm a superuser (TODO: Update)
|
||||
'''
|
||||
|
||||
model = CredentialInputSource
|
||||
|
||||
|
||||
class TeamAccess(BaseAccess):
|
||||
'''
|
||||
I can see a team when:
|
||||
|
||||
@@ -1339,6 +1339,10 @@ class CredentialInputSource(PrimordialModel):
|
||||
backend_kwargs[field_name] = value
|
||||
return backend(**backend_kwargs)
|
||||
|
||||
def get_absolute_url(self, request=None):
|
||||
view_name = 'api:credential_input_source_detail'
|
||||
return reverse(view_name, kwargs={'pk': self.pk}, request=request)
|
||||
|
||||
|
||||
for plugin in credential_plugins:
|
||||
CredentialType.load_plugin(plugin)
|
||||
|
||||
@@ -140,6 +140,7 @@ def pytest_runtest_teardown(item, nextitem):
|
||||
# this is a local test cache, so we want every test to start with empty cache
|
||||
cache.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope='session', autouse=True)
|
||||
def mock_external_credential_input_sources():
|
||||
# Credential objects query their related input sources on initialization.
|
||||
|
||||
115
awx/main/tests/functional/api/test_credential_input_sources.py
Normal file
115
awx/main/tests/functional/api/test_credential_input_sources.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import pytest
|
||||
|
||||
from awx.api.versioning import reverse
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_credential_input_source(get, post, admin, vault_credential, external_credential):
|
||||
list_url = reverse(
|
||||
'api:credential_input_source_list',
|
||||
kwargs={'version': 'v2'}
|
||||
)
|
||||
sublist_url = reverse(
|
||||
'api:credential_input_source_sublist',
|
||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
||||
)
|
||||
params = {
|
||||
'source_credential': external_credential.pk,
|
||||
'target_credential': vault_credential.pk,
|
||||
'input_field_name': 'vault_password'
|
||||
}
|
||||
response = post(list_url, params, admin)
|
||||
assert response.status_code == 201
|
||||
|
||||
response = get(response.data['url'], admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
response = get(list_url, admin)
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 1
|
||||
|
||||
response = get(sublist_url, admin)
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_credential_input_source_using_sublist_returns_405(post, admin, vault_credential, external_credential):
|
||||
sublist_url = reverse(
|
||||
'api:credential_input_source_sublist',
|
||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
||||
)
|
||||
params = {
|
||||
'source_credential': external_credential.pk,
|
||||
'target_credential': vault_credential.pk,
|
||||
'input_field_name': 'vault_password'
|
||||
}
|
||||
response = post(sublist_url, params, admin)
|
||||
assert response.status_code == 405
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_credential_input_source_with_external_target_returns_400(post, admin, external_credential, other_external_credential):
|
||||
list_url = reverse(
|
||||
'api:credential_input_source_list',
|
||||
kwargs={'version': 'v2'}
|
||||
)
|
||||
params = {
|
||||
'source_credential': external_credential.pk,
|
||||
'target_credential': other_external_credential.pk,
|
||||
'input_field_name': 'token'
|
||||
}
|
||||
response = post(list_url, params, admin)
|
||||
assert response.status_code == 400
|
||||
assert response.data['target_credential'] == ['Target must be a non-external credential']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_credential_input_source_with_non_external_source_returns_400(post, admin, credential, vault_credential):
|
||||
list_url = reverse(
|
||||
'api:credential_input_source_list',
|
||||
kwargs={'version': 'v2'}
|
||||
)
|
||||
params = {
|
||||
'source_credential': credential.pk,
|
||||
'target_credential': vault_credential.pk,
|
||||
'input_field_name': 'vault_password'
|
||||
}
|
||||
response = post(list_url, params, admin)
|
||||
assert response.status_code == 400
|
||||
assert response.data['source_credential'] == ['Source must be an external credential']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_credential_input_source_with_undefined_input_returns_400(post, admin, vault_credential, external_credential):
|
||||
list_url = reverse(
|
||||
'api:credential_input_source_list',
|
||||
kwargs={'version': 'v2'}
|
||||
)
|
||||
params = {
|
||||
'source_credential': external_credential.pk,
|
||||
'target_credential': vault_credential.pk,
|
||||
'input_field_name': 'not_defined_for_credential_type'
|
||||
}
|
||||
response = post(list_url, params, admin)
|
||||
assert response.status_code == 400
|
||||
assert response.data['input_field_name'] == ['Input field must be defined on target credential.']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_credential_input_source_with_already_used_input_returns_400(post, admin, vault_credential, external_credential, other_external_credential):
|
||||
list_url = reverse(
|
||||
'api:credential_input_source_list',
|
||||
kwargs={'version': 'v2'}
|
||||
)
|
||||
all_params = [{
|
||||
'source_credential': external_credential.pk,
|
||||
'target_credential': vault_credential.pk,
|
||||
'input_field_name': 'vault_password'
|
||||
}, {
|
||||
'source_credential': other_external_credential.pk,
|
||||
'target_credential': vault_credential.pk,
|
||||
'input_field_name': 'vault_password'
|
||||
}]
|
||||
all_responses = [post(list_url, params, admin) for params in all_params]
|
||||
assert all_responses.pop().status_code == 400
|
||||
@@ -250,6 +250,33 @@ def credentialtype_insights():
|
||||
return insights_type
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def credentialtype_external():
|
||||
external_type_inputs = {
|
||||
'fields': [{
|
||||
'id': 'url',
|
||||
'label': 'Server URL',
|
||||
'type': 'string',
|
||||
'help_text': 'The server url.'
|
||||
}, {
|
||||
'id': 'token',
|
||||
'label': 'Token',
|
||||
'type': 'string',
|
||||
'secret': True,
|
||||
'help_text': 'An access token for the server.'
|
||||
}],
|
||||
'required': ['url', 'token'],
|
||||
}
|
||||
external_type = CredentialType(
|
||||
kind='external',
|
||||
managed_by_tower=True,
|
||||
name='External Service',
|
||||
inputs=external_type_inputs
|
||||
)
|
||||
external_type.save()
|
||||
return external_type
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def credential(credentialtype_aws):
|
||||
return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
|
||||
@@ -293,6 +320,18 @@ def org_credential(organization, credentialtype_aws):
|
||||
organization=organization)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def external_credential(credentialtype_external):
|
||||
return Credential.objects.create(credential_type=credentialtype_external, name='external-cred',
|
||||
inputs={'url': 'http://testhost.com', 'token': 'secret1'})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def other_external_credential(credentialtype_external):
|
||||
return Credential.objects.create(credential_type=credentialtype_external, name='other-external-cred',
|
||||
inputs={'url': 'http://testhost.com', 'token': 'secret2'})
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def inventory(organization):
|
||||
return organization.inventories.create(name="test-inv")
|
||||
|
||||
Reference in New Issue
Block a user