From 20aa8c02d1ff43659a3c6ce24ff2fc2e8f223f2a Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Thu, 24 Mar 2016 10:45:49 -0400 Subject: [PATCH] Added accessible_by/objects support for Team --- awx/main/models/mixins.py | 24 +++++++++++++++------ awx/main/models/rbac.py | 10 ++++++++- awx/main/tests/functional/test_rbac_team.py | 23 ++++++++++++++++++++ 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py index 639611fbca..0160ca9be5 100644 --- a/awx/main/models/mixins.py +++ b/awx/main/models/mixins.py @@ -2,11 +2,14 @@ from django.db import models from django.db.models.aggregates import Max from django.contrib.contenttypes.fields import GenericRelation +from django.contrib.contenttypes.models import ContentType +from django.contrib.auth.models import User # noqa # AWX from awx.main.models.rbac import ( get_user_permissions_on_resource, get_role_permissions_on_resource, + Role, ) @@ -20,7 +23,7 @@ class ResourceMixin(models.Model): role_permissions = GenericRelation('main.RolePermission') @classmethod - def accessible_objects(cls, user, permissions): + def accessible_objects(cls, accessor, permissions): ''' Use instead of `MyModel.objects` when you want to only consider resources that a user has specific permissions for. For example: @@ -32,13 +35,22 @@ class ResourceMixin(models.Model): performant to resolve the resource in question then call `myresource.get_permissions(user)`. ''' - return ResourceMixin._accessible_objects(cls, user, permissions) + return ResourceMixin._accessible_objects(cls, accessor, permissions) @staticmethod - def _accessible_objects(cls, user, permissions): - qs = cls.objects.filter( - role_permissions__role__ancestors__members=user - ) + def _accessible_objects(cls, accessor, permissions): + if type(accessor) == User: + qs = cls.objects.filter( + role_permissions__role__ancestors__members=accessor + ) + else: + accessor_type = ContentType.objects.get_for_model(accessor) + roles = Role.objects.filter(content_type__pk=accessor_type.id, + object_id=accessor.id) + qs = cls.objects.filter( + role_permissions__role__ancestors__in=roles + ) + for perm in permissions: qs = qs.annotate(**{'max_' + perm: Max('role_permissions__' + perm)}) qs = qs.filter(**{'max_' + perm: int(permissions[perm])}) diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 644ffa1315..b1f3ca0a57 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -16,6 +16,7 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.fields import GenericForeignKey # AWX +from django.contrib.auth.models import User # noqa from awx.main.models.base import * # noqa __all__ = [ @@ -195,10 +196,17 @@ def get_user_permissions_on_resource(resource, user): access. ''' + if type(user) == User: + roles = user.roles.all() + else: + accessor_type = ContentType.objects.get_for_model(user) + roles = Role.objects.filter(content_type__pk=accessor_type.id, + object_id=user.id) + qs = RolePermission.objects.filter( content_type=ContentType.objects.get_for_model(resource), object_id=resource.id, - role__ancestors__in=user.roles.all() + role__ancestors__in=roles, ) res = qs = qs.aggregate( diff --git a/awx/main/tests/functional/test_rbac_team.py b/awx/main/tests/functional/test_rbac_team.py index 0c4ed86b34..a6ad507e22 100644 --- a/awx/main/tests/functional/test_rbac_team.py +++ b/awx/main/tests/functional/test_rbac_team.py @@ -1,6 +1,7 @@ import pytest from awx.main.access import TeamAccess +from awx.main.models import Project @pytest.mark.django_db def test_team_access_superuser(team, user): @@ -48,3 +49,25 @@ def test_team_access_member(organization, team, user): assert len(t.member_role.members.all()) == 1 assert len(t.organization.admin_role.members.all()) == 0 +@pytest.mark.django_db +def test_team_accessible_by(team, user, project): + u = user('team_member', False) + + team.member_role.children.add(project.member_role) + assert project.accessible_by(team, {'read':True}) + assert not project.accessible_by(u, {'read':True}) + + team.member_role.members.add(u) + assert project.accessible_by(u, {'read':True}) + +@pytest.mark.django_db +def test_team_accessible_objects(team, user, project): + u = user('team_member', False) + + team.member_role.children.add(project.member_role) + assert len(Project.accessible_objects(team, {'read':True})) == 1 + assert not Project.accessible_objects(u, {'read':True}) + + team.member_role.members.add(u) + assert len(Project.accessible_objects(u, {'read':True})) == 1 +