Locked down user/team role listing and role membership management api endpoints

This commit is contained in:
Akita Noek 2016-03-02 16:36:16 -05:00
parent 9699f34976
commit c15d48a640
4 changed files with 198 additions and 85 deletions

View File

@ -730,11 +730,8 @@ class TeamRolesList(SubListCreateAttachDetachAPIView):
relationship='member_role.children'
def get_queryset(self):
# XXX: This needs to be the intersection between
# what roles the user has and what roles the viewer
# has access to see.
team = Team.objects.get(pk=self.kwargs['pk'])
return team.member_role.children
return team.member_role.children.filter(id__in=Role.visible_roles(self.request.user))
# XXX: Need to enforce permissions
def post(self, request, *args, **kwargs):
@ -979,13 +976,11 @@ class UserRolesList(SubListCreateAttachDetachAPIView):
serializer_class = RoleSerializer
parent_model = User
relationship='roles'
permission_classes = (IsAuthenticated,)
def get_queryset(self):
# XXX: This needs to be the intersection between
# what roles the user has and what roles the viewer
# has access to see.
u = User.objects.get(pk=self.kwargs['pk'])
return u.roles
#u = User.objects.get(pk=self.kwargs['pk'])
return Role.visible_roles(self.request.user).filter(members__in=[int(self.kwargs['pk']), ])
def post(self, request, *args, **kwargs):
# Forbid implicit role creation here
@ -995,6 +990,10 @@ class UserRolesList(SubListCreateAttachDetachAPIView):
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(type(self), self).post(request, *args, **kwargs)
def check_parent_access(self, parent=None):
# We hide roles that shouldn't be seen in our queryset
return True
class UserProjectsList(SubListAPIView):
@ -3162,29 +3161,27 @@ class SettingsReset(APIView):
TowerSettings.objects.filter(key=settings_key).delete()
return Response(status=status.HTTP_204_NO_CONTENT)
#class RoleList(ListCreateAPIView):
class RoleList(ListAPIView):
model = Role
serializer_class = RoleSerializer
permission_classes = (IsAuthenticated,)
new_in_300 = True
# XXX: Permissions - only roles the user has access to see should be listed here
def get_queryset(self):
return Role.objects
if self.request.user.is_superuser:
return Role.objects
return Role.visible_roles(self.request.user)
# XXX: Need to define who can create custom roles, and then restrict access
# appropriately
# XXX: Need to define how we want to deal with administration of custom roles.
class RoleDetail(RetrieveUpdateAPIView):
class RoleDetail(RetrieveAPIView):
model = Role
serializer_class = RoleSerializer
permission_classes = (IsAuthenticated,)
new_in_300 = True
# XXX: Permissions - only appropriate people should be able to change these
class RoleUsersList(SubListCreateAttachDetachAPIView):
@ -3192,6 +3189,8 @@ class RoleUsersList(SubListCreateAttachDetachAPIView):
serializer_class = UserSerializer
parent_model = Role
relationship = 'members'
permission_classes = (IsAuthenticated,)
new_in_300 = True
def get_queryset(self):
# XXX: Access control
@ -3213,6 +3212,8 @@ class RoleTeamsList(ListAPIView):
serializer_class = TeamSerializer
parent_model = Role
relationship = 'member_role.parents'
permission_classes = (IsAuthenticated,)
new_in_300 = True
def get_queryset(self):
# TODO: Check
@ -3243,6 +3244,8 @@ class RoleParentsList(SubListAPIView):
serializer_class = RoleSerializer
parent_model = Role
relationship = 'parents'
permission_classes = (IsAuthenticated,)
new_in_300 = True
def get_queryset(self):
# XXX: This should be the intersection between the roles of the user
@ -3256,6 +3259,8 @@ class RoleChildrenList(SubListAPIView):
serializer_class = RoleSerializer
parent_model = Role
relationship = 'children'
permission_classes = (IsAuthenticated,)
new_in_300 = True
def get_queryset(self):
# XXX: This should be the intersection between the roles of the user
@ -3267,6 +3272,7 @@ class ResourceDetail(RetrieveAPIView):
model = Resource
serializer_class = ResourceSerializer
permission_classes = (IsAuthenticated,)
new_in_300 = True
# XXX: Permissions - only roles the user has access to see should be listed here
@ -3277,6 +3283,7 @@ class ResourceList(ListAPIView):
model = Resource
serializer_class = ResourceSerializer
permission_classes = (IsAuthenticated,)
new_in_300 = True
def get_queryset(self):
@ -3286,6 +3293,7 @@ class ResourceAccessList(ListAPIView):
model = User
serializer_class = ResourceAccessListElementSerializer
permission_classes = (IsAuthenticated,)
new_in_300 = True
def get_queryset(self):

View File

@ -1695,23 +1695,31 @@ class RoleAccess(BaseAccess):
def get_queryset(self):
if self.user.is_superuser:
return self.model.objects.all()
return self.model.objects.none()
return self.model.visible_roles(self.user)
def can_change(self, obj, data):
return self.user.is_superuser
def can_add(self, obj, data):
return self.user.is_superuser
# Unsupported for now
return False
def can_attach(self, obj, sub_obj, relationship, data,
skip_sub_obj_read_check=False):
return self.user.is_superuser
return self.can_unattach(obj, sub_obj, relationship)
def can_unattach(self, obj, sub_obj, relationship):
return self.user.is_superuser
if self.user.is_superuser:
return True
if obj.object_id and \
isinstance(obj.content_object, ResourceMixin) and \
obj.content_object.accessible_by(self.user, {'write': True}):
return True
return False
def can_delete(self, obj):
return self.user.is_superuser
# Unsupported for now
return False
class ResourceAccess(BaseAccess):

View File

@ -6,6 +6,7 @@ import logging
# Django
from django.db import models
from django.db.models import Q
from django.db.models.aggregates import Max
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext_lazy as _
@ -139,6 +140,10 @@ class Role(CommonModelNameNotUnique):
setattr(permission, k, int(permissions[k]))
permission.save()
@staticmethod
def visible_roles(user):
return Role.objects.filter(Q(descendents__in=user.roles.filter()) | Q(ancestors__in=user.roles.filter()))
@staticmethod
def singleton(name):
try:

View File

@ -2,7 +2,7 @@ import mock # noqa
import pytest
from django.core.urlresolvers import reverse
from awx.main.models.rbac import Role
from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
def mock_feature_enabled(feature, bypass_database=None):
return True
@ -24,39 +24,55 @@ def test_get_roles_list_admin(organization, get, admin):
assert roles['count'] > 0
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Unimplemented')
def test_get_roles_list_user(organization, get, user):
def test_get_roles_list_user(organization, inventory, team, get, user):
'Users can see all roles they have access to, but not all roles'
assert False
this_user = user('user-test_get_roles_list_user')
organization.member_role.members.add(this_user)
custom_role = Role.objects.create(name='custom_role-test_get_roles_list_user')
organization.member_role.children.add(custom_role)
url = reverse('api:role_list')
response = get(url, this_user)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
role_hash = {}
for r in roles['results']:
role_hash[r['id']] = r
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id in role_hash
assert organization.admin_role.id in role_hash
assert organization.member_role.id in role_hash
assert this_user.resource.admin_role.id in role_hash
assert custom_role.id in role_hash
assert inventory.admin_role.id not in role_hash
assert team.member_role.id not in role_hash
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_create_role(post, admin):
'Admins can create new roles'
#u = user('admin', True)
def test_cant_create_role(post, admin):
"Ensure we can't create new roles through the api"
# Some day we might want to do this, but until that is speced out, lets
# ensure we don't slip up and allow this implicitly through some helper or
# another
response = post(reverse('api:role_list'), {'name': 'New Role'}, admin)
assert response.status_code == 201
assert response.status_code == 405
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_delete_role(post, admin):
'Admins can delete a custom role'
assert False
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_user_create_role(organization, get, user):
'User can create custom roles'
assert False
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_user_delete_role(organization, get, user):
'User can delete their custom roles, but not any old row'
assert False
def test_cant_delete_role(delete, admin):
"Ensure we can't delete roles through the api"
# Some day we might want to do this, but until that is speced out, lets
# ensure we don't slip up and allow this implicitly through some helper or
# another
response = delete(reverse('api:role_detail', args=(admin.resource.admin_role.id,)), admin)
assert response.status_code == 405
@ -72,6 +88,53 @@ def test_get_user_roles_list(get, admin):
roles = response.data
assert roles['count'] > 0 # 'System Administrator' role if nothing else
@pytest.mark.django_db
def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob):
'Users can see roles for other users, but only the roles that that user has access to see as well'
organization.member_role.members.add(alice)
organization.admins.add(bob)
custom_role = Role.objects.create(name='custom_role-test_user_view_admin_roles_list')
organization.member_role.children.add(custom_role)
team.users.add(bob)
# alice and bob are in the same org and can see some child role of that org.
# Bob is an org admin, alice can see this.
# Bob is in a team that alice is not, alice cannot see that bob is a member of that team.
url = reverse('api:user_roles_list', args=(bob.id,))
response = get(url, alice)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
role_hash = {}
for r in roles['results']:
role_hash[r['id']] = r['name']
assert organization.admin_role.id in role_hash
assert custom_role.id not in role_hash # doesn't show up in the user roles list, not an explicit grant
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id not in role_hash
assert inventory.admin_role.id not in role_hash
assert team.member_role.id not in role_hash # alice can't see this
# again but this time alice is part of the team, and should be able to see the team role
team.users.add(alice)
response = get(url, alice)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
role_hash = {}
for r in roles['results']:
role_hash[r['id']] = r['name']
assert team.member_role.id in role_hash # Alice can now see this
@pytest.mark.django_db
def test_add_role_to_user(role, post, admin):
assert admin.roles.filter(id=role.id).count() == 0
@ -165,15 +228,15 @@ def test_get_role(get, admin, role):
def test_put_role(put, admin, role):
url = reverse('api:role_detail', args=(role.id,))
response = put(url, {'name': 'Some new name'}, admin)
assert response.status_code == 200
r = Role.objects.get(id=role.id)
assert r.name == 'Some new name'
assert response.status_code == 405
#r = Role.objects.get(id=role.id)
#assert r.name == 'Some new name'
@pytest.mark.django_db
def test_put_role_access_denied(put, alice, admin, role):
url = reverse('api:role_detail', args=(role.id,))
response = put(url, {'name': 'Some new name'}, alice)
assert response.status_code == 403
assert response.status_code == 403 or response.status_code == 405
#
@ -204,6 +267,67 @@ def test_remove_user_to_role(post, admin, role):
post(url, {'disassociate': True, 'id': admin.id}, admin)
assert role.members.filter(id=admin.id).count() == 0
@pytest.mark.django_db
def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin')
joe = user('joe')
organization.admins.add(org_admin)
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, org_admin)
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
@pytest.mark.django_db
def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin')
joe = user('joe')
organization.admins.add(org_admin)
check_jobtemplate.executor_role.members.add(joe)
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, org_admin)
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
@pytest.mark.django_db
def test_user_fail_to_add_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
rando = user('rando')
joe = user('joe')
assert check_jobtemplate.accessible_by(rando, {'write': True}) is False
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, rando)
assert res.status_code == 403
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
@pytest.mark.django_db
def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobtemplate, user):
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
rando = user('rando')
joe = user('joe')
check_jobtemplate.executor_role.members.add(joe)
assert check_jobtemplate.accessible_by(rando, {'write': True}) is False
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, rando)
assert res.status_code == 403
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
#
# /roles/<id>/teams/
#
@ -252,22 +376,6 @@ def test_role_parents(get, team, admin, role):
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.member_role.id
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_add_parent(post, team, admin, role):
assert role.parents.count() == 0
url = reverse('api:role_parents_list', args=(role.id,))
post(url, {'id': team.member_role.id}, admin)
assert role.parents.count() == 1
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_remove_parent(post, team, admin, role):
role.parents.add(team.member_role)
assert role.parents.count() == 1
url = reverse('api:role_parents_list', args=(role.id,))
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
assert role.parents.count() == 0
#
# /roles/<id>/children/
@ -282,22 +390,6 @@ def test_role_children(get, team, admin, role):
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == role.id
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_add_children(post, team, admin, role):
assert role.children.count() == 0
url = reverse('api:role_children_list', args=(role.id,))
post(url, {'id': team.member_role.id}, admin)
assert role.children.count() == 1
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_remove_children(post, team, admin, role):
role.children.add(team.member_role)
assert role.children.count() == 1
url = reverse('api:role_children_list', args=(role.id,))
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
assert role.children.count() == 0