diff --git a/awx/main/access.py b/awx/main/access.py index d13f4ed682..6f0aec60b2 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1165,16 +1165,43 @@ class CredentialAccess(BaseAccess): class CredentialInputSourceAccess(BaseAccess): ''' - I can see credential input sources when: - - I'm a superuser (TODO: Update) - I can create credential input sources when: - - I'm a superuser (TODO: Update) - I can delete credential input sources when: - - I'm a superuser (TODO: Update) + I can see a CredentialInputSource when: + - I can see the associated target_credential + I can create/change a CredentialInputSource when: + - I'm an admin of the associated target_credential + - I have use access to the associated source credential + I can delete a CredentialInputSource when: + - I'm an admin of the associated target_credential ''' model = CredentialInputSource + def filtered_queryset(self): + return CredentialInputSource.objects.filter( + target_credential__in=Credential.accessible_pk_qs(self.user, 'read_role')) + + @check_superuser + def can_read(self, obj): + return self.user in obj.target_credential.read_role + + @check_superuser + def can_add(self, data): + return ( + self.check_related('target_credential', Credential, data, role_field='admin_role') and + self.check_related('source_credential', Credential, data, role_field='use_role') + ) + + @check_superuser + def can_change(self, obj, data): + if self.can_add(data) is False: + return False + + return self.user in obj.target_credential.admin_role + + @check_superuser + def can_delete(self, obj): + return self.user in obj.target_credential.admin_role + class TeamAccess(BaseAccess): ''' diff --git a/awx/main/tests/functional/api/test_credential_input_sources.py b/awx/main/tests/functional/api/test_credential_input_sources.py index 516bb54f67..a4237083c0 100644 --- a/awx/main/tests/functional/api/test_credential_input_sources.py +++ b/awx/main/tests/functional/api/test_credential_input_sources.py @@ -106,6 +106,150 @@ def test_create_credential_input_source_with_external_target_returns_400(post, a assert response.data['target_credential'] == ['Target must be a non-external credential'] +@pytest.mark.django_db +def test_input_source_rbac_associate(get, post, alice, vault_credential, external_credential): + sublist_url = reverse( + 'api:credential_input_source_sublist', + kwargs={'version': 'v2', 'pk': vault_credential.pk} + ) + params = { + 'source_credential': external_credential.pk, + 'input_field_name': 'vault_password', + 'associate': True, + 'metadata': {'key': 'some_key'}, + } + + # alice can't admin the target *or* source cred + response = post(sublist_url, params, alice) + assert response.status_code == 403 + + # alice can't use the source cred + vault_credential.admin_role.members.add(alice) + response = post(sublist_url, params, alice) + assert response.status_code == 403 + + # alice is allowed to associate now + external_credential.use_role.members.add(alice) + response = post(sublist_url, params, alice) + assert response.status_code == 201 + + # now let's try disassociation + detail = get(response.data['url'], alice) + assert detail.status_code == 200 + vault_credential.admin_role.members.remove(alice) + external_credential.use_role.members.remove(alice) + + # now that permissions are removed, alice can't *read* the input source + assert get(response.data['url'], alice).status_code == 403 + + # alice can't admin the target (so she can't remove the input source) + params = { + 'id': detail.data['id'], + 'disassociate': True + } + response = post(sublist_url, params, alice) + assert response.status_code == 403 + + # alice is allowed to disassociate now + vault_credential.admin_role.members.add(alice) + response = post(sublist_url, params, alice) + assert response.status_code == 204 + + +@pytest.mark.django_db +def test_input_source_detail_rbac(get, post, patch, delete, admin, alice, + vault_credential, external_credential, + other_external_credential): + sublist_url = reverse( + 'api:credential_input_source_sublist', + kwargs={'version': 'v2', 'pk': vault_credential.pk} + ) + params = { + 'source_credential': external_credential.pk, + 'input_field_name': 'vault_password', + 'associate': True, + 'metadata': {'key': 'some_key'}, + } + + response = post(sublist_url, params, admin) + assert response.status_code == 201 + + url = response.data['url'] + + # alice can't read the input source directly because she can't read the target cred + detail = get(url, alice) + assert detail.status_code == 403 + + # alice can read the input source directly + vault_credential.read_role.members.add(alice) + detail = get(url, alice) + assert detail.status_code == 200 + + # she can also see it on the credential sublist + response = get(sublist_url, admin) + assert response.status_code == 200 + assert response.data['count'] == 1 + + # alice can't change or delete the input source because she can't change the target cred + assert patch(url, {'input_field_name': 'vault_id'}, alice).status_code == 403 + assert delete(url, alice).status_code == 403 + + # alice can admin the target cred, so she can change the input field name + vault_credential.admin_role.members.add(alice) + external_credential.use_role.members.add(alice) + assert patch(url, {'input_field_name': 'vault_id'}, alice).status_code == 200 + assert CredentialInputSource.objects.first().input_field_name == 'vault_id' + + # she _cannot_, however, apply a source credential she doesn't have access to + assert patch(url, {'source_credential': other_external_credential.pk}, alice).status_code == 403 + + assert delete(url, alice).status_code == 204 + assert CredentialInputSource.objects.count() == 0 + + +@pytest.mark.django_db +def test_input_source_rbac_swap_target_credential(get, post, put, patch, admin, alice, + machine_credential, vault_credential, + external_credential): + # If you change the target credential for an input source, + # you have to have admin role on the *original* credential (so you can + # remove the relationship) *and* on the *new* credential (so you can apply the + # new relationship) + sublist_url = reverse( + 'api:credential_input_source_sublist', + kwargs={'version': 'v2', 'pk': vault_credential.pk} + ) + params = { + 'source_credential': external_credential.pk, + 'input_field_name': 'vault_password', + 'associate': True, + 'metadata': {'key': 'some_key'}, + } + + response = post(sublist_url, params, admin) + assert response.status_code == 201 + url = response.data['url'] + + # alice can't change target cred because she can't admin either one + assert patch(url, { + 'target_credential': machine_credential.pk, + 'input_field_name': 'password' + }, alice).status_code == 403 + + # alice still can't change target cred because she can't admin *the new one* + vault_credential.admin_role.members.add(alice) + assert patch(url, { + 'target_credential': machine_credential.pk, + 'input_field_name': 'password' + }, alice).status_code == 403 + + machine_credential.admin_role.members.add(alice) + assert patch(url, { + 'target_credential': machine_credential.pk, + 'input_field_name': 'password' + }, alice).status_code == 200 + + @pytest.mark.django_db def test_create_credential_input_source_with_non_external_source_returns_400(post, admin, credential, vault_credential): sublist_url = reverse(