mirror of
https://github.com/ansible/awx.git
synced 2026-05-19 23:07:42 -02:30
remove association behavior from /api/v2/credentials/input_sources/
This commit is contained in:
committed by
Jake McDermott
parent
e9532dea8e
commit
368d933799
@@ -1461,7 +1461,7 @@ class CredentialInputSourceList(ListCreateAPIView):
|
|||||||
serializer_class = serializers.CredentialInputSourceSerializer
|
serializer_class = serializers.CredentialInputSourceSerializer
|
||||||
|
|
||||||
|
|
||||||
class CredentialInputSourceSubList(SubListCreateAttachDetachAPIView):
|
class CredentialInputSourceSubList(SubListCreateAPIView):
|
||||||
|
|
||||||
view_name = _("Credential Input Sources")
|
view_name = _("Credential Input Sources")
|
||||||
|
|
||||||
|
|||||||
@@ -5,51 +5,45 @@ from awx.api.versioning import reverse
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_associate_credential_input_source(get, post, admin, vault_credential, external_credential):
|
def test_associate_credential_input_source(get, post, delete, admin, vault_credential, external_credential):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
|
|
||||||
# attach
|
# attach
|
||||||
params = {
|
params = {
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'vault_password',
|
'input_field_name': 'vault_password',
|
||||||
'metadata': {'key': 'some_example_key'},
|
'metadata': {'key': 'some_example_key'}
|
||||||
'associate': True
|
|
||||||
}
|
}
|
||||||
response = post(sublist_url, params, admin)
|
response = post(list_url, params, admin)
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
|
|
||||||
detail = get(response.data['url'], admin)
|
detail = get(response.data['url'], admin)
|
||||||
assert detail.status_code == 200
|
assert detail.status_code == 200
|
||||||
|
|
||||||
response = get(sublist_url, admin)
|
response = get(list_url, admin)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.data['count'] == 1
|
assert response.data['count'] == 1
|
||||||
assert get(reverse(
|
|
||||||
'api:credential_input_source_list',
|
|
||||||
kwargs={'version': 'v2'}
|
|
||||||
), admin).data['count'] == 1
|
|
||||||
assert CredentialInputSource.objects.count() == 1
|
assert CredentialInputSource.objects.count() == 1
|
||||||
input_source = CredentialInputSource.objects.first()
|
input_source = CredentialInputSource.objects.first()
|
||||||
assert input_source.metadata == {'key': 'some_example_key'}
|
assert input_source.metadata == {'key': 'some_example_key'}
|
||||||
|
|
||||||
# detach
|
# detach
|
||||||
params = {
|
response = delete(
|
||||||
'id': detail.data['id'],
|
reverse(
|
||||||
'disassociate': True
|
'api:credential_input_source_detail',
|
||||||
}
|
kwargs={'version': 'v2', 'pk': detail.data['id']}
|
||||||
response = post(sublist_url, params, admin)
|
),
|
||||||
|
admin
|
||||||
|
)
|
||||||
assert response.status_code == 204
|
assert response.status_code == 204
|
||||||
|
|
||||||
response = get(sublist_url, admin)
|
response = get(list_url, admin)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.data['count'] == 0
|
assert response.data['count'] == 0
|
||||||
assert get(reverse(
|
|
||||||
'api:credential_input_source_list',
|
|
||||||
kwargs={'version': 'v2'}
|
|
||||||
), admin).data['count'] == 0
|
|
||||||
assert CredentialInputSource.objects.count() == 0
|
assert CredentialInputSource.objects.count() == 0
|
||||||
|
|
||||||
|
|
||||||
@@ -61,19 +55,20 @@ def test_associate_credential_input_source(get, post, admin, vault_credential, e
|
|||||||
{'extraneous': 'foo'}, # invalid parameter
|
{'extraneous': 'foo'}, # invalid parameter
|
||||||
])
|
])
|
||||||
def test_associate_credential_input_source_with_invalid_metadata(get, post, admin, vault_credential, external_credential, metadata):
|
def test_associate_credential_input_source_with_invalid_metadata(get, post, admin, vault_credential, external_credential, metadata):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
kwargs={'version': 'v2'},
|
||||||
)
|
)
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'vault_password',
|
'input_field_name': 'vault_password',
|
||||||
'metadata': metadata,
|
'metadata': metadata,
|
||||||
'associate': True
|
|
||||||
}
|
}
|
||||||
response = post(sublist_url, params, admin)
|
response = post(list_url, params, admin)
|
||||||
assert response.status_code == 400
|
assert response.status_code == 400
|
||||||
|
assert b'metadata' in response.content
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@@ -93,46 +88,46 @@ def test_create_from_list(get, post, admin, vault_credential, external_credentia
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_credential_input_source_with_external_target_returns_400(post, admin, external_credential, other_external_credential):
|
def test_create_credential_input_source_with_external_target_returns_400(post, admin, external_credential, other_external_credential):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': other_external_credential.pk}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
params = {
|
params = {
|
||||||
|
'target_credential': other_external_credential.pk,
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'token',
|
'input_field_name': 'token',
|
||||||
'associate': True,
|
|
||||||
'metadata': {'key': 'some_key'},
|
'metadata': {'key': 'some_key'},
|
||||||
}
|
}
|
||||||
response = post(sublist_url, params, admin)
|
response = post(list_url, params, admin)
|
||||||
assert response.status_code == 400
|
assert response.status_code == 400
|
||||||
assert response.data['target_credential'] == ['Target must be a non-external credential']
|
assert response.data['target_credential'] == ['Target must be a non-external credential']
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_input_source_rbac_associate(get, post, alice, vault_credential, external_credential):
|
def test_input_source_rbac_associate(get, post, delete, alice, vault_credential, external_credential):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
params = {
|
params = {
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'vault_password',
|
'input_field_name': 'vault_password',
|
||||||
'associate': True,
|
|
||||||
'metadata': {'key': 'some_key'},
|
'metadata': {'key': 'some_key'},
|
||||||
}
|
}
|
||||||
|
|
||||||
# alice can't admin the target *or* source cred
|
# alice can't admin the target *or* source cred
|
||||||
response = post(sublist_url, params, alice)
|
response = post(list_url, params, alice)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
||||||
# alice can't use the source cred
|
# alice can't use the source cred
|
||||||
vault_credential.admin_role.members.add(alice)
|
vault_credential.admin_role.members.add(alice)
|
||||||
response = post(sublist_url, params, alice)
|
response = post(list_url, params, alice)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
||||||
# alice is allowed to associate now
|
# alice is allowed to associate now
|
||||||
external_credential.use_role.members.add(alice)
|
external_credential.use_role.members.add(alice)
|
||||||
response = post(sublist_url, params, alice)
|
response = post(list_url, params, alice)
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
|
|
||||||
# now let's try disassociation
|
# now let's try disassociation
|
||||||
@@ -145,16 +140,16 @@ def test_input_source_rbac_associate(get, post, alice, vault_credential, externa
|
|||||||
assert get(response.data['url'], alice).status_code == 403
|
assert get(response.data['url'], alice).status_code == 403
|
||||||
|
|
||||||
# alice can't admin the target (so she can't remove the input source)
|
# alice can't admin the target (so she can't remove the input source)
|
||||||
params = {
|
delete_url = reverse(
|
||||||
'id': detail.data['id'],
|
'api:credential_input_source_detail',
|
||||||
'disassociate': True
|
kwargs={'version': 'v2', 'pk': detail.data['id']}
|
||||||
}
|
)
|
||||||
response = post(sublist_url, params, alice)
|
response = delete(delete_url, alice)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
||||||
# alice is allowed to disassociate now
|
# alice is allowed to disassociate now
|
||||||
vault_credential.admin_role.members.add(alice)
|
vault_credential.admin_role.members.add(alice)
|
||||||
response = post(sublist_url, params, alice)
|
response = delete(delete_url, alice)
|
||||||
assert response.status_code == 204
|
assert response.status_code == 204
|
||||||
|
|
||||||
|
|
||||||
@@ -169,7 +164,6 @@ def test_input_source_detail_rbac(get, post, patch, delete, admin, alice,
|
|||||||
params = {
|
params = {
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'vault_password',
|
'input_field_name': 'vault_password',
|
||||||
'associate': True,
|
|
||||||
'metadata': {'key': 'some_key'},
|
'metadata': {'key': 'some_key'},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -213,7 +207,7 @@ def test_input_source_detail_rbac(get, post, patch, delete, admin, alice,
|
|||||||
def test_input_source_create_rbac(get, post, patch, delete, alice,
|
def test_input_source_create_rbac(get, post, patch, delete, alice,
|
||||||
vault_credential, external_credential,
|
vault_credential, external_credential,
|
||||||
other_external_credential):
|
other_external_credential):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_list',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2'}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
@@ -225,17 +219,17 @@ def test_input_source_create_rbac(get, post, patch, delete, alice,
|
|||||||
}
|
}
|
||||||
|
|
||||||
# alice can't create the inv source because she has access to neither credential
|
# alice can't create the inv source because she has access to neither credential
|
||||||
response = post(sublist_url, params, alice)
|
response = post(list_url, params, alice)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
||||||
# alice still can't because she can't use the source credential
|
# alice still can't because she can't use the source credential
|
||||||
vault_credential.admin_role.members.add(alice)
|
vault_credential.admin_role.members.add(alice)
|
||||||
response = post(sublist_url, params, alice)
|
response = post(list_url, params, alice)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
||||||
# alice can create an input source if she has permissions on both credentials
|
# alice can create an input source if she has permissions on both credentials
|
||||||
external_credential.use_role.members.add(alice)
|
external_credential.use_role.members.add(alice)
|
||||||
response = post(sublist_url, params, alice)
|
response = post(list_url, params, alice)
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
assert CredentialInputSource.objects.count() == 1
|
assert CredentialInputSource.objects.count() == 1
|
||||||
|
|
||||||
@@ -248,18 +242,18 @@ def test_input_source_rbac_swap_target_credential(get, post, put, patch, admin,
|
|||||||
# you have to have admin role on the *original* credential (so you can
|
# you have to have admin role on the *original* credential (so you can
|
||||||
# remove the relationship) *and* on the *new* credential (so you can apply the
|
# remove the relationship) *and* on the *new* credential (so you can apply the
|
||||||
# new relationship)
|
# new relationship)
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
params = {
|
params = {
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'vault_password',
|
'input_field_name': 'vault_password',
|
||||||
'associate': True,
|
|
||||||
'metadata': {'key': 'some_key'},
|
'metadata': {'key': 'some_key'},
|
||||||
}
|
}
|
||||||
|
|
||||||
response = post(sublist_url, params, admin)
|
response = post(list_url, params, admin)
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
url = response.data['url']
|
url = response.data['url']
|
||||||
|
|
||||||
@@ -285,47 +279,51 @@ def test_input_source_rbac_swap_target_credential(get, post, put, patch, admin,
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_credential_input_source_with_non_external_source_returns_400(post, admin, credential, vault_credential):
|
def test_create_credential_input_source_with_non_external_source_returns_400(post, admin, credential, vault_credential):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
params = {
|
params = {
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': credential.pk,
|
'source_credential': credential.pk,
|
||||||
'input_field_name': 'vault_password'
|
'input_field_name': 'vault_password'
|
||||||
}
|
}
|
||||||
response = post(sublist_url, params, admin)
|
response = post(list_url, params, admin)
|
||||||
assert response.status_code == 400
|
assert response.status_code == 400
|
||||||
assert response.data['source_credential'] == ['Source must be an external credential']
|
assert response.data['source_credential'] == ['Source must be an external credential']
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_credential_input_source_with_undefined_input_returns_400(post, admin, vault_credential, external_credential):
|
def test_create_credential_input_source_with_undefined_input_returns_400(post, admin, vault_credential, external_credential):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
params = {
|
params = {
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'not_defined_for_credential_type',
|
'input_field_name': 'not_defined_for_credential_type',
|
||||||
'metadata': {'key': 'some_key'}
|
'metadata': {'key': 'some_key'}
|
||||||
}
|
}
|
||||||
response = post(sublist_url, params, admin)
|
response = post(list_url, params, admin)
|
||||||
assert response.status_code == 400
|
assert response.status_code == 400
|
||||||
assert response.data['input_field_name'] == ['Input field must be defined on target credential (options are vault_id, vault_password).']
|
assert response.data['input_field_name'] == ['Input field must be defined on target credential (options are vault_id, vault_password).']
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_create_credential_input_source_with_already_used_input_returns_400(post, admin, vault_credential, external_credential, other_external_credential):
|
def test_create_credential_input_source_with_already_used_input_returns_400(post, admin, vault_credential, external_credential, other_external_credential):
|
||||||
sublist_url = reverse(
|
list_url = reverse(
|
||||||
'api:credential_input_source_sublist',
|
'api:credential_input_source_list',
|
||||||
kwargs={'version': 'v2', 'pk': vault_credential.pk}
|
kwargs={'version': 'v2'}
|
||||||
)
|
)
|
||||||
all_params = [{
|
all_params = [{
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': external_credential.pk,
|
'source_credential': external_credential.pk,
|
||||||
'input_field_name': 'vault_password'
|
'input_field_name': 'vault_password'
|
||||||
}, {
|
}, {
|
||||||
|
'target_credential': vault_credential.pk,
|
||||||
'source_credential': other_external_credential.pk,
|
'source_credential': other_external_credential.pk,
|
||||||
'input_field_name': 'vault_password'
|
'input_field_name': 'vault_password'
|
||||||
}]
|
}]
|
||||||
all_responses = [post(sublist_url, params, admin) for params in all_params]
|
all_responses = [post(list_url, params, admin) for params in all_params]
|
||||||
assert all_responses.pop().status_code == 400
|
assert all_responses.pop().status_code == 400
|
||||||
|
|||||||
Reference in New Issue
Block a user