mirror of
https://github.com/ansible/awx.git
synced 2026-02-22 13:36:02 -03:30
Merge pull request #1110 from anoek/rbac
Role description fields and some api lock down
This commit is contained in:
@@ -358,6 +358,7 @@ class BaseSerializer(serializers.ModelSerializer):
|
|||||||
roles[field.name] = {
|
roles[field.name] = {
|
||||||
'id': role.id,
|
'id': role.id,
|
||||||
'name': role.name,
|
'name': role.name,
|
||||||
|
'description': role.description,
|
||||||
'url': role.get_absolute_url(),
|
'url': role.get_absolute_url(),
|
||||||
}
|
}
|
||||||
if len(roles) > 0:
|
if len(roles) > 0:
|
||||||
@@ -1540,7 +1541,7 @@ class ResourceAccessListElementSerializer(UserSerializer):
|
|||||||
ret['summary_fields']['permissions'] = resource.get_permissions(user)
|
ret['summary_fields']['permissions'] = resource.get_permissions(user)
|
||||||
|
|
||||||
def format_role_perm(role):
|
def format_role_perm(role):
|
||||||
role_dict = { 'id': role.id, 'name': role.name}
|
role_dict = { 'id': role.id, 'name': role.name, 'description': role.description}
|
||||||
try:
|
try:
|
||||||
role_dict['resource_name'] = role.content_object.name
|
role_dict['resource_name'] = role.content_object.name
|
||||||
role_dict['resource_type'] = role.content_type.name
|
role_dict['resource_type'] = role.content_type.name
|
||||||
|
|||||||
@@ -730,11 +730,8 @@ class TeamRolesList(SubListCreateAttachDetachAPIView):
|
|||||||
relationship='member_role.children'
|
relationship='member_role.children'
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
# XXX: This needs to be the intersection between
|
|
||||||
# what roles the user has and what roles the viewer
|
|
||||||
# has access to see.
|
|
||||||
team = Team.objects.get(pk=self.kwargs['pk'])
|
team = Team.objects.get(pk=self.kwargs['pk'])
|
||||||
return team.member_role.children
|
return team.member_role.children.filter(id__in=Role.visible_roles(self.request.user))
|
||||||
|
|
||||||
# XXX: Need to enforce permissions
|
# XXX: Need to enforce permissions
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
@@ -979,13 +976,11 @@ class UserRolesList(SubListCreateAttachDetachAPIView):
|
|||||||
serializer_class = RoleSerializer
|
serializer_class = RoleSerializer
|
||||||
parent_model = User
|
parent_model = User
|
||||||
relationship='roles'
|
relationship='roles'
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
# XXX: This needs to be the intersection between
|
#u = User.objects.get(pk=self.kwargs['pk'])
|
||||||
# what roles the user has and what roles the viewer
|
return Role.visible_roles(self.request.user).filter(members__in=[int(self.kwargs['pk']), ])
|
||||||
# has access to see.
|
|
||||||
u = User.objects.get(pk=self.kwargs['pk'])
|
|
||||||
return u.roles
|
|
||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
# Forbid implicit role creation here
|
# Forbid implicit role creation here
|
||||||
@@ -995,6 +990,10 @@ class UserRolesList(SubListCreateAttachDetachAPIView):
|
|||||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||||
return super(type(self), self).post(request, *args, **kwargs)
|
return super(type(self), self).post(request, *args, **kwargs)
|
||||||
|
|
||||||
|
def check_parent_access(self, parent=None):
|
||||||
|
# We hide roles that shouldn't be seen in our queryset
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class UserProjectsList(SubListAPIView):
|
class UserProjectsList(SubListAPIView):
|
||||||
@@ -3162,29 +3161,27 @@ class SettingsReset(APIView):
|
|||||||
TowerSettings.objects.filter(key=settings_key).delete()
|
TowerSettings.objects.filter(key=settings_key).delete()
|
||||||
return Response(status=status.HTTP_204_NO_CONTENT)
|
return Response(status=status.HTTP_204_NO_CONTENT)
|
||||||
|
|
||||||
#class RoleList(ListCreateAPIView):
|
|
||||||
class RoleList(ListAPIView):
|
class RoleList(ListAPIView):
|
||||||
|
|
||||||
model = Role
|
model = Role
|
||||||
serializer_class = RoleSerializer
|
serializer_class = RoleSerializer
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
new_in_300 = True
|
new_in_300 = True
|
||||||
|
|
||||||
# XXX: Permissions - only roles the user has access to see should be listed here
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
return Role.objects
|
if self.request.user.is_superuser:
|
||||||
|
return Role.objects
|
||||||
|
return Role.visible_roles(self.request.user)
|
||||||
|
|
||||||
# XXX: Need to define who can create custom roles, and then restrict access
|
|
||||||
# appropriately
|
|
||||||
# XXX: Need to define how we want to deal with administration of custom roles.
|
|
||||||
|
|
||||||
class RoleDetail(RetrieveUpdateAPIView):
|
class RoleDetail(RetrieveAPIView):
|
||||||
|
|
||||||
model = Role
|
model = Role
|
||||||
serializer_class = RoleSerializer
|
serializer_class = RoleSerializer
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
new_in_300 = True
|
new_in_300 = True
|
||||||
|
|
||||||
# XXX: Permissions - only appropriate people should be able to change these
|
|
||||||
|
|
||||||
|
|
||||||
class RoleUsersList(SubListCreateAttachDetachAPIView):
|
class RoleUsersList(SubListCreateAttachDetachAPIView):
|
||||||
|
|
||||||
@@ -3192,6 +3189,8 @@ class RoleUsersList(SubListCreateAttachDetachAPIView):
|
|||||||
serializer_class = UserSerializer
|
serializer_class = UserSerializer
|
||||||
parent_model = Role
|
parent_model = Role
|
||||||
relationship = 'members'
|
relationship = 'members'
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
|
new_in_300 = True
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
# XXX: Access control
|
# XXX: Access control
|
||||||
@@ -3213,6 +3212,8 @@ class RoleTeamsList(ListAPIView):
|
|||||||
serializer_class = TeamSerializer
|
serializer_class = TeamSerializer
|
||||||
parent_model = Role
|
parent_model = Role
|
||||||
relationship = 'member_role.parents'
|
relationship = 'member_role.parents'
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
|
new_in_300 = True
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
# TODO: Check
|
# TODO: Check
|
||||||
@@ -3243,6 +3244,8 @@ class RoleParentsList(SubListAPIView):
|
|||||||
serializer_class = RoleSerializer
|
serializer_class = RoleSerializer
|
||||||
parent_model = Role
|
parent_model = Role
|
||||||
relationship = 'parents'
|
relationship = 'parents'
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
|
new_in_300 = True
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
# XXX: This should be the intersection between the roles of the user
|
# XXX: This should be the intersection between the roles of the user
|
||||||
@@ -3256,6 +3259,8 @@ class RoleChildrenList(SubListAPIView):
|
|||||||
serializer_class = RoleSerializer
|
serializer_class = RoleSerializer
|
||||||
parent_model = Role
|
parent_model = Role
|
||||||
relationship = 'children'
|
relationship = 'children'
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
|
new_in_300 = True
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
# XXX: This should be the intersection between the roles of the user
|
# XXX: This should be the intersection between the roles of the user
|
||||||
@@ -3267,6 +3272,7 @@ class ResourceDetail(RetrieveAPIView):
|
|||||||
|
|
||||||
model = Resource
|
model = Resource
|
||||||
serializer_class = ResourceSerializer
|
serializer_class = ResourceSerializer
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
new_in_300 = True
|
new_in_300 = True
|
||||||
|
|
||||||
# XXX: Permissions - only roles the user has access to see should be listed here
|
# XXX: Permissions - only roles the user has access to see should be listed here
|
||||||
@@ -3277,6 +3283,7 @@ class ResourceList(ListAPIView):
|
|||||||
|
|
||||||
model = Resource
|
model = Resource
|
||||||
serializer_class = ResourceSerializer
|
serializer_class = ResourceSerializer
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
new_in_300 = True
|
new_in_300 = True
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
@@ -3286,6 +3293,7 @@ class ResourceAccessList(ListAPIView):
|
|||||||
|
|
||||||
model = User
|
model = User
|
||||||
serializer_class = ResourceAccessListElementSerializer
|
serializer_class = ResourceAccessListElementSerializer
|
||||||
|
permission_classes = (IsAuthenticated,)
|
||||||
new_in_300 = True
|
new_in_300 = True
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
|
|||||||
@@ -1695,23 +1695,31 @@ class RoleAccess(BaseAccess):
|
|||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
if self.user.is_superuser:
|
if self.user.is_superuser:
|
||||||
return self.model.objects.all()
|
return self.model.objects.all()
|
||||||
return self.model.objects.none()
|
return self.model.visible_roles(self.user)
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
return self.user.is_superuser
|
return self.user.is_superuser
|
||||||
|
|
||||||
def can_add(self, obj, data):
|
def can_add(self, obj, data):
|
||||||
return self.user.is_superuser
|
# Unsupported for now
|
||||||
|
return False
|
||||||
|
|
||||||
def can_attach(self, obj, sub_obj, relationship, data,
|
def can_attach(self, obj, sub_obj, relationship, data,
|
||||||
skip_sub_obj_read_check=False):
|
skip_sub_obj_read_check=False):
|
||||||
return self.user.is_superuser
|
return self.can_unattach(obj, sub_obj, relationship)
|
||||||
|
|
||||||
def can_unattach(self, obj, sub_obj, relationship):
|
def can_unattach(self, obj, sub_obj, relationship):
|
||||||
return self.user.is_superuser
|
if self.user.is_superuser:
|
||||||
|
return True
|
||||||
|
if obj.object_id and \
|
||||||
|
isinstance(obj.content_object, ResourceMixin) and \
|
||||||
|
obj.content_object.accessible_by(self.user, {'write': True}):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
return self.user.is_superuser
|
# Unsupported for now
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class ResourceAccess(BaseAccess):
|
class ResourceAccess(BaseAccess):
|
||||||
|
|||||||
@@ -134,8 +134,9 @@ def resolve_role_field(obj, field):
|
|||||||
class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
|
class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
|
||||||
"""Descriptor Implict Role Fields. Auto-creates the appropriate role entry on first access"""
|
"""Descriptor Implict Role Fields. Auto-creates the appropriate role entry on first access"""
|
||||||
|
|
||||||
def __init__(self, role_name, permissions, parent_role, *args, **kwargs):
|
def __init__(self, role_name, role_description, permissions, parent_role, *args, **kwargs):
|
||||||
self.role_name = role_name
|
self.role_name = role_name
|
||||||
|
self.role_description = role_description if role_description else ""
|
||||||
self.permissions = permissions
|
self.permissions = permissions
|
||||||
self.parent_role = parent_role
|
self.parent_role = parent_role
|
||||||
|
|
||||||
@@ -152,7 +153,7 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
|
|||||||
if connection.needs_rollback:
|
if connection.needs_rollback:
|
||||||
raise TransactionManagementError('Current transaction has failed, cannot create implicit role')
|
raise TransactionManagementError('Current transaction has failed, cannot create implicit role')
|
||||||
|
|
||||||
role = Role.objects.create(name=self.role_name, content_object=instance)
|
role = Role.objects.create(name=self.role_name, description=self.role_description, content_object=instance)
|
||||||
if self.parent_role:
|
if self.parent_role:
|
||||||
|
|
||||||
# Add all non-null parent roles as parents
|
# Add all non-null parent roles as parents
|
||||||
@@ -195,8 +196,9 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
|
|||||||
class ImplicitRoleField(models.ForeignKey):
|
class ImplicitRoleField(models.ForeignKey):
|
||||||
"""Implicitly creates a role entry for a resource"""
|
"""Implicitly creates a role entry for a resource"""
|
||||||
|
|
||||||
def __init__(self, role_name=None, permissions=None, parent_role=None, *args, **kwargs):
|
def __init__(self, role_name=None, role_description=None, permissions=None, parent_role=None, *args, **kwargs):
|
||||||
self.role_name = role_name
|
self.role_name = role_name
|
||||||
|
self.role_description = role_description
|
||||||
self.permissions = permissions
|
self.permissions = permissions
|
||||||
self.parent_role = parent_role
|
self.parent_role = parent_role
|
||||||
|
|
||||||
@@ -211,6 +213,7 @@ class ImplicitRoleField(models.ForeignKey):
|
|||||||
self.name,
|
self.name,
|
||||||
ImplicitRoleDescriptor(
|
ImplicitRoleDescriptor(
|
||||||
self.role_name,
|
self.role_name,
|
||||||
|
self.role_description,
|
||||||
self.permissions,
|
self.permissions,
|
||||||
self.parent_role,
|
self.parent_role,
|
||||||
self
|
self
|
||||||
|
|||||||
@@ -157,11 +157,13 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
|||||||
)
|
)
|
||||||
owner_role = ImplicitRoleField(
|
owner_role = ImplicitRoleField(
|
||||||
role_name='Credential Owner',
|
role_name='Credential Owner',
|
||||||
|
role_description='Owner of the credential',
|
||||||
parent_role='team.admin_role',
|
parent_role='team.admin_role',
|
||||||
permissions = {'all': True}
|
permissions = {'all': True}
|
||||||
)
|
)
|
||||||
usage_role = ImplicitRoleField(
|
usage_role = ImplicitRoleField(
|
||||||
role_name='Credential User',
|
role_name='Credential User',
|
||||||
|
role_description='May use this credential, but not read sensitive portions or modify it',
|
||||||
parent_role= 'team.member_role',
|
parent_role= 'team.member_role',
|
||||||
permissions = {'use': True}
|
permissions = {'use': True}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -98,19 +98,23 @@ class Inventory(CommonModel, ResourceMixin):
|
|||||||
)
|
)
|
||||||
admin_role = ImplicitRoleField(
|
admin_role = ImplicitRoleField(
|
||||||
role_name='Inventory Administrator',
|
role_name='Inventory Administrator',
|
||||||
|
role_description='May manage this inventory',
|
||||||
parent_role='organization.admin_role',
|
parent_role='organization.admin_role',
|
||||||
permissions = {'all': True}
|
permissions = {'all': True}
|
||||||
)
|
)
|
||||||
auditor_role = ImplicitRoleField(
|
auditor_role = ImplicitRoleField(
|
||||||
role_name='Inventory Auditor',
|
role_name='Inventory Auditor',
|
||||||
|
role_description='May view but not modify this inventory',
|
||||||
parent_role='organization.auditor_role',
|
parent_role='organization.auditor_role',
|
||||||
permissions = {'read': True}
|
permissions = {'read': True}
|
||||||
)
|
)
|
||||||
updater_role = ImplicitRoleField(
|
updater_role = ImplicitRoleField(
|
||||||
role_name='Inventory Updater',
|
role_name='Inventory Updater',
|
||||||
|
role_description='May update the inventory',
|
||||||
)
|
)
|
||||||
executor_role = ImplicitRoleField(
|
executor_role = ImplicitRoleField(
|
||||||
role_name='Inventory Executor',
|
role_name='Inventory Executor',
|
||||||
|
role_description='May execute jobs against this inventory',
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_absolute_url(self):
|
def get_absolute_url(self):
|
||||||
|
|||||||
@@ -185,16 +185,19 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
|
|||||||
)
|
)
|
||||||
admin_role = ImplicitRoleField(
|
admin_role = ImplicitRoleField(
|
||||||
role_name='Job Template Administrator',
|
role_name='Job Template Administrator',
|
||||||
|
role_description='Full access to all settings',
|
||||||
parent_role='project.admin_role',
|
parent_role='project.admin_role',
|
||||||
permissions = {'all': True}
|
permissions = {'all': True}
|
||||||
)
|
)
|
||||||
auditor_role = ImplicitRoleField(
|
auditor_role = ImplicitRoleField(
|
||||||
role_name='Job Template Auditor',
|
role_name='Job Template Auditor',
|
||||||
|
role_description='Read-only access to all settings',
|
||||||
parent_role='project.auditor_role',
|
parent_role='project.auditor_role',
|
||||||
permissions = {'read': True}
|
permissions = {'read': True}
|
||||||
)
|
)
|
||||||
executor_role = ImplicitRoleField(
|
executor_role = ImplicitRoleField(
|
||||||
role_name='Job Template Executor',
|
role_name='Job Template Runner',
|
||||||
|
role_description='May run the job template',
|
||||||
permissions = {'read': True, 'execute': True}
|
permissions = {'read': True, 'execute': True}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -51,16 +51,20 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin):
|
|||||||
)
|
)
|
||||||
admin_role = ImplicitRoleField(
|
admin_role = ImplicitRoleField(
|
||||||
role_name='Organization Administrator',
|
role_name='Organization Administrator',
|
||||||
|
role_description='May manage all aspects of this organization',
|
||||||
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
|
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
|
||||||
permissions = {'all': True}
|
permissions = {'all': True}
|
||||||
)
|
)
|
||||||
auditor_role = ImplicitRoleField(
|
auditor_role = ImplicitRoleField(
|
||||||
role_name='Organization Auditor',
|
role_name='Organization Auditor',
|
||||||
|
role_description='May read all settings associated with this organization',
|
||||||
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,
|
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,
|
||||||
permissions = {'read': True}
|
permissions = {'read': True}
|
||||||
)
|
)
|
||||||
member_role = ImplicitRoleField(
|
member_role = ImplicitRoleField(
|
||||||
role_name='Organization Member',
|
role_name='Organization Member',
|
||||||
|
role_description='A member of this organization',
|
||||||
|
parent_role='admin_role',
|
||||||
permissions = {'read': True}
|
permissions = {'read': True}
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -107,16 +111,19 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
|
|||||||
)
|
)
|
||||||
admin_role = ImplicitRoleField(
|
admin_role = ImplicitRoleField(
|
||||||
role_name='Team Administrator',
|
role_name='Team Administrator',
|
||||||
|
role_description='May manage this team',
|
||||||
parent_role='organization.admin_role',
|
parent_role='organization.admin_role',
|
||||||
permissions = {'all': True}
|
permissions = {'all': True}
|
||||||
)
|
)
|
||||||
auditor_role = ImplicitRoleField(
|
auditor_role = ImplicitRoleField(
|
||||||
role_name='Team Auditor',
|
role_name='Team Auditor',
|
||||||
|
role_description='May read all settings associated with this team',
|
||||||
parent_role='organization.auditor_role',
|
parent_role='organization.auditor_role',
|
||||||
permissions = {'read': True}
|
permissions = {'read': True}
|
||||||
)
|
)
|
||||||
member_role = ImplicitRoleField(
|
member_role = ImplicitRoleField(
|
||||||
role_name='Team Member',
|
role_name='Team Member',
|
||||||
|
role_description='A member of this team',
|
||||||
parent_role='admin_role',
|
parent_role='admin_role',
|
||||||
permissions = {'read':True},
|
permissions = {'read':True},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -211,20 +211,24 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
|
|||||||
)
|
)
|
||||||
admin_role = ImplicitRoleField(
|
admin_role = ImplicitRoleField(
|
||||||
role_name='Project Administrator',
|
role_name='Project Administrator',
|
||||||
|
role_description='May manage this project',
|
||||||
parent_role='organizations.admin_role',
|
parent_role='organizations.admin_role',
|
||||||
permissions = {'all': True}
|
permissions = {'all': True}
|
||||||
)
|
)
|
||||||
auditor_role = ImplicitRoleField(
|
auditor_role = ImplicitRoleField(
|
||||||
role_name='Project Auditor',
|
role_name='Project Auditor',
|
||||||
|
role_description='May read all settings associated with this project',
|
||||||
parent_role='organizations.auditor_role',
|
parent_role='organizations.auditor_role',
|
||||||
permissions = {'read': True}
|
permissions = {'read': True}
|
||||||
)
|
)
|
||||||
member_role = ImplicitRoleField(
|
member_role = ImplicitRoleField(
|
||||||
role_name='Project Member',
|
role_name='Project Member',
|
||||||
|
role_description='Implies membership within this project',
|
||||||
permissions = {'read': True}
|
permissions = {'read': True}
|
||||||
)
|
)
|
||||||
scm_update_role = ImplicitRoleField(
|
scm_update_role = ImplicitRoleField(
|
||||||
role_name='Project Updater',
|
role_name='Project Updater',
|
||||||
|
role_description='May update this project from the source control management system',
|
||||||
parent_role='admin_role',
|
parent_role='admin_role',
|
||||||
permissions = {'scm_update': True}
|
permissions = {'scm_update': True}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import logging
|
|||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
from django.db.models import Q
|
||||||
from django.db.models.aggregates import Max
|
from django.db.models.aggregates import Max
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
@@ -139,6 +140,10 @@ class Role(CommonModelNameNotUnique):
|
|||||||
setattr(permission, k, int(permissions[k]))
|
setattr(permission, k, int(permissions[k]))
|
||||||
permission.save()
|
permission.save()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def visible_roles(user):
|
||||||
|
return Role.objects.filter(Q(descendents__in=user.roles.filter()) | Q(ancestors__in=user.roles.filter()))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def singleton(name):
|
def singleton(name):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -26,5 +26,6 @@ class UserResource(CommonModelNameNotUnique, ResourceMixin):
|
|||||||
|
|
||||||
admin_role = ImplicitRoleField(
|
admin_role = ImplicitRoleField(
|
||||||
role_name='User Administrator',
|
role_name='User Administrator',
|
||||||
|
role_description='May manage this user',
|
||||||
permissions = {'all': True},
|
permissions = {'all': True},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import mock # noqa
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
from awx.main.models.rbac import Role
|
from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
|
||||||
|
|
||||||
def mock_feature_enabled(feature, bypass_database=None):
|
def mock_feature_enabled(feature, bypass_database=None):
|
||||||
return True
|
return True
|
||||||
@@ -24,39 +24,55 @@ def test_get_roles_list_admin(organization, get, admin):
|
|||||||
assert roles['count'] > 0
|
assert roles['count'] > 0
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.skipif(True, reason='Unimplemented')
|
def test_get_roles_list_user(organization, inventory, team, get, user):
|
||||||
def test_get_roles_list_user(organization, get, user):
|
|
||||||
'Users can see all roles they have access to, but not all roles'
|
'Users can see all roles they have access to, but not all roles'
|
||||||
assert False
|
this_user = user('user-test_get_roles_list_user')
|
||||||
|
organization.member_role.members.add(this_user)
|
||||||
|
custom_role = Role.objects.create(name='custom_role-test_get_roles_list_user')
|
||||||
|
organization.member_role.children.add(custom_role)
|
||||||
|
|
||||||
|
url = reverse('api:role_list')
|
||||||
|
response = get(url, this_user)
|
||||||
|
assert response.status_code == 200
|
||||||
|
roles = response.data
|
||||||
|
assert roles['count'] > 0
|
||||||
|
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
|
||||||
|
|
||||||
|
role_hash = {}
|
||||||
|
|
||||||
|
for r in roles['results']:
|
||||||
|
role_hash[r['id']] = r
|
||||||
|
|
||||||
|
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id in role_hash
|
||||||
|
assert organization.admin_role.id in role_hash
|
||||||
|
assert organization.member_role.id in role_hash
|
||||||
|
assert this_user.resource.admin_role.id in role_hash
|
||||||
|
assert custom_role.id in role_hash
|
||||||
|
|
||||||
|
assert inventory.admin_role.id not in role_hash
|
||||||
|
assert team.member_role.id not in role_hash
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
def test_cant_create_role(post, admin):
|
||||||
def test_create_role(post, admin):
|
"Ensure we can't create new roles through the api"
|
||||||
'Admins can create new roles'
|
# Some day we might want to do this, but until that is speced out, lets
|
||||||
#u = user('admin', True)
|
# ensure we don't slip up and allow this implicitly through some helper or
|
||||||
|
# another
|
||||||
response = post(reverse('api:role_list'), {'name': 'New Role'}, admin)
|
response = post(reverse('api:role_list'), {'name': 'New Role'}, admin)
|
||||||
assert response.status_code == 201
|
assert response.status_code == 405
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
def test_cant_delete_role(delete, admin):
|
||||||
def test_delete_role(post, admin):
|
"Ensure we can't delete roles through the api"
|
||||||
'Admins can delete a custom role'
|
# Some day we might want to do this, but until that is speced out, lets
|
||||||
assert False
|
# ensure we don't slip up and allow this implicitly through some helper or
|
||||||
|
# another
|
||||||
|
response = delete(reverse('api:role_detail', args=(admin.resource.admin_role.id,)), admin)
|
||||||
@pytest.mark.django_db
|
assert response.status_code == 405
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
|
||||||
def test_user_create_role(organization, get, user):
|
|
||||||
'User can create custom roles'
|
|
||||||
assert False
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
|
||||||
def test_user_delete_role(organization, get, user):
|
|
||||||
'User can delete their custom roles, but not any old row'
|
|
||||||
assert False
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -72,6 +88,53 @@ def test_get_user_roles_list(get, admin):
|
|||||||
roles = response.data
|
roles = response.data
|
||||||
assert roles['count'] > 0 # 'System Administrator' role if nothing else
|
assert roles['count'] > 0 # 'System Administrator' role if nothing else
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob):
|
||||||
|
'Users can see roles for other users, but only the roles that that user has access to see as well'
|
||||||
|
organization.member_role.members.add(alice)
|
||||||
|
organization.admins.add(bob)
|
||||||
|
custom_role = Role.objects.create(name='custom_role-test_user_view_admin_roles_list')
|
||||||
|
organization.member_role.children.add(custom_role)
|
||||||
|
team.users.add(bob)
|
||||||
|
|
||||||
|
# alice and bob are in the same org and can see some child role of that org.
|
||||||
|
# Bob is an org admin, alice can see this.
|
||||||
|
# Bob is in a team that alice is not, alice cannot see that bob is a member of that team.
|
||||||
|
|
||||||
|
url = reverse('api:user_roles_list', args=(bob.id,))
|
||||||
|
response = get(url, alice)
|
||||||
|
assert response.status_code == 200
|
||||||
|
roles = response.data
|
||||||
|
assert roles['count'] > 0
|
||||||
|
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
|
||||||
|
|
||||||
|
role_hash = {}
|
||||||
|
for r in roles['results']:
|
||||||
|
role_hash[r['id']] = r['name']
|
||||||
|
|
||||||
|
assert organization.admin_role.id in role_hash
|
||||||
|
assert custom_role.id not in role_hash # doesn't show up in the user roles list, not an explicit grant
|
||||||
|
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id not in role_hash
|
||||||
|
assert inventory.admin_role.id not in role_hash
|
||||||
|
assert team.member_role.id not in role_hash # alice can't see this
|
||||||
|
|
||||||
|
# again but this time alice is part of the team, and should be able to see the team role
|
||||||
|
team.users.add(alice)
|
||||||
|
response = get(url, alice)
|
||||||
|
assert response.status_code == 200
|
||||||
|
roles = response.data
|
||||||
|
assert roles['count'] > 0
|
||||||
|
assert roles['count'] == len(roles['results']) # just to make sure the tests below are valid
|
||||||
|
|
||||||
|
role_hash = {}
|
||||||
|
for r in roles['results']:
|
||||||
|
role_hash[r['id']] = r['name']
|
||||||
|
|
||||||
|
assert team.member_role.id in role_hash # Alice can now see this
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_add_role_to_user(role, post, admin):
|
def test_add_role_to_user(role, post, admin):
|
||||||
assert admin.roles.filter(id=role.id).count() == 0
|
assert admin.roles.filter(id=role.id).count() == 0
|
||||||
@@ -165,15 +228,15 @@ def test_get_role(get, admin, role):
|
|||||||
def test_put_role(put, admin, role):
|
def test_put_role(put, admin, role):
|
||||||
url = reverse('api:role_detail', args=(role.id,))
|
url = reverse('api:role_detail', args=(role.id,))
|
||||||
response = put(url, {'name': 'Some new name'}, admin)
|
response = put(url, {'name': 'Some new name'}, admin)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 405
|
||||||
r = Role.objects.get(id=role.id)
|
#r = Role.objects.get(id=role.id)
|
||||||
assert r.name == 'Some new name'
|
#assert r.name == 'Some new name'
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_put_role_access_denied(put, alice, admin, role):
|
def test_put_role_access_denied(put, alice, admin, role):
|
||||||
url = reverse('api:role_detail', args=(role.id,))
|
url = reverse('api:role_detail', args=(role.id,))
|
||||||
response = put(url, {'name': 'Some new name'}, alice)
|
response = put(url, {'name': 'Some new name'}, alice)
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403 or response.status_code == 405
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -204,6 +267,67 @@ def test_remove_user_to_role(post, admin, role):
|
|||||||
post(url, {'disassociate': True, 'id': admin.id}, admin)
|
post(url, {'disassociate': True, 'id': admin.id}, admin)
|
||||||
assert role.members.filter(id=admin.id).count() == 0
|
assert role.members.filter(id=admin.id).count() == 0
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplate, user):
|
||||||
|
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
|
||||||
|
org_admin = user('org-admin')
|
||||||
|
joe = user('joe')
|
||||||
|
organization.admins.add(org_admin)
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
|
||||||
|
|
||||||
|
post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, org_admin)
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemplate, user):
|
||||||
|
'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
|
||||||
|
org_admin = user('org-admin')
|
||||||
|
joe = user('joe')
|
||||||
|
organization.admins.add(org_admin)
|
||||||
|
check_jobtemplate.executor_role.members.add(joe)
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
|
||||||
|
|
||||||
|
post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, org_admin)
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_user_fail_to_add_user_to_job_template(post, organization, check_jobtemplate, user):
|
||||||
|
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
|
||||||
|
rando = user('rando')
|
||||||
|
joe = user('joe')
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(rando, {'write': True}) is False
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
|
||||||
|
|
||||||
|
res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'id': joe.id}, rando)
|
||||||
|
assert res.status_code == 403
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobtemplate, user):
|
||||||
|
'Tests that a user without permissions to assign/revoke membership to a particular role cannot do so'
|
||||||
|
rando = user('rando')
|
||||||
|
joe = user('joe')
|
||||||
|
check_jobtemplate.executor_role.members.add(joe)
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(rando, {'write': True}) is False
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
|
||||||
|
|
||||||
|
res = post(reverse('api:role_users_list', args=(check_jobtemplate.executor_role.id,)), {'disassociate': True, 'id': joe.id}, rando)
|
||||||
|
assert res.status_code == 403
|
||||||
|
|
||||||
|
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is True
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# /roles/<id>/teams/
|
# /roles/<id>/teams/
|
||||||
#
|
#
|
||||||
@@ -252,22 +376,6 @@ def test_role_parents(get, team, admin, role):
|
|||||||
assert response.data['count'] == 1
|
assert response.data['count'] == 1
|
||||||
assert response.data['results'][0]['id'] == team.member_role.id
|
assert response.data['results'][0]['id'] == team.member_role.id
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
|
||||||
def test_role_add_parent(post, team, admin, role):
|
|
||||||
assert role.parents.count() == 0
|
|
||||||
url = reverse('api:role_parents_list', args=(role.id,))
|
|
||||||
post(url, {'id': team.member_role.id}, admin)
|
|
||||||
assert role.parents.count() == 1
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
|
||||||
def test_role_remove_parent(post, team, admin, role):
|
|
||||||
role.parents.add(team.member_role)
|
|
||||||
assert role.parents.count() == 1
|
|
||||||
url = reverse('api:role_parents_list', args=(role.id,))
|
|
||||||
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
|
|
||||||
assert role.parents.count() == 0
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# /roles/<id>/children/
|
# /roles/<id>/children/
|
||||||
@@ -282,22 +390,6 @@ def test_role_children(get, team, admin, role):
|
|||||||
assert response.data['count'] == 1
|
assert response.data['count'] == 1
|
||||||
assert response.data['results'][0]['id'] == role.id
|
assert response.data['results'][0]['id'] == role.id
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
|
||||||
def test_role_add_children(post, team, admin, role):
|
|
||||||
assert role.children.count() == 0
|
|
||||||
url = reverse('api:role_children_list', args=(role.id,))
|
|
||||||
post(url, {'id': team.member_role.id}, admin)
|
|
||||||
assert role.children.count() == 1
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
|
|
||||||
def test_role_remove_children(post, team, admin, role):
|
|
||||||
role.children.add(team.member_role)
|
|
||||||
assert role.children.count() == 1
|
|
||||||
url = reverse('api:role_children_list', args=(role.id,))
|
|
||||||
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
|
|
||||||
assert role.children.count() == 0
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -138,3 +138,32 @@ def test_content_object(user):
|
|||||||
assert org.resource.content_object.id == org.id
|
assert org.resource.content_object.id == org.id
|
||||||
assert org.admin_role.content_object.id == org.id
|
assert org.admin_role.content_object.id == org.id
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_hierarchy_rebuilding():
|
||||||
|
'Tests some subdtle cases around role hierarchy rebuilding'
|
||||||
|
|
||||||
|
X = Role.objects.create(name='X')
|
||||||
|
A = Role.objects.create(name='A')
|
||||||
|
B = Role.objects.create(name='B')
|
||||||
|
C = Role.objects.create(name='C')
|
||||||
|
D = Role.objects.create(name='D')
|
||||||
|
|
||||||
|
A.children.add(B)
|
||||||
|
A.children.add(D)
|
||||||
|
B.children.add(C)
|
||||||
|
C.children.add(D)
|
||||||
|
|
||||||
|
assert A.is_ancestor_of(D)
|
||||||
|
assert X.is_ancestor_of(D) is False
|
||||||
|
|
||||||
|
X.children.add(A)
|
||||||
|
|
||||||
|
assert X.is_ancestor_of(D) is True
|
||||||
|
|
||||||
|
X.children.remove(A)
|
||||||
|
|
||||||
|
# This can be the stickler, the rebuilder needs to ensure that D's role
|
||||||
|
# hierarchy is built after both A and C are updated.
|
||||||
|
assert X.is_ancestor_of(D) is False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user