From c15d48a640be701d3876992552549f84bfbef37f Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Wed, 2 Mar 2016 16:36:16 -0500 Subject: [PATCH] Locked down user/team role listing and role membership management api endpoints --- awx/api/views.py | 44 +++-- awx/main/access.py | 18 +- awx/main/models/rbac.py | 5 + awx/main/tests/functional/test_rbac_api.py | 216 +++++++++++++++------ 4 files changed, 198 insertions(+), 85 deletions(-) diff --git a/awx/api/views.py b/awx/api/views.py index 45c854b5f3..2a9c2b7228 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -730,11 +730,8 @@ class TeamRolesList(SubListCreateAttachDetachAPIView): relationship='member_role.children' def get_queryset(self): - # XXX: This needs to be the intersection between - # what roles the user has and what roles the viewer - # has access to see. team = Team.objects.get(pk=self.kwargs['pk']) - return team.member_role.children + return team.member_role.children.filter(id__in=Role.visible_roles(self.request.user)) # XXX: Need to enforce permissions def post(self, request, *args, **kwargs): @@ -979,13 +976,11 @@ class UserRolesList(SubListCreateAttachDetachAPIView): serializer_class = RoleSerializer parent_model = User relationship='roles' + permission_classes = (IsAuthenticated,) def get_queryset(self): - # XXX: This needs to be the intersection between - # what roles the user has and what roles the viewer - # has access to see. - u = User.objects.get(pk=self.kwargs['pk']) - return u.roles + #u = User.objects.get(pk=self.kwargs['pk']) + return Role.visible_roles(self.request.user).filter(members__in=[int(self.kwargs['pk']), ]) def post(self, request, *args, **kwargs): # Forbid implicit role creation here @@ -995,6 +990,10 @@ class UserRolesList(SubListCreateAttachDetachAPIView): return Response(data, status=status.HTTP_400_BAD_REQUEST) return super(type(self), self).post(request, *args, **kwargs) + def check_parent_access(self, parent=None): + # We hide roles that shouldn't be seen in our queryset + return True + class UserProjectsList(SubListAPIView): @@ -3162,29 +3161,27 @@ class SettingsReset(APIView): TowerSettings.objects.filter(key=settings_key).delete() return Response(status=status.HTTP_204_NO_CONTENT) -#class RoleList(ListCreateAPIView): + class RoleList(ListAPIView): model = Role serializer_class = RoleSerializer + permission_classes = (IsAuthenticated,) new_in_300 = True - # XXX: Permissions - only roles the user has access to see should be listed here def get_queryset(self): - return Role.objects + if self.request.user.is_superuser: + return Role.objects + return Role.visible_roles(self.request.user) - # XXX: Need to define who can create custom roles, and then restrict access - # appropriately - # XXX: Need to define how we want to deal with administration of custom roles. -class RoleDetail(RetrieveUpdateAPIView): +class RoleDetail(RetrieveAPIView): model = Role serializer_class = RoleSerializer + permission_classes = (IsAuthenticated,) new_in_300 = True - # XXX: Permissions - only appropriate people should be able to change these - class RoleUsersList(SubListCreateAttachDetachAPIView): @@ -3192,6 +3189,8 @@ class RoleUsersList(SubListCreateAttachDetachAPIView): serializer_class = UserSerializer parent_model = Role relationship = 'members' + permission_classes = (IsAuthenticated,) + new_in_300 = True def get_queryset(self): # XXX: Access control @@ -3213,6 +3212,8 @@ class RoleTeamsList(ListAPIView): serializer_class = TeamSerializer parent_model = Role relationship = 'member_role.parents' + permission_classes = (IsAuthenticated,) + new_in_300 = True def get_queryset(self): # TODO: Check @@ -3243,6 +3244,8 @@ class RoleParentsList(SubListAPIView): serializer_class = RoleSerializer parent_model = Role relationship = 'parents' + permission_classes = (IsAuthenticated,) + new_in_300 = True def get_queryset(self): # XXX: This should be the intersection between the roles of the user @@ -3256,6 +3259,8 @@ class RoleChildrenList(SubListAPIView): serializer_class = RoleSerializer parent_model = Role relationship = 'children' + permission_classes = (IsAuthenticated,) + new_in_300 = True def get_queryset(self): # XXX: This should be the intersection between the roles of the user @@ -3267,6 +3272,7 @@ class ResourceDetail(RetrieveAPIView): model = Resource serializer_class = ResourceSerializer + permission_classes = (IsAuthenticated,) new_in_300 = True # XXX: Permissions - only roles the user has access to see should be listed here @@ -3277,6 +3283,7 @@ class ResourceList(ListAPIView): model = Resource serializer_class = ResourceSerializer + permission_classes = (IsAuthenticated,) new_in_300 = True def get_queryset(self): @@ -3286,6 +3293,7 @@ class ResourceAccessList(ListAPIView): model = User serializer_class = ResourceAccessListElementSerializer + permission_classes = (IsAuthenticated,) new_in_300 = True def get_queryset(self): diff --git a/awx/main/access.py b/awx/main/access.py index 84eb3957a9..96f632e832 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1695,23 +1695,31 @@ class RoleAccess(BaseAccess): def get_queryset(self): if self.user.is_superuser: return self.model.objects.all() - return self.model.objects.none() + return self.model.visible_roles(self.user) def can_change(self, obj, data): return self.user.is_superuser def can_add(self, obj, data): - return self.user.is_superuser + # Unsupported for now + return False def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - return self.user.is_superuser + return self.can_unattach(obj, sub_obj, relationship) def can_unattach(self, obj, sub_obj, relationship): - return self.user.is_superuser + if self.user.is_superuser: + return True + if obj.object_id and \ + isinstance(obj.content_object, ResourceMixin) and \ + obj.content_object.accessible_by(self.user, {'write': True}): + return True + return False def can_delete(self, obj): - return self.user.is_superuser + # Unsupported for now + return False class ResourceAccess(BaseAccess): diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 396dcd71c3..0b2fb64290 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -6,6 +6,7 @@ import logging # Django from django.db import models +from django.db.models import Q from django.db.models.aggregates import Max from django.core.urlresolvers import reverse from django.utils.translation import ugettext_lazy as _ @@ -139,6 +140,10 @@ class Role(CommonModelNameNotUnique): setattr(permission, k, int(permissions[k])) permission.save() + @staticmethod + def visible_roles(user): + return Role.objects.filter(Q(descendents__in=user.roles.filter()) | Q(ancestors__in=user.roles.filter())) + @staticmethod def singleton(name): try: diff --git a/awx/main/tests/functional/test_rbac_api.py b/awx/main/tests/functional/test_rbac_api.py index 0cb3166e7c..c99c49aad3 100644 --- a/awx/main/tests/functional/test_rbac_api.py +++ b/awx/main/tests/functional/test_rbac_api.py @@ -2,7 +2,7 @@ import mock # noqa import pytest from django.core.urlresolvers import reverse -from awx.main.models.rbac import Role +from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR def mock_feature_enabled(feature, bypass_database=None): return True @@ -24,39 +24,55 @@ def test_get_roles_list_admin(organization, get, admin): assert roles['count'] > 0 @pytest.mark.django_db -@pytest.mark.skipif(True, reason='Unimplemented') -def test_get_roles_list_user(organization, get, user): +def test_get_roles_list_user(organization, inventory, team, get, user): 'Users can see all roles they have access to, but not all roles' - assert False + this_user = user('user-test_get_roles_list_user') + organization.member_role.members.add(this_user) + custom_role = Role.objects.create(name='custom_role-test_get_roles_list_user') + organization.member_role.children.add(custom_role) + + url = reverse('api:role_list') + response = get(url, this_user) + assert response.status_code == 200 + roles = response.data + assert roles['count'] > 0 + assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid + + role_hash = {} + + for r in roles['results']: + role_hash[r['id']] = r + + assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id in role_hash + assert organization.admin_role.id in role_hash + assert organization.member_role.id in role_hash + assert this_user.resource.admin_role.id in role_hash + assert custom_role.id in role_hash + + assert inventory.admin_role.id not in role_hash + assert team.member_role.id not in role_hash + + @pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_create_role(post, admin): - 'Admins can create new roles' - #u = user('admin', True) +def test_cant_create_role(post, admin): + "Ensure we can't create new roles through the api" + # Some day we might want to do this, but until that is speced out, lets + # ensure we don't slip up and allow this implicitly through some helper or + # another response = post(reverse('api:role_list'), {'name': 'New Role'}, admin) - assert response.status_code == 201 + assert response.status_code == 405 @pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_delete_role(post, admin): - 'Admins can delete a custom role' - assert False - - -@pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_user_create_role(organization, get, user): - 'User can create custom roles' - assert False - -@pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_user_delete_role(organization, get, user): - 'User can delete their custom roles, but not any old row' - assert False +def test_cant_delete_role(delete, admin): + "Ensure we can't delete roles through the api" + # Some day we might want to do this, but until that is speced out, lets + # ensure we don't slip up and allow this implicitly through some helper or + # another + response = delete(reverse('api:role_detail', args=(admin.resource.admin_role.id,)), admin) + assert response.status_code == 405 @@ -72,6 +88,53 @@ def test_get_user_roles_list(get, admin): roles = response.data assert roles['count'] > 0 # 'System Administrator' role if nothing else +@pytest.mark.django_db +def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob): + 'Users can see roles for other users, but only the roles that that user has access to see as well' + organization.member_role.members.add(alice) + organization.admins.add(bob) + custom_role = Role.objects.create(name='custom_role-test_user_view_admin_roles_list') + organization.member_role.children.add(custom_role) + team.users.add(bob) + + # alice and bob are in the same org and can see some child role of that org. + # Bob is an org admin, alice can see this. + # Bob is in a team that alice is not, alice cannot see that bob is a member of that team. + + url = reverse('api:user_roles_list', args=(bob.id,)) + response = get(url, alice) + assert response.status_code == 200 + roles = response.data + assert roles['count'] > 0 + assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid + + role_hash = {} + for r in roles['results']: + role_hash[r['id']] = r['name'] + + assert organization.admin_role.id in role_hash + assert custom_role.id not in role_hash # doesn't show up in the user roles list, not an explicit grant + assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id not in role_hash + assert inventory.admin_role.id not in role_hash + assert team.member_role.id not in role_hash # alice can't see this + + # again but this time alice is part of the team, and should be able to see the team role + team.users.add(alice) + response = get(url, alice) + assert response.status_code == 200 + roles = response.data + assert roles['count'] > 0 + assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid + + role_hash = {} + for r in roles['results']: + role_hash[r['id']] = r['name'] + + assert team.member_role.id in role_hash # Alice can now see this + + + + @pytest.mark.django_db def test_add_role_to_user(role, post, admin): assert admin.roles.filter(id=role.id).count() == 0 @@ -165,15 +228,15 @@ def test_get_role(get, admin, role): def test_put_role(put, admin, role): url = reverse('api:role_detail', args=(role.id,)) response = put(url, {'name': 'Some new name'}, admin) - assert response.status_code == 200 - r = Role.objects.get(id=role.id) - assert r.name == 'Some new name' + assert response.status_code == 405 + #r = Role.objects.get(id=role.id) + #assert r.name == 'Some new name' @pytest.mark.django_db def test_put_role_access_denied(put, alice, admin, role): url = reverse('api:role_detail', args=(role.id,)) response = put(url, {'name': 'Some new name'}, alice) - assert response.status_code == 403 + assert response.status_code == 403 or response.status_code == 405 # @@ -204,6 +267,67 @@ def test_remove_user_to_role(post, admin, role): post(url, {'disassociate': True, 'id': admin.id}, admin) assert role.members.filter(id=admin.id).count() == 0 +@pytest.mark.django_db +def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplate, user): + 'Tests that a user with permissions to assign/revoke membership to a particular role can do so' + org_admin = user('org-admin') + joe = user('joe') + organization.admins.add(org_admin) + + assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False + + post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, org_admin) + + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True + + +@pytest.mark.django_db +def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemplate, user): + 'Tests that a user with permissions to assign/revoke membership to a particular role can do so' + org_admin = user('org-admin') + joe = user('joe') + organization.admins.add(org_admin) + check_jobtemplate.executor_role.members.add(joe) + + assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True + + post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, org_admin) + + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False + +@pytest.mark.django_db +def test_user_fail_to_add_user_to_job_template(post, organization, check_jobtemplate, user): + 'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so' + rando = user('rando') + joe = user('joe') + + assert check_jobtemplate.accessible_by(rando, {'write': True}) is False + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False + + res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, rando) + assert res.status_code == 403 + + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False + + +@pytest.mark.django_db +def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobtemplate, user): + 'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so' + rando = user('rando') + joe = user('joe') + check_jobtemplate.executor_role.members.add(joe) + + assert check_jobtemplate.accessible_by(rando, {'write': True}) is False + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True + + res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, rando) + assert res.status_code == 403 + + assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True + + # # /roles//teams/ # @@ -252,22 +376,6 @@ def test_role_parents(get, team, admin, role): assert response.data['count'] == 1 assert response.data['results'][0]['id'] == team.member_role.id -@pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_role_add_parent(post, team, admin, role): - assert role.parents.count() == 0 - url = reverse('api:role_parents_list', args=(role.id,)) - post(url, {'id': team.member_role.id}, admin) - assert role.parents.count() == 1 - -@pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_role_remove_parent(post, team, admin, role): - role.parents.add(team.member_role) - assert role.parents.count() == 1 - url = reverse('api:role_parents_list', args=(role.id,)) - post(url, {'disassociate': True, 'id': team.member_role.id}, admin) - assert role.parents.count() == 0 # # /roles//children/ @@ -282,22 +390,6 @@ def test_role_children(get, team, admin, role): assert response.data['count'] == 1 assert response.data['results'][0]['id'] == role.id -@pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_role_add_children(post, team, admin, role): - assert role.children.count() == 0 - url = reverse('api:role_children_list', args=(role.id,)) - post(url, {'id': team.member_role.id}, admin) - assert role.children.count() == 1 - -@pytest.mark.django_db -@pytest.mark.skipif(True, reason='Waiting on custom role requirements') -def test_role_remove_children(post, team, admin, role): - role.children.add(team.member_role) - assert role.children.count() == 1 - url = reverse('api:role_children_list', args=(role.id,)) - post(url, {'disassociate': True, 'id': team.member_role.id}, admin) - assert role.children.count() == 0