Merge pull request #1011 from anoek/rbac

Initial RBAC API implementation
This commit is contained in:
Wayne Witzel III 2016-02-23 10:24:03 -05:00
commit 21482fc294
19 changed files with 977 additions and 127 deletions

View File

@ -348,7 +348,7 @@ class SubListCreateAPIView(SubListAPIView, ListCreateAPIView):
# object deserialized
obj = serializer.save()
serializer = self.get_serializer(instance=obj)
headers = {'Location': obj.get_absolute_url()}
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
@ -359,7 +359,7 @@ class SubListCreateAttachDetachAPIView(SubListCreateAPIView):
def attach(self, request, *args, **kwargs):
created = False
parent = self.get_parent_object()
relationship = getattr(parent, self.relationship)
relationship = getattrd(parent, self.relationship)
sub_id = request.data.get('id', None)
data = request.data
@ -378,7 +378,7 @@ class SubListCreateAttachDetachAPIView(SubListCreateAPIView):
# Retrive the sub object (whether created or by ID).
sub = get_object_or_400(self.model, pk=sub_id)
# Verify we have permission to attach.
if not request.user.can_access(self.parent_model, 'attach', parent, sub,
self.relationship, data,
@ -405,7 +405,7 @@ class SubListCreateAttachDetachAPIView(SubListCreateAPIView):
parent = self.get_parent_object()
parent_key = getattr(self, 'parent_key', None)
relationship = getattr(parent, self.relationship)
relationship = getattrd(parent, self.relationship)
sub = get_object_or_400(self.model, pk=sub_id)
if not request.user.can_access(self.parent_model, 'unattach', parent,

View File

@ -39,6 +39,7 @@ from polymorphic import PolymorphicModel
# AWX
from awx.main.constants import SCHEDULEABLE_PROVIDERS
from awx.main.models import * # noqa
from awx.main.fields import ImplicitRoleField
from awx.main.utils import get_type_for_model, get_model_for_type, build_url, timestamp_apiformat
from awx.main.redact import REPLACE_STR
from awx.main.conf import tower_settings
@ -127,7 +128,7 @@ class BaseSerializerMetaclass(serializers.SerializerMetaclass):
'foo': {'required': False, 'default': ''},
'bar': {'label': 'New Label for Bar'},
}
# The resulting value of extra_kwargs would be:
extra_kwargs = {
'foo': {'required': False, 'default': ''},
@ -201,7 +202,7 @@ class BaseSerializer(serializers.ModelSerializer):
__metaclass__ = BaseSerializerMetaclass
class Meta:
fields = ('id', 'type', 'url', 'related', 'summary_fields', 'created',
fields = ('id', 'type', 'resource_id', 'url', 'related', 'summary_fields', 'created',
'modified', 'name', 'description')
summary_fields = () # FIXME: List of field names from this serializer that should be used when included as part of another's summary_fields.
summarizable_fields = () # FIXME: List of field names on this serializer that should be included in summary_fields.
@ -216,6 +217,8 @@ class BaseSerializer(serializers.ModelSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
active = serializers.SerializerMethodField()
resource_id = serializers.SerializerMethodField()
def get_type(self, obj):
return get_type_for_model(self.Meta.model)
@ -254,6 +257,8 @@ class BaseSerializer(serializers.ModelSerializer):
res['created_by'] = reverse('api:user_detail', args=(obj.created_by.pk,))
if getattr(obj, 'modified_by', None) and obj.modified_by.is_active:
res['modified_by'] = reverse('api:user_detail', args=(obj.modified_by.pk,))
if isinstance(obj, ResourceMixin):
res['resource'] = reverse('api:resource_detail', args=(obj.resource_id,))
return res
def _get_summary_fields(self, obj):
@ -304,8 +309,30 @@ class BaseSerializer(serializers.ModelSerializer):
summary_fields['modified_by'] = OrderedDict()
for field in SUMMARIZABLE_FK_FIELDS['user']:
summary_fields['modified_by'][field] = getattr(obj.modified_by, field)
# RBAC summary fields
request = self.context.get('request', None)
if request and isinstance(obj, ResourceMixin) and request.user.is_authenticated():
summary_fields['permissions'] = obj.get_permissions(request.user)
roles = {}
for field in obj._meta.get_fields():
if type(field) is ImplicitRoleField:
role = getattr(obj, field.name)
#roles[field.name] = RoleSerializer(data=role).to_representation(role)
roles[field.name] = {
'id': role.id,
'name': role.name,
'url': role.get_absolute_url(),
}
if len(roles) > 0:
summary_fields['roles'] = roles
return summary_fields
def get_resource_id(self, obj):
if isinstance(obj, ResourceMixin):
return obj.resource.id
return None
def get_created(self, obj):
if obj is None:
return None
@ -479,6 +506,8 @@ class BaseSerializer(serializers.ModelSerializer):
# set by the sub list create view.
if parent_key and hasattr(view, '_raw_data_form_marker'):
ret.pop(parent_key, None)
if 'resource_id' in ret and ret['resource_id'] is None:
ret.pop('resource_id')
return ret
@ -737,7 +766,7 @@ class UserSerializer(BaseSerializer):
admin_of_organizations = reverse('api:user_admin_of_organizations_list', args=(obj.pk,)),
projects = reverse('api:user_projects_list', args=(obj.pk,)),
credentials = reverse('api:user_credentials_list', args=(obj.pk,)),
permissions = reverse('api:user_permissions_list', args=(obj.pk,)),
roles = reverse('api:user_roles_list', args=(obj.pk,)),
activity_stream = reverse('api:user_activity_stream_list', args=(obj.pk,)),
))
return res
@ -1369,7 +1398,7 @@ class TeamSerializer(BaseSerializer):
projects = reverse('api:team_projects_list', args=(obj.pk,)),
users = reverse('api:team_users_list', args=(obj.pk,)),
credentials = reverse('api:team_credentials_list', args=(obj.pk,)),
permissions = reverse('api:team_permissions_list', args=(obj.pk,)),
roles = reverse('api:team_roles_list', args=(obj.pk,)),
activity_stream = reverse('api:team_activity_stream_list', args=(obj.pk,)),
))
if obj.organization and obj.organization.active:
@ -1383,56 +1412,70 @@ class TeamSerializer(BaseSerializer):
return ret
class PermissionSerializer(BaseSerializer):
class RoleSerializer(BaseSerializer):
class Meta:
model = Permission
fields = ('*', 'user', 'team', 'project', 'inventory',
'permission_type', 'run_ad_hoc_commands')
model = Role
fields = ('*',)
def get_related(self, obj):
res = super(PermissionSerializer, self).get_related(obj)
if obj.user and obj.user.is_active:
res['user'] = reverse('api:user_detail', args=(obj.user.pk,))
if obj.team and obj.team.active:
res['team'] = reverse('api:team_detail', args=(obj.team.pk,))
if obj.project and obj.project.active:
res['project'] = reverse('api:project_detail', args=(obj.project.pk,))
if obj.inventory and obj.inventory.active:
res['inventory'] = reverse('api:inventory_detail', args=(obj.inventory.pk,))
return res
ret = super(RoleSerializer, self).get_related(obj)
if obj.content_object:
if type(obj.content_object) is Organization:
ret['organization'] = reverse('api:organization_detail', args=(obj.object_id,))
if type(obj.content_object) is Team:
ret['team'] = reverse('api:team_detail', args=(obj.object_id,))
if type(obj.content_object) is Project:
ret['project'] = reverse('api:project_detail', args=(obj.object_id,))
if type(obj.content_object) is Inventory:
ret['inventory'] = reverse('api:inventory_detail', args=(obj.object_id,))
if type(obj.content_object) is Host:
ret['host'] = reverse('api:host_detail', args=(obj.object_id,))
if type(obj.content_object) is Group:
ret['group'] = reverse('api:group_detail', args=(obj.object_id,))
if type(obj.content_object) is InventorySource:
ret['inventory_source'] = reverse('api:inventory_source_detail', args=(obj.object_id,))
if type(obj.content_object) is Credential:
ret['credential'] = reverse('api:credential_detail', args=(obj.object_id,))
if type(obj.content_object) is JobTemplate:
ret['job_template'] = reverse('api:job_template_detail', args=(obj.object_id,))
def validate(self, attrs):
# Can only set either user or team.
if attrs.get('user', None) and attrs.get('team', None):
raise serializers.ValidationError('permission can only be assigned'
' to a user OR a team, not both')
# Cannot assign admit/read/write permissions for a project.
if attrs.get('permission_type', None) in ('admin', 'read', 'write') and attrs.get('project', None):
raise serializers.ValidationError('project cannot be assigned for '
'inventory-only permissions')
# Project is required when setting deployment permissions.
if attrs.get('permission_type', None) in ('run', 'check') and not attrs.get('project', None):
raise serializers.ValidationError('project is required when '
'assigning deployment permissions')
return super(PermissionSerializer, self).validate(attrs)
def to_representation(self, obj):
ret = super(PermissionSerializer, self).to_representation(obj)
if obj is None:
return ret
if 'user' in ret and (not obj.user or not obj.user.is_active):
ret['user'] = None
if 'team' in ret and (not obj.team or not obj.team.active):
ret['team'] = None
if 'project' in ret and (not obj.project or not obj.project.active):
ret['project'] = None
if 'inventory' in ret and (not obj.inventory or not obj.inventory.active):
ret['inventory'] = None
return ret
class ResourceSerializer(BaseSerializer):
class Meta:
model = Resource
fields = ('*',)
class ResourceAccessListElementSerializer(UserSerializer):
def to_representation(self, user):
ret = super(ResourceAccessListElementSerializer, self).to_representation(user)
resource_id = self.context['view'].resource_id
resource = Resource.objects.get(pk=resource_id)
if 'summary_fields' not in ret:
ret['summary_fields'] = {}
ret['summary_fields']['permissions'] = resource.get_permissions(user)
def format_role_perm(role):
return { 'role': { 'id': role.id, 'name': role.name}, 'permissions': resource.get_role_permissions(role)}
direct_permissive_role_ids = resource.permissions.values_list('role__id')
direct_access_roles = user.roles.filter(id__in=direct_permissive_role_ids).all()
ret['summary_fields']['direct_access'] = [format_role_perm(r) for r in direct_access_roles]
all_permissive_role_ids = resource.permissions.values_list('role__ancestors__id')
indirect_access_roles = user.roles.filter(id__in=all_permissive_role_ids).exclude(id__in=direct_permissive_role_ids).all()
ret['summary_fields']['indirect_access'] = [format_role_perm(r) for r in indirect_access_roles]
return ret
class CredentialSerializer(BaseSerializer):
# FIXME: may want to make some fields filtered based on user accessing
@ -1705,7 +1748,7 @@ class JobRelaunchSerializer(JobSerializer):
obj = self.context.get('obj')
data = self.context.get('data')
# Check for passwords needed
# Check for passwords needed
needed = self.get_passwords_needed_to_start(obj)
provided = dict([(field, data.get(field, '')) for field in needed])
if not all(provided.values()):
@ -2292,7 +2335,7 @@ class AuthTokenSerializer(serializers.Serializer):
class FactVersionSerializer(BaseFactSerializer):
related = serializers.SerializerMethodField('get_related')
class Meta:
model = FactVersion
fields = ('related', 'module', 'timestamp',)

View File

@ -30,7 +30,7 @@ user_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/admin_of_organizations/$', 'user_admin_of_organizations_list'),
url(r'^(?P<pk>[0-9]+)/projects/$', 'user_projects_list'),
url(r'^(?P<pk>[0-9]+)/credentials/$', 'user_credentials_list'),
url(r'^(?P<pk>[0-9]+)/permissions/$', 'user_permissions_list'),
url(r'^(?P<pk>[0-9]+)/roles/$', 'user_roles_list'),
url(r'^(?P<pk>[0-9]+)/activity_stream/$', 'user_activity_stream_list'),
)
@ -58,7 +58,7 @@ team_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/projects/$', 'team_projects_list'),
url(r'^(?P<pk>[0-9]+)/users/$', 'team_users_list'),
url(r'^(?P<pk>[0-9]+)/credentials/$', 'team_credentials_list'),
url(r'^(?P<pk>[0-9]+)/permissions/$', 'team_permissions_list'),
url(r'^(?P<pk>[0-9]+)/roles/$', 'team_roles_list'),
url(r'^(?P<pk>[0-9]+)/activity_stream/$', 'team_activity_stream_list'),
)
@ -141,8 +141,22 @@ credential_urls = patterns('awx.api.views',
# See also credentials resources on users/teams.
)
permission_urls = patterns('awx.api.views',
url(r'^(?P<pk>[0-9]+)/$', 'permission_detail'),
role_urls = patterns('awx.api.views',
url(r'^$', 'role_list'),
url(r'^(?P<pk>[0-9]+)/$', 'role_detail'),
url(r'^(?P<pk>[0-9]+)/users/$', 'role_users_list'),
url(r'^(?P<pk>[0-9]+)/teams/$', 'role_teams_list'),
url(r'^(?P<pk>[0-9]+)/parents/$', 'role_parents_list'),
url(r'^(?P<pk>[0-9]+)/children/$', 'role_children_list'),
)
resource_urls = patterns('awx.api.views',
#url(r'^$', 'resource_list'),
url(r'^(?P<pk>[0-9]+)/$', 'resource_detail'),
url(r'^(?P<pk>[0-9]+)/access_list/$', 'resource_access_list'),
#url(r'^(?P<pk>[0-9]+)/users/$', 'resource_users_list'),
#url(r'^(?P<pk>[0-9]+)/teams/$', 'resource_teams_list'),
#url(r'^(?P<pk>[0-9]+)/roles/$', 'resource_teams_list'),
)
job_template_urls = patterns('awx.api.views',
@ -249,7 +263,8 @@ v1_urls = patterns('awx.api.views',
url(r'^inventory_updates/', include(inventory_update_urls)),
url(r'^inventory_scripts/', include(inventory_script_urls)),
url(r'^credentials/', include(credential_urls)),
url(r'^permissions/', include(permission_urls)),
url(r'^roles/', include(role_urls)),
url(r'^resources/', include(resource_urls)),
url(r'^job_templates/', include(job_template_urls)),
url(r'^jobs/', include(job_urls)),
url(r'^job_host_summaries/', include(job_host_summary_urls)),

View File

@ -581,7 +581,7 @@ class AuthTokenView(APIView):
except IndexError:
token = AuthToken.objects.create(user=serializer.validated_data['user'],
request_hash=request_hash)
# Get user un-expired tokens that are not invalidated that are
# Get user un-expired tokens that are not invalidated that are
# over the configured limit.
# Mark them as invalid and inform the user
invalid_tokens = AuthToken.get_tokens_over_limit(serializer.validated_data['user'])
@ -700,24 +700,29 @@ class TeamUsersList(SubListCreateAttachDetachAPIView):
parent_model = Team
relationship = 'users'
class TeamPermissionsList(SubListCreateAttachDetachAPIView):
model = Permission
serializer_class = PermissionSerializer
class TeamRolesList(SubListCreateAttachDetachAPIView):
model = Role
serializer_class = RoleSerializer
parent_model = Team
relationship = 'permissions'
parent_key = 'team'
relationship='member_role.children'
def get_queryset(self):
# FIXME: Default get_queryset should handle this.
# XXX: This needs to be the intersection between
# what roles the user has and what roles the viewer
# has access to see.
team = Team.objects.get(pk=self.kwargs['pk'])
base = Permission.objects.filter(team = team)
#if Team.can_user_administrate(self.request.user, team, None):
if self.request.user.can_access(Team, 'change', team, None):
return base
elif team.users.filter(pk=self.request.user.pk).count() > 0:
return base
raise PermissionDenied()
return team.member_role.children
# XXX: Need to enforce permissions
def post(self, request, *args, **kwargs):
# Forbid implicit role creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg='Role "id" field is missing')
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(type(self), self).post(request, *args, **kwargs)
class TeamProjectsList(SubListCreateAttachDetachAPIView):
@ -920,13 +925,30 @@ class UserTeamsList(SubListAPIView):
parent_model = User
relationship = 'teams'
class UserPermissionsList(SubListCreateAttachDetachAPIView):
model = Permission
serializer_class = PermissionSerializer
class UserRolesList(SubListCreateAttachDetachAPIView):
model = Role
serializer_class = RoleSerializer
parent_model = User
relationship = 'permissions'
parent_key = 'user'
relationship='roles'
def get_queryset(self):
# XXX: This needs to be the intersection between
# what roles the user has and what roles the viewer
# has access to see.
u = User.objects.get(pk=self.kwargs['pk'])
return u.roles
def post(self, request, *args, **kwargs):
# Forbid implicit role creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg='Role "id" field is missing')
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(type(self), self).post(request, *args, **kwargs)
class UserProjectsList(SubListAPIView):
@ -1047,10 +1069,6 @@ class CredentialActivityStreamList(SubListAPIView):
# Okay, let it through.
return super(type(self), self).get(request, *args, **kwargs)
class PermissionDetail(RetrieveUpdateDestroyAPIView):
model = Permission
serializer_class = PermissionSerializer
class InventoryScriptList(ListCreateAPIView):
@ -2872,7 +2890,7 @@ class UnifiedJobStdout(RetrieveAPIView):
return Response({'range': {'start': 0, 'end': 1, 'absolute_end': 1}, 'content': response_message})
else:
return Response(response_message)
if request.accepted_renderer.format in ('html', 'api', 'json'):
content_format = request.query_params.get('content_format', 'html')
content_encoding = request.query_params.get('content_encoding', None)
@ -3031,6 +3049,134 @@ class SettingsReset(APIView):
TowerSettings.objects.filter(key=settings_key).delete()
return Response(status=status.HTTP_204_NO_CONTENT)
#class RoleList(ListCreateAPIView):
class RoleList(ListAPIView):
model = Role
serializer_class = RoleSerializer
new_in_300 = True
# XXX: Permissions - only roles the user has access to see should be listed here
def get_queryset(self):
return Role.objects
# XXX: Need to define who can create custom roles, and then restrict access
# appropriately
# XXX: Need to define how we want to deal with administration of custom roles.
class RoleDetail(RetrieveUpdateAPIView):
model = Role
serializer_class = RoleSerializer
new_in_300 = True
# XXX: Permissions - only appropriate people should be able to change these
class RoleUsersList(SubListCreateAttachDetachAPIView):
model = User
serializer_class = UserSerializer
parent_model = Role
relationship = 'members'
def get_queryset(self):
# XXX: Access control
role = Role.objects.get(pk=self.kwargs['pk'])
return role.members
def post(self, request, *args, **kwargs):
# Forbid implicit role creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg='Role "id" field is missing')
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(type(self), self).post(request, *args, **kwargs)
class RoleTeamsList(ListAPIView):
model = Team
serializer_class = TeamSerializer
parent_model = Role
relationship = 'member_role.parents'
def get_queryset(self):
# TODO: Check
role = Role.objects.get(pk=self.kwargs['pk'])
return Team.objects.filter(member_role__children__in=[role])
def post(self, request, pk, *args, **kwargs):
# Forbid implicit role creation here
sub_id = request.data.get('id', None)
if not sub_id:
data = dict(msg='Role "id" field is missing')
return Response(data, status=status.HTTP_400_BAD_REQUEST)
# XXX: Need to pull in can_attach and can_unattach kinda code from SubListCreateAttachDetachAPIView
role = Role.objects.get(pk=self.kwargs['pk'])
team = Team.objects.get(pk=sub_id)
if request.data.get('disassociate', None):
team.member_role.children.remove(role)
else:
team.member_role.children.add(role)
return Response(status=status.HTTP_204_NO_CONTENT)
# XXX attach/detach needs to ensure we have the appropriate perms
class RoleParentsList(SubListAPIView):
model = Role
serializer_class = RoleSerializer
parent_model = Role
relationship = 'parents'
def get_queryset(self):
# XXX: This should be the intersection between the roles of the user
# and the roles that the requesting user has access to see
role = Role.objects.get(pk=self.kwargs['pk'])
return role.parents
class RoleChildrenList(SubListAPIView):
model = Role
serializer_class = RoleSerializer
parent_model = Role
relationship = 'children'
def get_queryset(self):
# XXX: This should be the intersection between the roles of the user
# and the roles that the requesting user has access to see
role = Role.objects.get(pk=self.kwargs['pk'])
return role.children
class ResourceDetail(RetrieveAPIView):
model = Resource
serializer_class = ResourceSerializer
new_in_300 = True
# XXX: Permissions - only roles the user has access to see should be listed here
def get_queryset(self):
return Resource.objects
class ResourceAccessList(ListAPIView):
model = User
serializer_class = ResourceAccessListElementSerializer
new_in_300 = True
def get_queryset(self):
self.resource_id = self.kwargs['pk']
resource = Resource.objects.get(pk=self.kwargs['pk'])
roles = set([p.role for p in resource.permissions.all()])
ancestors = set()
for r in roles:
ancestors.update(set(r.ancestors.all()))
return User.objects.filter(roles__in=list(ancestors))
# Create view functions for all of the class-based views to simplify inclusion
# in URL patterns and reverse URL lookups, converting CamelCase names to
# lowercase_with_underscore (e.g. MyView.as_view() becomes my_view).

View File

@ -1659,6 +1659,64 @@ class TowerSettingsAccess(BaseAccess):
def can_delete(self, obj):
return self.user.is_superuser
class RoleAccess(BaseAccess):
'''
TODO: XXX: Needs implemenation
'''
model = Role
def get_queryset(self):
if self.user.is_superuser:
return self.model.objects.all()
return self.model.objects.none()
def can_change(self, obj, data):
return self.user.is_superuser
def can_add(self, obj, data):
return self.user.is_superuser
def can_attach(self, obj, sub_obj, relationship, data,
skip_sub_obj_read_check=False):
return self.user.is_superuser
def can_unattach(self, obj, sub_obj, relationship):
return self.user.is_superuser
def can_delete(self, obj):
return self.user.is_superuser
class ResourceAccess(BaseAccess):
'''
TODO: XXX: Needs implemenation
'''
model = Role
def get_queryset(self):
if self.user.is_superuser:
return self.model.objects.all()
return self.model.objects.none()
def can_change(self, obj, data):
return self.user.is_superuser
def can_add(self, obj, data):
return self.user.is_superuser
def can_attach(self, obj, sub_obj, relationship, data,
skip_sub_obj_read_check=False):
return self.user.is_superuser
def can_unattach(self, obj, sub_obj, relationship):
return self.user.is_superuser
def can_delete(self, obj):
return self.user.is_superuser
register_access(User, UserAccess)
register_access(Organization, OrganizationAccess)
register_access(Inventory, InventoryAccess)
@ -1685,3 +1743,5 @@ register_access(UnifiedJob, UnifiedJobAccess)
register_access(ActivityStream, ActivityStreamAccess)
register_access(CustomInventoryScript, CustomInventoryScriptAccess)
register_access(TowerSettings, TowerSettingsAccess)
register_access(Role, RoleAccess)
register_access(Resource, ResourceAccess)

View File

@ -2,6 +2,7 @@
# All Rights Reserved.
# Django
from django.db import connection
from django.db.models.signals import post_save
from django.db.models.signals import m2m_changed
from django.db import models
@ -14,6 +15,7 @@ from django.db.models.fields.related import (
)
from django.core.exceptions import FieldError
from django.db.transaction import TransactionManagementError
# AWX
@ -63,6 +65,8 @@ class ResourceFieldDescriptor(ReverseSingleRelatedObjectDescriptor):
resource = super(ResourceFieldDescriptor, self).__get__(instance, instance_type)
if resource:
return resource
if connection.needs_rollback:
raise TransactionManagementError('Current transaction has failed, cannot create implicit resource')
resource = Resource.objects.create(content_object=instance)
setattr(instance, self.field.name, resource)
instance.save(update_fields=[self.field.name,])
@ -107,6 +111,9 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
if not self.role_name:
raise FieldError('Implicit role missing `role_name`')
if connection.needs_rollback:
raise TransactionManagementError('Current transaction has failed, cannot create implicit role')
role = Role.objects.create(name=self.role_name, content_object=instance)
if self.parent_role:
def resolve_field(obj, field):

View File

@ -18,6 +18,7 @@ from awx.main.models.activity_stream import * # noqa
from awx.main.models.ha import * # noqa
from awx.main.models.configuration import * # noqa
from awx.main.models.rbac import * # noqa
from awx.main.models.mixins import * # noqa
# Monkeypatch Django serializer to ignore django-taggit fields (which break
# the dumpdata command; see https://github.com/alex/django-taggit/issues/155).

View File

@ -8,7 +8,7 @@ from awx.main.models.rbac import Resource
from awx.main.fields import ImplicitResourceField
__all__ = 'ResourceMixin'
__all__ = ['ResourceMixin']
class ResourceMixin(models.Model):
@ -43,40 +43,7 @@ class ResourceMixin(models.Model):
def get_permissions(self, user):
'''
Returns a dict (or None) of the permissions a user has for a given
resource.
Note: Each field in the dict is the `or` of all respective permissions
that have been granted to the roles that are applicable for the given
user.
In example, if a user has been granted read access through a permission
on one role and write access through a permission on a separate role,
the returned dict will denote that the user has both read and write
access.
'''
qs = user.__class__.objects.filter(id=user.id, roles__descendents__permissions__resource=self.resource)
qs = qs.annotate(max_create = Max('roles__descendents__permissions__create'))
qs = qs.annotate(max_read = Max('roles__descendents__permissions__read'))
qs = qs.annotate(max_write = Max('roles__descendents__permissions__write'))
qs = qs.annotate(max_update = Max('roles__descendents__permissions__update'))
qs = qs.annotate(max_delete = Max('roles__descendents__permissions__delete'))
qs = qs.annotate(max_scm_update = Max('roles__descendents__permissions__scm_update'))
qs = qs.annotate(max_execute = Max('roles__descendents__permissions__execute'))
qs = qs.annotate(max_use = Max('roles__descendents__permissions__use'))
qs = qs.values('max_create', 'max_read', 'max_write', 'max_update',
'max_delete', 'max_scm_update', 'max_execute', 'max_use')
res = qs.all()
if len(res):
# strip away the 'max_' prefix
return {k[4:]:v for k,v in res[0].items()}
return None
return self.resource.get_permissions(user)
def accessible_by(self, user, permissions):
'''

View File

@ -18,6 +18,7 @@ from django.utils.translation import ugettext_lazy as _
# AWX
from awx.main.fields import AutoOneToOneField, ImplicitRoleField
from awx.main.models.base import * # noqa
from awx.main.models.rbac import ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, ROLE_SINGLETON_SYSTEM_AUDITOR
from awx.main.models.mixins import ResourceMixin
from awx.main.conf import tower_settings
@ -50,12 +51,12 @@ class Organization(CommonModel, ResourceMixin):
)
admin_role = ImplicitRoleField(
role_name='Organization Administrator',
parent_role='singleton:System Administrator',
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Organization Auditor',
parent_role='singleton:System Auditor',
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,
permissions = {'read': True}
)
member_role = ImplicitRoleField(

View File

@ -6,6 +6,8 @@ import logging
# Django
from django.db import models
from django.db.models.aggregates import Max
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext_lazy as _
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
@ -13,10 +15,13 @@ from django.contrib.contenttypes.fields import GenericForeignKey
# AWX
from awx.main.models.base import * # noqa
__all__ = ['Role', 'RolePermission', 'Resource']
__all__ = ['Role', 'RolePermission', 'Resource', 'ROLE_SINGLETON_SYSTEM_ADMINISTRATOR', 'ROLE_SINGLETON_SYSTEM_AUDITOR']
logger = logging.getLogger('awx.main.models.rbac')
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR='System Administrator'
ROLE_SINGLETON_SYSTEM_AUDITOR='System Auditor'
class Role(CommonModelNameNotUnique):
'''
@ -40,6 +45,9 @@ class Role(CommonModelNameNotUnique):
super(Role, self).save(*args, **kwargs)
self.rebuild_role_ancestor_list()
def get_absolute_url(self):
return reverse('api:role_detail', args=(self.pk,))
def rebuild_role_ancestor_list(self):
'''
Updates our `ancestors` map to accurately reflect all of the ancestors for a role
@ -91,7 +99,7 @@ class Role(CommonModelNameNotUnique):
try:
return Role.objects.get(singleton_name=name)
except Role.DoesNotExist:
ret = Role(singleton_name=name)
ret = Role(singleton_name=name, name=name)
ret.save()
return ret
@ -113,6 +121,70 @@ class Resource(CommonModelNameNotUnique):
object_id = models.PositiveIntegerField(null=True, default=None)
content_object = GenericForeignKey('content_type', 'object_id')
def get_permissions(self, user):
'''
Returns a dict (or None) of the permissions a user has for a given
resource.
Note: Each field in the dict is the `or` of all respective permissions
that have been granted to the roles that are applicable for the given
user.
In example, if a user has been granted read access through a permission
on one role and write access through a permission on a separate role,
the returned dict will denote that the user has both read and write
access.
'''
qs = user.__class__.objects.filter(id=user.id, roles__descendents__permissions__resource=self)
qs = qs.annotate(max_create = Max('roles__descendents__permissions__create'))
qs = qs.annotate(max_read = Max('roles__descendents__permissions__read'))
qs = qs.annotate(max_write = Max('roles__descendents__permissions__write'))
qs = qs.annotate(max_update = Max('roles__descendents__permissions__update'))
qs = qs.annotate(max_delete = Max('roles__descendents__permissions__delete'))
qs = qs.annotate(max_scm_update = Max('roles__descendents__permissions__scm_update'))
qs = qs.annotate(max_execute = Max('roles__descendents__permissions__execute'))
qs = qs.annotate(max_use = Max('roles__descendents__permissions__use'))
qs = qs.values('max_create', 'max_read', 'max_write', 'max_update',
'max_delete', 'max_scm_update', 'max_execute', 'max_use')
res = qs.all()
if len(res):
# strip away the 'max_' prefix
return {k[4:]:v for k,v in res[0].items()}
return None
def get_role_permissions(self, role):
'''
Returns a dict (or None) of the permissions a role has for a given
resource.
Note: Each field in the dict is the `or` of all respective permissions
that have been granted to either the role or any descendents of that role.
'''
qs = Role.objects.filter(id=role.id, descendents__permissions__resource=self)
qs = qs.annotate(max_create = Max('descendents__permissions__create'))
qs = qs.annotate(max_read = Max('descendents__permissions__read'))
qs = qs.annotate(max_write = Max('descendents__permissions__write'))
qs = qs.annotate(max_update = Max('descendents__permissions__update'))
qs = qs.annotate(max_delete = Max('descendents__permissions__delete'))
qs = qs.annotate(max_scm_update = Max('descendents__permissions__scm_update'))
qs = qs.annotate(max_execute = Max('descendents__permissions__execute'))
qs = qs.annotate(max_use = Max('descendents__permissions__use'))
qs = qs.values('max_create', 'max_read', 'max_write', 'max_update',
'max_delete', 'max_scm_update', 'max_execute', 'max_use')
res = qs.all()
if len(res):
# strip away the 'max_' prefix
return {k[4:]:v for k,v in res[0].items()}
return None
class RolePermission(CreatedModifiedModel):
'''

View File

@ -116,12 +116,36 @@ def store_initial_active_state(sender, **kwargs):
instance._saved_active_state = True
def rebuild_role_ancestor_list(sender, reverse, model, instance, pk_set, **kwargs):
'When a role parent is added or removed, update our role hierarchy list'
if reverse:
for id in pk_set:
model.objects.get(id=id).rebuild_role_ancestor_list()
else:
instance.rebuild_role_ancestor_list()
def sync_superuser_status_to_rbac(sender, instance, **kwargs):
'When the is_superuser flag is changed on a user, reflect that in the membership of the System Admnistrator role'
if instance.is_superuser:
Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.add(instance)
else:
Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance)
def sync_user_to_team_members_role(sender, reverse, model, instance, pk_set, action, **kwargs):
'When a user is added or removed from Team.users, ensure that is reflected in Team.member_role'
if action == 'post_add' or action == 'pre_remove':
if reverse:
for team in Team.objects.filter(id__in=pk_set).all():
if action == 'post_add':
team.member_role.members.add(instance)
if action == 'pre_remove':
team.member_role.members.remove(instance)
else:
for user in User.objects.filter(id__in=pk_set).all():
if action == 'post_add':
instance.member_role.members.add(user)
if action == 'pre_remove':
instance.member_role.members.remove(user)
pre_save.connect(store_initial_active_state, sender=Host)
post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
@ -142,6 +166,8 @@ post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Job)
post_save.connect(emit_job_event_detail, sender=JobEvent)
post_save.connect(emit_ad_hoc_command_event_detail, sender=AdHocCommandEvent)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
post_save.connect(sync_superuser_status_to_rbac, sender=User)
m2m_changed.connect(sync_user_to_team_members_role, Team.users.through)
#m2m_changed.connect(rebuild_group_parent_roles, Group.parents.through)
# Migrate hosts, groups to parent group(s) whenever a group is deleted or
@ -312,7 +338,6 @@ model_serializer_mapping = {
Credential: CredentialSerializer,
Team: TeamSerializer,
Project: ProjectSerializer,
Permission: PermissionSerializer,
JobTemplate: JobTemplateSerializer,
Job: JobSerializer,
AdHocCommand: AdHocCommandSerializer,

View File

@ -2,9 +2,6 @@ import pytest
from django.core.urlresolvers import resolve
from django.utils.six.moves.urllib.parse import urlparse
from awx.main.models.organization import Organization
from awx.main.models.ha import Instance
from django.contrib.auth.models import User
from rest_framework.test import (
@ -25,6 +22,9 @@ from awx.main.models.organization import (
Team,
)
from awx.main.models.rbac import Role
@pytest.fixture
def user():
def u(name, is_superuser=False):
@ -89,6 +89,22 @@ def credential():
def inventory(organization):
return Inventory.objects.create(name="test-inventory", organization=organization)
@pytest.fixture
def role():
return Role.objects.create(name='role')
@pytest.fixture
def admin(user):
return user('admin', True)
@pytest.fixture
def alice(user):
return user('alice', False)
@pytest.fixture
def bob(user):
return user('bob', False)
@pytest.fixture
def group(inventory):
def g(name):
@ -108,6 +124,7 @@ def permissions():
'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,},
}
@pytest.fixture
def post():
def rf(url, data, user=None, middleware=None, **kwargs):

View File

@ -0,0 +1,365 @@
import mock # noqa
import pytest
from django.core.urlresolvers import reverse
from awx.main.models.rbac import Role
def mock_feature_enabled(feature, bypass_database=None):
return True
#@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
#
# /roles
#
@pytest.mark.django_db
def test_get_roles_list_admin(organization, get, admin):
'Admin can see list of all roles'
url = reverse('api:role_list')
response = get(url, admin)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Unimplemented')
def test_get_roles_list_user(organization, get, user):
'Users can see all roles they have access to, but not all roles'
assert False
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_create_role(post, admin):
'Admins can create new roles'
#u = user('admin', True)
response = post(reverse('api:role_list'), {'name': 'New Role'}, admin)
assert response.status_code == 201
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_delete_role(post, admin):
'Admins can delete a custom role'
assert False
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_user_create_role(organization, get, user):
'User can create custom roles'
assert False
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_user_delete_role(organization, get, user):
'User can delete their custom roles, but not any old row'
assert False
#
# /user/<id>/roles
#
@pytest.mark.django_db
def test_get_user_roles_list(get, admin):
url = reverse('api:user_roles_list', args=(admin.id,))
response = get(url, admin)
assert response.status_code == 200
roles = response.data
assert roles['count'] > 0 # 'System Administrator' role if nothing else
@pytest.mark.django_db
def test_add_role_to_user(role, post, admin):
assert admin.roles.filter(id=role.id).count() == 0
url = reverse('api:user_roles_list', args=(admin.id,))
response = post(url, {'id': role.id}, admin)
assert response.status_code == 204
assert admin.roles.filter(id=role.id).count() == 1
response = post(url, {'id': role.id}, admin)
assert response.status_code == 204
assert admin.roles.filter(id=role.id).count() == 1
response = post(url, {}, admin)
assert response.status_code == 400
assert admin.roles.filter(id=role.id).count() == 1
@pytest.mark.django_db
def test_remove_role_from_user(role, post, admin):
assert admin.roles.filter(id=role.id).count() == 0
url = reverse('api:user_roles_list', args=(admin.id,))
response = post(url, {'id': role.id}, admin)
assert response.status_code == 204
assert admin.roles.filter(id=role.id).count() == 1
response = post(url, {'disassociate': role.id, 'id': role.id}, admin)
assert response.status_code == 204
assert admin.roles.filter(id=role.id).count() == 0
#
# /team/<id>/roles
#
@pytest.mark.django_db
def test_get_teams_roles_list(get, team, organization, admin):
team.member_role.children.add(organization.admin_role)
url = reverse('api:team_roles_list', args=(team.id,))
response = get(url, admin)
assert response.status_code == 200
roles = response.data
assert roles['count'] == 1
assert roles['results'][0]['id'] == organization.admin_role.id
@pytest.mark.django_db
def test_add_role_to_teams(team, role, post, admin):
assert team.member_role.children.filter(id=role.id).count() == 0
url = reverse('api:team_roles_list', args=(team.id,))
response = post(url, {'id': role.id}, admin)
assert response.status_code == 204
assert team.member_role.children.filter(id=role.id).count() == 1
response = post(url, {'id': role.id}, admin)
assert response.status_code == 204
assert team.member_role.children.filter(id=role.id).count() == 1
response = post(url, {}, admin)
assert response.status_code == 400
assert team.member_role.children.filter(id=role.id).count() == 1
@pytest.mark.django_db
def test_remove_role_from_teams(team, role, post, admin):
assert team.member_role.children.filter(id=role.id).count() == 0
url = reverse('api:team_roles_list', args=(team.id,))
response = post(url, {'id': role.id}, admin)
assert response.status_code == 204
assert team.member_role.children.filter(id=role.id).count() == 1
response = post(url, {'disassociate': role.id, 'id': role.id}, admin)
assert response.status_code == 204
assert team.member_role.children.filter(id=role.id).count() == 0
#
# /roles/<id>/
#
@pytest.mark.django_db
def test_get_role(get, admin, role):
url = reverse('api:role_detail', args=(role.id,))
response = get(url, admin)
assert response.status_code == 200
assert response.data['id'] == role.id
@pytest.mark.django_db
def test_put_role(put, admin, role):
url = reverse('api:role_detail', args=(role.id,))
response = put(url, {'name': 'Some new name'}, admin)
assert response.status_code == 200
r = Role.objects.get(id=role.id)
assert r.name == 'Some new name'
@pytest.mark.django_db
def test_put_role_access_denied(put, alice, admin, role):
url = reverse('api:role_detail', args=(role.id,))
response = put(url, {'name': 'Some new name'}, alice)
assert response.status_code == 403
#
# /roles/<id>/users/
#
@pytest.mark.django_db
def test_get_role_users(get, admin, role):
role.members.add(admin)
url = reverse('api:role_users_list', args=(role.id,))
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == admin.id
@pytest.mark.django_db
def test_add_user_to_role(post, admin, role):
url = reverse('api:role_users_list', args=(role.id,))
assert role.members.filter(id=admin.id).count() == 0
post(url, {'id': admin.id}, admin)
assert role.members.filter(id=admin.id).count() == 1
@pytest.mark.django_db
def test_remove_user_to_role(post, admin, role):
role.members.add(admin)
url = reverse('api:role_users_list', args=(role.id,))
assert role.members.filter(id=admin.id).count() == 1
post(url, {'disassociate': True, 'id': admin.id}, admin)
assert role.members.filter(id=admin.id).count() == 0
#
# /roles/<id>/teams/
#
@pytest.mark.django_db
def test_get_role_teams(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_teams_list', args=(role.id,))
response = get(url, admin)
print(response.data)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.id
@pytest.mark.django_db
def test_add_team_to_role(post, team, admin, role):
url = reverse('api:role_teams_list', args=(role.id,))
assert role.members.filter(id=admin.id).count() == 0
res = post(url, {'id': team.id}, admin)
print res.data
assert res.status_code == 204
assert role.parents.filter(id=team.member_role.id).count() == 1
@pytest.mark.django_db
def test_remove_team_from_role(post, team, admin, role):
role.members.add(admin)
url = reverse('api:role_teams_list', args=(role.id,))
assert role.members.filter(id=admin.id).count() == 1
res = post(url, {'disassociate': True, 'id': team.id}, admin)
print res.data
assert res.status_code == 204
assert role.parents.filter(id=team.member_role.id).count() == 0
#
# /roles/<id>/parents/
#
@pytest.mark.django_db
def test_role_parents(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_parents_list', args=(role.id,))
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.member_role.id
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_add_parent(post, team, admin, role):
assert role.parents.count() == 0
url = reverse('api:role_parents_list', args=(role.id,))
post(url, {'id': team.member_role.id}, admin)
assert role.parents.count() == 1
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_remove_parent(post, team, admin, role):
role.parents.add(team.member_role)
assert role.parents.count() == 1
url = reverse('api:role_parents_list', args=(role.id,))
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
assert role.parents.count() == 0
#
# /roles/<id>/children/
#
@pytest.mark.django_db
def test_role_children(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_children_list', args=(team.member_role.id,))
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == role.id
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_add_children(post, team, admin, role):
assert role.children.count() == 0
url = reverse('api:role_children_list', args=(role.id,))
post(url, {'id': team.member_role.id}, admin)
assert role.children.count() == 1
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Waiting on custom role requirements')
def test_role_remove_children(post, team, admin, role):
role.children.add(team.member_role)
assert role.children.count() == 1
url = reverse('api:role_children_list', args=(role.id,))
post(url, {'disassociate': True, 'id': team.member_role.id}, admin)
assert role.children.count() == 0
#
# /resource/<id>/access_list
#
@pytest.mark.django_db
def test_resource_access_list(get, team, admin, role):
team.users.add(admin)
url = reverse('api:resource_access_list', args=(team.resource.id,))
res = get(url, admin)
assert res.status_code == 200
#
# Generics
#
@pytest.mark.django_db
def test_ensure_rbac_fields_are_present(organization, get, admin):
url = reverse('api:organization_detail', args=(organization.id,))
response = get(url, admin)
assert response.status_code == 200
org = response.data
assert 'summary_fields' in org
assert 'resource_id' in org
assert org['resource_id'] > 0
assert org['related']['resource'] != ''
assert 'roles' in org['summary_fields']
org_role_response = get(org['summary_fields']['roles']['admin_role']['url'], admin)
assert org_role_response.status_code == 200
role = org_role_response.data
assert role['related']['organization'] == url
@pytest.mark.django_db
def test_ensure_permissions_is_present(organization, get, user):
#u = user('admin', True)
url = reverse('api:organization_detail', args=(organization.id,))
response = get(url, user('admin', True))
assert response.status_code == 200
org = response.data
assert 'summary_fields' in org
assert 'permissions' in org['summary_fields']
assert org['summary_fields']['permissions']['read'] > 0
@pytest.mark.django_db
def test_ensure_role_summary_is_present(organization, get, user):
#u = user('admin', True)
url = reverse('api:organization_detail', args=(organization.id,))
response = get(url, user('admin', True))
assert response.status_code == 200
org = response.data
assert 'summary_fields' in org
assert 'roles' in org['summary_fields']
assert org['summary_fields']['roles']['admin_role']['id'] > 0

View File

@ -0,0 +1,103 @@
import pytest
from awx.main.models import (
Role,
Organization,
)
@pytest.mark.django_db
def test_auto_inheritance_by_children(organization, alice):
A = Role.objects.create(name='A')
B = Role.objects.create(name='B')
A.members.add(alice)
assert organization.accessible_by(alice, {'read': True}) is False
A.children.add(B)
assert organization.accessible_by(alice, {'read': True}) is False
A.children.add(organization.admin_role)
assert organization.accessible_by(alice, {'read': True}) is True
A.children.remove(organization.admin_role)
assert organization.accessible_by(alice, {'read': True}) is False
B.children.add(organization.admin_role)
assert organization.accessible_by(alice, {'read': True}) is True
B.children.remove(organization.admin_role)
assert organization.accessible_by(alice, {'read': True}) is False
@pytest.mark.django_db
def test_auto_inheritance_by_parents(organization, alice):
A = Role.objects.create(name='A')
B = Role.objects.create(name='B')
A.members.add(alice)
assert organization.accessible_by(alice, {'read': True}) is False
B.parents.add(A)
assert organization.accessible_by(alice, {'read': True}) is False
organization.admin_role.parents.add(A)
assert organization.accessible_by(alice, {'read': True}) is True
organization.admin_role.parents.remove(A)
assert organization.accessible_by(alice, {'read': True}) is False
organization.admin_role.parents.add(B)
assert organization.accessible_by(alice, {'read': True}) is True
organization.admin_role.parents.remove(B)
assert organization.accessible_by(alice, {'read': True}) is False
@pytest.mark.django_db
def test_permission_union(organization, alice):
A = Role.objects.create(name='A')
A.members.add(alice)
B = Role.objects.create(name='B')
B.members.add(alice)
assert organization.accessible_by(alice, {'read': True, 'write': True}) is False
A.grant(organization, {'read': True})
assert organization.accessible_by(alice, {'read': True, 'write': True}) is False
B.grant(organization, {'write': True})
assert organization.accessible_by(alice, {'read': True, 'write': True}) is True
@pytest.mark.django_db
def test_team_symantics(organization, team, alice):
assert organization.accessible_by(alice, {'read': True}) is False
team.member_role.children.add(organization.auditor_role)
assert organization.accessible_by(alice, {'read': True}) is False
team.users.add(alice)
assert organization.accessible_by(alice, {'read': True}) is True
team.users.remove(alice)
assert organization.accessible_by(alice, {'read': True}) is False
alice.teams.add(team)
assert organization.accessible_by(alice, {'read': True}) is True
alice.teams.remove(team)
assert organization.accessible_by(alice, {'read': True}) is False
@pytest.mark.django_db
def test_auto_m2m_adjuments(organization, project, alice):
'Ensures the auto role reparenting is working correctly through m2m maps'
organization.admin_role.members.add(alice)
assert project.accessible_by(alice, {'read': True}) is True
project.organizations.remove(organization)
assert project.accessible_by(alice, {'read': True}) is False
project.organizations.add(organization)
assert project.accessible_by(alice, {'read': True}) is True
organization.projects.remove(project)
assert project.accessible_by(alice, {'read': True}) is False
organization.projects.add(project)
assert project.accessible_by(alice, {'read': True}) is True
@pytest.mark.django_db
@pytest.mark.skipif(True, reason='Unimplemented')
def test_auto_field_adjuments(organization, inventory, team, alice):
'Ensures the auto role reparenting is working correctly through m2m maps'
org2 = Organization.objects.create(name='Org 2', description='org 2')
org2.admin_role.members.add(alice)
assert inventory.accessible_by(alice, {'read': True}) is False
inventory.organization = org2
assert inventory.accessible_by(alice, {'read': True}) is True
inventory.organization = organization
assert inventory.accessible_by(alice, {'read': True}) is False

View File

@ -11,7 +11,7 @@ from django.apps import apps
@pytest.mark.django_db
def test_organization_migration_admin(organization, permissions, user):
u = user('admin', True)
u = user('admin', False)
organization.admins.add(u)
assert not organization.accessible_by(u, permissions['admin'])

View File

@ -1,7 +1,7 @@
import pytest
from awx.main.migrations import _rbac as rbac
from awx.main.models import Permission
from awx.main.models import Permission, Role
from django.apps import apps
from awx.main.migrations import _old_access as old_access
@ -24,6 +24,8 @@ def test_project_user_project(user_project, project, user):
@pytest.mark.django_db
def test_project_accessible_by_sa(user, project):
u = user('systemadmin', is_superuser=True)
# This gets setup by a signal, but we want to test the migration which will set this up too, so remove it
Role.singleton('System Administrator').members.remove(u)
assert project.accessible_by(u, {'read': True}) is False
rbac.migrate_organization(apps, None)

View File

@ -10,6 +10,9 @@ def test_team_migration_user(team, user, permissions):
team.users.add(u)
team.save()
# This gets setup by a signal handler, but we want to test the migration, so remove the user
team.member_role.members.remove(u)
assert not team.accessible_by(u, permissions['auditor'])
migrated = rbac.migrate_team(apps, None)

View File

@ -10,11 +10,16 @@ def test_user_admin(user_project, project, user):
admin = user('admin', is_superuser = True)
sa = Role.singleton('System Administrator')
# this should happen automatically with our signal
assert sa.members.filter(id=admin.id).exists() is True
sa.members.remove(admin)
assert sa.members.filter(id=joe.id).exists() is False
assert sa.members.filter(id=admin.id).exists() is False
migrations = rbac.migrate_users(apps, None)
# The migration should add the admin back in
assert sa.members.filter(id=joe.id).exists() is False
assert sa.members.filter(id=admin.id).exists() is True
assert len(migrations) == 1

View File

@ -30,7 +30,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
'get_ansible_version', 'get_ssh_version', 'get_awx_version', 'update_scm_url',
'get_type_for_model', 'get_model_for_type', 'to_python_boolean',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict']
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided']
def get_object_or_400(klass, *args, **kwargs):
@ -521,3 +521,21 @@ def timedelta_total_seconds(timedelta):
timedelta.microseconds + 0.0 +
(timedelta.seconds + timedelta.days * 24 * 3600) * 10 ** 6) / 10 ** 6
class NoDefaultProvided(object):
pass
def getattrd(obj, name, default=NoDefaultProvided):
"""
Same as getattr(), but allows dot notation lookup
Discussed in:
http://stackoverflow.com/questions/11975781
"""
try:
return reduce(getattr, name.split("."), obj)
except AttributeError:
if default != NoDefaultProvided:
return default
raise