diff --git a/awx/api/serializers.py b/awx/api/serializers.py index b460020e0a..0fc9baa6e5 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -942,7 +942,6 @@ class UserSerializer(BaseSerializer): roles = self.reverse('api:user_roles_list', kwargs={'pk': obj.pk}), activity_stream = self.reverse('api:user_activity_stream_list', kwargs={'pk': obj.pk}), access_list = self.reverse('api:user_access_list', kwargs={'pk': obj.pk}), - applications = self.reverse('api:o_auth2_application_list', kwargs={'pk': obj.pk}), tokens = self.reverse('api:o_auth2_token_list', kwargs={'pk': obj.pk}), authorized_tokens = self.reverse('api:user_authorized_token_list', kwargs={'pk': obj.pk}), personal_tokens = self.reverse('api:o_auth2_personal_token_list', kwargs={'pk': obj.pk}), @@ -991,7 +990,7 @@ class UserAuthorizedTokenSerializer(BaseSerializer): model = OAuth2AccessToken fields = ( '*', '-name', 'description', 'user', 'token', 'refresh_token', - 'expires', 'scope', 'application', + 'expires', 'scope', 'application' ) read_only_fields = ('user', 'token', 'expires') @@ -1016,7 +1015,8 @@ class UserAuthorizedTokenSerializer(BaseSerializer): return '' def create(self, validated_data): - validated_data['user'] = self.context['request'].user + current_user = self.context['request'].user + validated_data['user'] = current_user validated_data['token'] = generate_token() validated_data['expires'] = now() + timedelta( seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS @@ -1025,7 +1025,7 @@ class UserAuthorizedTokenSerializer(BaseSerializer): obj.save() if obj.application is not None: RefreshToken.objects.create( - user=self.context['request'].user, + user=current_user, token=generate_token(), application=obj.application, access_token=obj @@ -1040,13 +1040,14 @@ class OAuth2ApplicationSerializer(BaseSerializer): class Meta: model = OAuth2Application fields = ( - '*', 'description', 'user', 'client_id', 'client_secret', 'client_type', - 'redirect_uris', 'authorization_grant_type', 'skip_authorization', + '*', 'description', '-user', 'client_id', 'client_secret', 'client_type', + 'redirect_uris', 'authorization_grant_type', 'skip_authorization', 'organization' ) read_only_fields = ('client_id', 'client_secret') read_only_on_update_fields = ('user', 'authorization_grant_type') extra_kwargs = { - 'user': {'allow_null': False, 'required': True}, + 'user': {'allow_null': True, 'required': False}, + 'organization': {'allow_null': False}, 'authorization_grant_type': {'allow_null': False} } @@ -1104,6 +1105,10 @@ class OAuth2TokenSerializer(BaseSerializer): 'application', 'expires', 'scope', ) read_only_fields = ('user', 'token', 'expires') + extra_kwargs = { + 'scope': {'allow_null': False, 'required': True}, + 'user': {'allow_null': False, 'required': True} + } def get_modified(self, obj): if obj is None: @@ -1162,7 +1167,8 @@ class OAuth2TokenSerializer(BaseSerializer): return value def create(self, validated_data): - validated_data['user'] = self.context['request'].user + current_user = self.context['request'].user + validated_data['user'] = current_user validated_data['token'] = generate_token() validated_data['expires'] = now() + timedelta( seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS @@ -1173,7 +1179,7 @@ class OAuth2TokenSerializer(BaseSerializer): obj.save() if obj.application is not None: RefreshToken.objects.create( - user=obj.application.user if obj.application.user else None, + user=current_user, token=generate_token(), application=obj.application, access_token=obj @@ -1195,10 +1201,13 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer): class Meta: model = OAuth2AccessToken fields = ( - '*', '-name', 'description', 'user', 'token', 'refresh_token', + '*', '-name', 'description', '-user', 'token', 'refresh_token', 'expires', 'scope', 'application', ) read_only_fields = ('user', 'token', 'expires') + extra_kwargs = { + 'scope': {'allow_null': False, 'required': True} + } def get_token(self, obj): request = self.context.get('request', None) @@ -1221,7 +1230,8 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer): return '' def create(self, validated_data): - validated_data['user'] = self.context['request'].user + current_user = self.context['request'].user + validated_data['user'] = current_user validated_data['token'] = generate_token() validated_data['expires'] = now() + timedelta( seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS @@ -1232,7 +1242,7 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer): obj.save() if obj.application is not None: RefreshToken.objects.create( - user=obj.application.user if obj.application.user else None, + user=current_user, token=generate_token(), application=obj.application, access_token=obj @@ -1252,6 +1262,9 @@ class OAuth2PersonalTokenSerializer(BaseSerializer): 'application', 'expires', 'scope', ) read_only_fields = ('user', 'token', 'expires', 'application') + extra_kwargs = { + 'scope': {'allow_null': False, 'required': True} + } def get_modified(self, obj): if obj is None: @@ -1290,6 +1303,7 @@ class OAuth2PersonalTokenSerializer(BaseSerializer): validated_data['expires'] = now() + timedelta( seconds=oauth2_settings.ACCESS_TOKEN_EXPIRE_SECONDS ) + validated_data['application'] = None obj = super(OAuth2PersonalTokenSerializer, self).create(validated_data) obj.save() return obj @@ -1312,6 +1326,7 @@ class OrganizationSerializer(BaseSerializer): admins = self.reverse('api:organization_admins_list', kwargs={'pk': obj.pk}), teams = self.reverse('api:organization_teams_list', kwargs={'pk': obj.pk}), credentials = self.reverse('api:organization_credential_list', kwargs={'pk': obj.pk}), + applications = self.reverse('api:organization_applications_list', kwargs={'pk': obj.pk}), activity_stream = self.reverse('api:organization_activity_stream_list', kwargs={'pk': obj.pk}), notification_templates = self.reverse('api:organization_notification_templates_list', kwargs={'pk': obj.pk}), notification_templates_any = self.reverse('api:organization_notification_templates_any_list', kwargs={'pk': obj.pk}), diff --git a/awx/api/urls/organization.py b/awx/api/urls/organization.py index b17ffce1fa..911143bb86 100644 --- a/awx/api/urls/organization.py +++ b/awx/api/urls/organization.py @@ -21,6 +21,7 @@ from awx.api.views import ( OrganizationInstanceGroupsList, OrganizationObjectRolesList, OrganizationAccessList, + OrganizationApplicationList, ) @@ -45,6 +46,7 @@ urls = [ url(r'^(?P[0-9]+)/instance_groups/$', OrganizationInstanceGroupsList.as_view(), name='organization_instance_groups_list'), url(r'^(?P[0-9]+)/object_roles/$', OrganizationObjectRolesList.as_view(), name='organization_object_roles_list'), url(r'^(?P[0-9]+)/access_list/$', OrganizationAccessList.as_view(), name='organization_access_list'), + url(r'^(?P[0-9]+)/applications/$', OrganizationApplicationList.as_view(), name='organization_applications_list'), ] __all__ = ['urls'] diff --git a/awx/api/views.py b/awx/api/views.py index d3378c127d..82e017f0ec 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -1598,6 +1598,18 @@ class UserAuthorizedTokenList(SubListCreateAPIView): def get_queryset(self): return get_access_token_model().objects.filter(application__isnull=False, user=self.request.user) + + +class OrganizationApplicationList(SubListCreateAPIView): + + view_name = _("Organization OAuth2 Applications") + + model = OAuth2Application + serializer_class = OAuth2ApplicationSerializer + parent_model = Organization + relationship = 'applications' + parent_key = 'organization' + swagger_topic = 'Authentication' class OAuth2PersonalTokenList(SubListCreateAPIView): diff --git a/awx/main/access.py b/awx/main/access.py index 62c52a41e6..23f182177d 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -580,69 +580,77 @@ class UserAccess(BaseAccess): class OAuth2ApplicationAccess(BaseAccess): ''' - I can read, change or delete OAuth applications when: + I can read, change or delete OAuth 2 applications when: - I am a superuser. - I am the admin of the organization of the user of the application. - - I am the user of the application. - I can create OAuth applications when: + - I am a user in the organization of the application. + I can create OAuth 2 applications when: - I am a superuser. - - I am the admin of the organization of the user of the application. + - I am the admin of the organization of the application. ''' model = OAuth2Application select_related = ('user',) def filtered_queryset(self): - accessible_users = User.objects.filter( - pk__in=self.user.admin_of_organizations.values('member_role__members') - ) | User.objects.filter(pk=self.user.pk) - return self.model.objects.filter(user__in=accessible_users) + return self.model.objects.filter(organization__in=self.user.organizations) def can_change(self, obj, data): - return self.can_read(obj) + return self.user.is_superuser or self.check_related('organization', Organization, data, obj=obj, + role_field='admin_role', mandatory=True) def can_delete(self, obj): - return self.can_read(obj) + return self.user.is_superuser or obj.organization in self.user.admin_of_organizations def can_add(self, data): if self.user.is_superuser: - return True - user = get_object_from_data('user', User, data) - if not user: - return False - return set(self.user.admin_of_organizations.all()) & set(user.organizations.all()) + return True + if not data: + return Organization.accessible_objects(self.user, 'admin_role').exists() + return self.check_related('organization', Organization, data, role_field='admin_role', mandatory=True) class OAuth2TokenAccess(BaseAccess): ''' - I can read, change or delete an OAuth2 token when: + I can read, change or delete an app token when: - I am a superuser. - - I am the admin of the organization of the user of the token. + - I am the admin of the organization of the application of the token. - I am the user of the token. - I can create an OAuth token when: + I can create an OAuth2 app token when: - I have the read permission of the related application. + I can read, change or delete a personal token when: + - I am the user of the token + - I am the superuser + I can create an OAuth2 Personal Access Token when: + - I am a user. But I can only create a PAT for myself. ''' model = OAuth2AccessToken + select_related = ('user', 'application') - - def filtered_queryset(self): - accessible_users = User.objects.filter( - pk__in=self.user.admin_of_organizations.values('member_role__members') - ) | User.objects.filter(pk=self.user.pk) - return self.model.objects.filter(user__in=accessible_users) - - def can_change(self, obj, data): - return self.can_read(obj) - + + def filtered_queryset(self): + org_access_qs = Organization.objects.filter( + Q(admin_role__members=self.user) | Q(auditor_role__members=self.user)) + return self.model.objects.filter(application__organization__in=org_access_qs) | self.model.objects.filter(user__id=self.user.pk) + def can_delete(self, obj): - return self.can_read(obj) + if (self.user.is_superuser) | (obj.user == self.user): + return True + elif not obj.application: + return False + return self.user in obj.application.organization.admin_role + + def can_change(self, obj, data): + return self.can_delete(obj) def can_add(self, data): - app = get_object_from_data('application', OAuth2Application, data) - if not app: - return True - return OAuth2ApplicationAccess(self.user).can_read(app) + if 'application' in data: + app = get_object_from_data('application', OAuth2Application, data) + if app is None: + return True + return OAuth2ApplicationAccess(self.user).can_read(app) + return True class OrganizationAccess(BaseAccess): diff --git a/awx/main/migrations/0028_v330_modify_application.py b/awx/main/migrations/0028_v330_modify_application.py new file mode 100644 index 0000000000..a54a3a9527 --- /dev/null +++ b/awx/main/migrations/0028_v330_modify_application.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.11 on 2018-03-16 20:25 +from __future__ import unicode_literals + +import awx.main.fields +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0027_v330_add_tower_verify'), + ] + + operations = [ + migrations.AddField( + model_name='oauth2application', + name='organization', + field=models.ForeignKey(help_text='Organization containing this application.', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='applications', to='main.Organization'), + ), + ] diff --git a/awx/main/models/oauth.py b/awx/main/models/oauth.py index a1c13a23cd..8626ca51c3 100644 --- a/awx/main/models/oauth.py +++ b/awx/main/models/oauth.py @@ -31,6 +31,13 @@ class OAuth2Application(AbstractApplication): editable=False, validators=[RegexValidator(DATA_URI_RE)], ) + organization = models.ForeignKey( + 'Organization', + related_name='applications', + help_text=_('Organization containing this application.'), + on_delete=models.CASCADE, + null=True, + ) class OAuth2AccessToken(AbstractAccessToken): diff --git a/awx/main/signals.py b/awx/main/signals.py index a6fc73f5d9..c79b1742aa 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -634,12 +634,3 @@ def create_access_token_user_if_missing(sender, **kwargs): post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken) -# @receiver(post_save, sender=User) -# def create_default_oauth_app(sender, **kwargs): -# if kwargs.get('created', False): -# user = kwargs['instance'] -# OAuth2Application.objects.create( -# name='Default application for {}'.format(user.username), -# user=user, client_type='confidential', redirect_uris='', -# authorization_grant_type='password' -# ) diff --git a/awx/main/tests/functional/api/test_oauth.py b/awx/main/tests/functional/api/test_oauth.py index 15362e71be..a417eb539f 100644 --- a/awx/main/tests/functional/api/test_oauth.py +++ b/awx/main/tests/functional/api/test_oauth.py @@ -19,44 +19,42 @@ def test_personal_access_token_creation(oauth_application, post, alice): oauth_application.client_id, oauth_application.client_secret ])) ) - resp_json = resp._container[0] assert 'access_token' in resp_json assert 'scope' in resp_json assert 'refresh_token' in resp_json - + @pytest.mark.django_db -def test_oauth_application_create(admin, post): +def test_oauth_application_create(admin, organization, post): response = post( reverse('api:o_auth2_application_list'), { 'name': 'test app', - 'user': admin.pk, + 'organization': organization.pk, 'client_type': 'confidential', 'authorization_grant_type': 'password', }, admin, expect=201 ) assert 'modified' in response.data assert 'updated' not in response.data - assert 'user' in response.data['related'] created_app = Application.objects.get(client_id=response.data['client_id']) assert created_app.name == 'test app' - assert created_app.user == admin assert created_app.skip_authorization is False assert created_app.redirect_uris == '' assert created_app.client_type == 'confidential' assert created_app.authorization_grant_type == 'password' + assert created_app.organization == organization @pytest.mark.django_db -def test_oauth_application_update(oauth_application, patch, admin, alice): +def test_oauth_application_update(oauth_application, organization, patch, admin, alice): patch( reverse('api:o_auth2_application_detail', kwargs={'pk': oauth_application.pk}), { 'name': 'Test app with immutable grant type and user', + 'organization': organization.pk, 'redirect_uris': 'http://localhost/api/', 'authorization_grant_type': 'implicit', 'skip_authorization': True, - 'user': alice.pk, }, admin, expect=200 ) updated_app = Application.objects.get(client_id=oauth_application.client_id) @@ -64,7 +62,7 @@ def test_oauth_application_update(oauth_application, patch, admin, alice): assert updated_app.redirect_uris == 'http://localhost/api/' assert updated_app.skip_authorization is True assert updated_app.authorization_grant_type == 'password' - assert updated_app.user == admin + assert updated_app.organization == organization @pytest.mark.django_db diff --git a/awx/main/tests/functional/api/test_organizations.py b/awx/main/tests/functional/api/test_organizations.py index 9ec6787d53..43a9ffb1e5 100644 --- a/awx/main/tests/functional/api/test_organizations.py +++ b/awx/main/tests/functional/api/test_organizations.py @@ -131,7 +131,7 @@ def test_organization_inventory_list(organization, inventory_factory, get, alice assert get(reverse('api:organization_inventories_list', kwargs={'pk': organization.id}), user=alice).data['count'] == 2 assert get(reverse('api:organization_inventories_list', kwargs={'pk': organization.id}), user=bob).data['count'] == 1 get(reverse('api:organization_inventories_list', kwargs={'pk': organization.id}), user=rando, expect=403) - + @pytest.mark.django_db @mock.patch('awx.api.views.feature_enabled', lambda feature: True) diff --git a/awx/main/tests/functional/test_rbac_oauth.py b/awx/main/tests/functional/test_rbac_oauth.py index 4aabd74f1e..8f673cab80 100644 --- a/awx/main/tests/functional/test_rbac_oauth.py +++ b/awx/main/tests/functional/test_rbac_oauth.py @@ -12,105 +12,203 @@ from awx.api.versioning import reverse @pytest.mark.django_db -class TestOAuthApplication: +class TestOAuth2Application: + + @pytest.mark.parametrize("user_for_access, can_access_list", [ + (0, [True, True]), + (1, [True, True]), + (2, [True, True]), + (3, [False, False]), + ]) + def test_can_read( + self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization + ): + user_list = [admin, org_admin, org_member, alice] + access = OAuth2ApplicationAccess(user_list[user_for_access]) + app_creation_user_list = [admin, org_admin] + for user, can_access in zip(app_creation_user_list, can_access_list): + app = Application.objects.create( + name='test app for {}'.format(user.username), user=user, + client_type='confidential', authorization_grant_type='password', organization=organization + ) + assert access.can_read(app) is can_access + + + def test_can_edit_delete_app_org_admin( + self, admin, org_admin, org_member, alice, organization + ): + user_list = [admin, org_admin, org_member, alice] + can_access_list = [True, True, False, False] + for user, can_access in zip(user_list, can_access_list): + app = Application.objects.create( + name='test app for {}'.format(org_admin.username), user=org_admin, + client_type='confidential', authorization_grant_type='password', organization=organization + ) + access = OAuth2ApplicationAccess(user) + assert access.can_change(app, {}) is can_access + assert access.can_delete(app) is can_access + + + def test_can_edit_delete_app_admin( + self, admin, org_admin, org_member, alice, organization + ): + user_list = [admin, org_admin, org_member, alice] + can_access_list = [True, True, False, False] + for user, can_access in zip(user_list, can_access_list): + app = Application.objects.create( + name='test app for {}'.format(admin.username), user=admin, + client_type='confidential', authorization_grant_type='password', organization=organization + ) + access = OAuth2ApplicationAccess(user) + assert access.can_change(app, {}) is can_access + assert access.can_delete(app) is can_access + - @pytest.mark.parametrize("user_for_access, can_access_list", [ - (0, [True, True, True, True]), - (1, [False, True, True, False]), - (2, [False, False, True, False]), - (3, [False, False, False, True]), - ]) - def test_can_read_change_delete( - self, admin, org_admin, org_member, alice, user_for_access, can_access_list - ): - user_list = [admin, org_admin, org_member, alice] - access = OAuth2ApplicationAccess(user_list[user_for_access]) - for user, can_access in zip(user_list, can_access_list): - app = Application.objects.create( - name='test app for {}'.format(user.username), user=user, - client_type='confidential', authorization_grant_type='password' - ) - assert access.can_read(app) is can_access - assert access.can_change(app, {}) is can_access - assert access.can_delete(app) is can_access - - def test_superuser_can_always_create(self, admin, org_admin, org_member, alice): - access = OAuth2ApplicationAccess(admin) - for user in [admin, org_admin, org_member, alice]: - assert access.can_add({ - 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', - 'authorization_grant_type': 'password' - }) - - def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice): - for access_user in [org_member, alice]: - access = OAuth2ApplicationAccess(access_user) + def test_superuser_can_always_create(self, admin, org_admin, org_member, alice): + access = OAuth2ApplicationAccess(admin) for user in [admin, org_admin, org_member, alice]: - assert not access.can_add({ + assert access.can_add({ 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', - 'authorization_grant_type': 'password' + 'authorization_grant_type': 'password', 'organization': 1 }) - - def test_org_admin_can_create_in_org(self, admin, org_admin, org_member, alice): - access = OAuth2ApplicationAccess(org_admin) - for user in [admin, alice]: - assert not access.can_add({ - 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', - 'authorization_grant_type': 'password' - }) - for user in [org_admin, org_member]: - assert access.can_add({ - 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', - 'authorization_grant_type': 'password' - }) + + def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice): + for access_user in [org_member, alice]: + access = OAuth2ApplicationAccess(access_user) + for user in [admin, org_admin, org_member, alice]: + assert not access.can_add({ + 'name': 'test app', 'user': user.pk, 'client_type': 'confidential', + 'authorization_grant_type': 'password', 'organization': 1 + }) -@pytest.mark.skip(reason="Needs Update - CA") @pytest.mark.django_db -class TestOAuthToken: - - @pytest.mark.parametrize("user_for_access, can_access_list", [ - (0, [True, True, True, True]), - (1, [False, True, True, False]), - (2, [False, False, True, False]), - (3, [False, False, False, True]), - ]) - def test_can_read_change_delete( - self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list +class TestOAuth2Token: + + def test_can_read_change_delete_app_token( + self, post, admin, org_admin, org_member, alice, organization ): user_list = [admin, org_admin, org_member, alice] - access = OAuth2TokenAccess(user_list[user_for_access]) + can_access_list = [True, True, False, False] + app = Application.objects.create( + name='test app for {}'.format(admin.username), user=admin, + client_type='confidential', authorization_grant_type='password', + organization=organization + ) + response = post( + reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), + {'scope': 'read'}, admin, expect=201 + ) for user, can_access in zip(user_list, can_access_list): - app = Application.objects.create( - name='test app for {}'.format(user.username), user=user, - client_type='confidential', authorization_grant_type='password' - ) - response = post( - reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), - {'scope': 'read'}, admin, expect=201 - ) token = AccessToken.objects.get(token=response.data['token']) - - assert access.can_read(token) is can_access # TODO: fix this test + access = OAuth2TokenAccess(user) + assert access.can_read(token) is can_access assert access.can_change(token, {}) is can_access assert access.can_delete(token) is can_access + + def test_auditor_can_read( + self, post, admin, org_admin, org_member, alice, system_auditor, organization + ): + user_list = [admin, org_admin, org_member] + can_access_list = [True, True, True] + cannot_access_list = [False, False, False] + app = Application.objects.create( + name='test app for {}'.format(admin.username), user=admin, + client_type='confidential', authorization_grant_type='password', + organization=organization + ) + for user, can_access, cannot_access in zip(user_list, can_access_list, cannot_access_list): + response = post( + reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), + {'scope': 'read'}, user, expect=201 + ) + token = AccessToken.objects.get(token=response.data['token']) + access = OAuth2TokenAccess(system_auditor) + assert access.can_read(token) is can_access + assert access.can_change(token, {}) is cannot_access + assert access.can_delete(token) is cannot_access + + def test_user_auditor_can_change( + self, post, org_member, org_admin, system_auditor, organization + ): + app = Application.objects.create( + name='test app for {}'.format(org_admin.username), user=org_admin, + client_type='confidential', authorization_grant_type='password', + organization=organization + ) + response = post( + reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), + {'scope': 'read'}, org_member, expect=201 + ) + token = AccessToken.objects.get(token=response.data['token']) + access = OAuth2TokenAccess(system_auditor) + assert access.can_read(token) is True + assert access.can_change(token, {}) is False + assert access.can_delete(token) is False + dual_user = system_auditor + organization.admin_role.members.add(dual_user) + access = OAuth2TokenAccess(dual_user) + assert access.can_read(token) is True + assert access.can_change(token, {}) is True + assert access.can_delete(token) is True + + + + def test_can_read_change_delete_personal_token_org_member( + self, post, admin, org_admin, org_member, alice + ): + # Tests who can read a token created by an org-member + user_list = [admin, org_admin, org_member, alice] + can_access_list = [True, False, True, False] + response = post( + reverse('api:o_auth2_personal_token_list', kwargs={'pk': org_member.pk}), + {'scope': 'read'}, org_member, expect=201 + ) + token = AccessToken.objects.get(token=response.data['token']) + for user, can_access in zip(user_list, can_access_list): + access = OAuth2TokenAccess(user) + assert access.can_read(token) is can_access + assert access.can_change(token, {}) is can_access + assert access.can_delete(token) is can_access + + + def test_can_read_personal_token_creator( + self, post, admin, org_admin, org_member, alice + ): + # Tests the token's creator can read their tokens + user_list = [admin, org_admin, org_member, alice] + can_access_list = [True, True, True, True] + + for user, can_access in zip(user_list, can_access_list): + response = post( + reverse('api:o_auth2_personal_token_list', kwargs={'pk': user.pk}), + {'scope': 'read', 'application':None}, user, expect=201 + ) + token = AccessToken.objects.get(token=response.data['token']) + access = OAuth2TokenAccess(user) + assert access.can_read(token) is can_access + assert access.can_change(token, {}) is can_access + assert access.can_delete(token) is can_access + + @pytest.mark.parametrize("user_for_access, can_access_list", [ - (0, [True, True, True, True]), - (1, [False, True, True, False]), - (2, [False, False, True, False]), - (3, [False, False, False, True]), + (0, [True, True]), + (1, [True, True]), + (2, [True, True]), + (3, [False, False]), ]) def test_can_create( - self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list + self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization ): user_list = [admin, org_admin, org_member, alice] for user, can_access in zip(user_list, can_access_list): app = Application.objects.create( name='test app for {}'.format(user.username), user=user, - client_type='confidential', authorization_grant_type='password' + client_type='confidential', authorization_grant_type='password', organization=organization ) post( reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), {'scope': 'read'}, user_list[user_for_access], expect=201 if can_access else 403 ) + diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 013f831492..645947eb60 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -289,8 +289,9 @@ REST_FRAMEWORK = { 'DEFAULT_PAGINATION_CLASS': 'awx.api.pagination.Pagination', 'PAGE_SIZE': 25, 'DEFAULT_AUTHENTICATION_CLASSES': ( - 'awx.api.authentication.LoggedOAuth2Authentication', 'awx.api.authentication.SessionAuthentication', + 'awx.api.authentication.LoggedOAuth2Authentication', + # 'awx.api.authentication.SessionAuthentication', 'awx.api.authentication.LoggedBasicAuthentication', ), 'DEFAULT_PERMISSION_CLASSES': (