mirror of
https://github.com/ansible/awx.git
synced 2026-01-21 06:28:01 -03:30
Replaced get user permissions with get_roles_on_resource
This commit is contained in:
parent
128a4f1823
commit
fa10d562c1
@ -1485,7 +1485,7 @@ class ResourceAccessListElementSerializer(UserSerializer):
|
||||
|
||||
if 'summary_fields' not in ret:
|
||||
ret['summary_fields'] = {}
|
||||
ret['summary_fields']['permissions'] = get_user_permissions_on_resource(obj, user)
|
||||
ret['summary_fields']['permissions'] = get_roles_on_resource(obj, user)
|
||||
|
||||
def format_role_perm(role):
|
||||
role_dict = { 'id': role.id, 'name': role.name, 'description': role.description}
|
||||
@ -1495,7 +1495,7 @@ class ResourceAccessListElementSerializer(UserSerializer):
|
||||
role_dict['related'] = reverse_gfk(role.content_object)
|
||||
except:
|
||||
pass
|
||||
return { 'role': role_dict, 'permissions': get_role_permissions_on_resource(obj, role)}
|
||||
return { 'role': role_dict, 'permissions': get_roles_on_resource(obj, role)}
|
||||
|
||||
def format_team_role_perm(team_role, permissive_role_ids):
|
||||
role = team_role.children.filter(id__in=permissive_role_ids)[0]
|
||||
@ -1513,7 +1513,7 @@ class ResourceAccessListElementSerializer(UserSerializer):
|
||||
role_dict['related'] = reverse_gfk(role.content_object)
|
||||
except:
|
||||
pass
|
||||
return { 'role': role_dict, 'permissions': get_role_permissions_on_resource(obj, team_role)}
|
||||
return { 'role': role_dict, 'permissions': get_roles_on_resource(obj, team_role)}
|
||||
|
||||
team_content_type = ContentType.objects.get_for_model(Team)
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
|
||||
@ -667,7 +667,7 @@ class OrganizationDetail(RetrieveUpdateDestroyAPIView):
|
||||
org_id = int(self.kwargs['pk'])
|
||||
|
||||
org_counts = {}
|
||||
access_kwargs = {'accessor': self.request.user, 'permissions': {"read": True}}
|
||||
access_kwargs = {'accessor': self.request.user, 'role_name': 'read_role'}
|
||||
direct_counts = Organization.objects.filter(id=org_id).annotate(
|
||||
users=Count('member_role__members', distinct=True),
|
||||
admins=Count('admin_role__members', distinct=True)
|
||||
|
||||
@ -65,8 +65,8 @@ def register_access(model_class, access_class):
|
||||
def user_admin_role(self):
|
||||
return Role.objects.get(content_type=ContentType.objects.get_for_model(User), object_id=self.id)
|
||||
|
||||
def user_accessible_objects(user, role):
|
||||
return ResourceMixin._accessible_objects(User, user, role)
|
||||
def user_accessible_objects(user, role_name):
|
||||
return ResourceMixin._accessible_objects(User, user, role_name)
|
||||
|
||||
def get_user_queryset(user, model_class):
|
||||
'''
|
||||
|
||||
@ -1,13 +1,11 @@
|
||||
# Django
|
||||
from django.db import models
|
||||
from django.db.models.aggregates import Max
|
||||
from django.contrib.contenttypes.fields import GenericRelation
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.auth.models import User # noqa
|
||||
|
||||
# AWX
|
||||
from awx.main.models.rbac import (
|
||||
Role,
|
||||
Role, get_roles_on_resource
|
||||
)
|
||||
|
||||
|
||||
@ -55,45 +53,12 @@ class ResourceMixin(models.Model):
|
||||
return qs
|
||||
|
||||
|
||||
def get_permissions(self, user):
|
||||
def get_permissions(self, accessor):
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a user has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to the roles that are applicable for the given
|
||||
user.
|
||||
|
||||
In example, if a user has been granted read access through a permission
|
||||
on one role and write access through a permission on a separate role,
|
||||
the returned dict will denote that the user has both read and write
|
||||
access.
|
||||
Returns a dict (or None) of the roles a accessor has for a given resource.
|
||||
An accessor can be either a User, Role, or an arbitrary resource that
|
||||
contains one or more Roles associated with it.
|
||||
'''
|
||||
|
||||
return get_user_permissions_on_resource(self, user)
|
||||
return get_roles_on_resource(self, accessor)
|
||||
|
||||
|
||||
def get_role_permissions(self, role):
|
||||
'''
|
||||
Returns a dict (or None) of the permissions a role has for a given
|
||||
resource.
|
||||
|
||||
Note: Each field in the dict is the `or` of all respective permissions
|
||||
that have been granted to either the role or any descendents of that role.
|
||||
'''
|
||||
|
||||
return get_role_permissions_on_resource(self, role)
|
||||
|
||||
|
||||
def accessible_by(self, user, permissions):
|
||||
'''
|
||||
Returns true if the user has all of the specified permissions
|
||||
'''
|
||||
|
||||
perms = self.get_permissions(user)
|
||||
if perms is None:
|
||||
return False
|
||||
for k in permissions:
|
||||
if k not in perms or perms[k] < permissions[k]:
|
||||
return False
|
||||
return True
|
||||
|
||||
@ -70,7 +70,7 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin):
|
||||
read_role = ImplicitRoleField(
|
||||
role_name='Organization Read Access',
|
||||
role_description='Read an organization',
|
||||
parent_role='member_role',
|
||||
parent_role=['member_role', 'auditor_role'],
|
||||
)
|
||||
|
||||
|
||||
@ -124,6 +124,11 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
|
||||
role_description='A member of this team',
|
||||
parent_role='admin_role',
|
||||
)
|
||||
read_role = ImplicitRoleField(
|
||||
role_name='Read',
|
||||
role_description='Can view this team',
|
||||
parent_role=['auditor_role', 'member_role'],
|
||||
)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('api:team_detail', args=(self.pk,))
|
||||
|
||||
@ -9,7 +9,6 @@ import contextlib
|
||||
# Django
|
||||
from django.db import models, transaction, connection
|
||||
from django.db.models import Q
|
||||
from django.db.models.aggregates import Max
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
@ -22,6 +21,7 @@ from awx.main.models.base import * # noqa
|
||||
__all__ = [
|
||||
'Role',
|
||||
'batch_role_ancestor_rebuilding',
|
||||
'get_roles_on_resource',
|
||||
'ROLE_SINGLETON_SYSTEM_ADMINISTRATOR',
|
||||
'ROLE_SINGLETON_SYSTEM_AUDITOR',
|
||||
]
|
||||
@ -345,3 +345,28 @@ class Role(CommonModelNameNotUnique):
|
||||
def is_ancestor_of(self, role):
|
||||
return role.ancestors.filter(id=self.id).exists()
|
||||
|
||||
|
||||
|
||||
|
||||
def get_roles_on_resource(resource, accessor):
|
||||
'''
|
||||
Returns a dict (or None) of the roles a accessor has for a given resource.
|
||||
An accessor can be either a User, Role, or an arbitrary resource that
|
||||
contains one or more Roles associated with it.
|
||||
'''
|
||||
|
||||
if type(accessor) == User:
|
||||
roles = accessor.roles.all()
|
||||
elif type(accessor) == Role:
|
||||
roles = accessor
|
||||
else:
|
||||
accessor_type = ContentType.objects.get_for_model(accessor)
|
||||
roles = Role.objects.filter(content_type__pk=accessor_type.id,
|
||||
object_id=accessor.id)
|
||||
|
||||
return { role.role_field: True for role in
|
||||
Role.objects.filter(
|
||||
content_type = ContentType.objects.get_for_model(resource),
|
||||
object_id = resource.id,
|
||||
ancestors = roles)}
|
||||
|
||||
|
||||
@ -176,8 +176,9 @@ def test_get_teams_roles_list(get, team, organization, admin):
|
||||
response = get(url, admin)
|
||||
assert response.status_code == 200
|
||||
roles = response.data
|
||||
assert roles['count'] == 1
|
||||
assert roles['results'][0]['id'] == organization.admin_role.id
|
||||
|
||||
assert roles['count'] == 2
|
||||
assert roles['results'][0]['id'] == organization.admin_role.id or roles['results'][1]['id'] == organization.admin_role.id
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@ -291,7 +292,7 @@ def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemp
|
||||
assert joe in check_jobtemplate.execute_role
|
||||
|
||||
post(reverse('api:role_users_list', args=(check_jobtemplate.execute_role.id,)), {'disassociate': True, 'id': joe.id}, org_admin)
|
||||
assert joe not in check_jobtemplate.execute
|
||||
assert joe not in check_jobtemplate.execute_role
|
||||
|
||||
|
||||
@pytest.mark.django_db(transaction=True)
|
||||
@ -317,7 +318,7 @@ def test_user_fail_to_remove_user_to_job_template(post, organization, check_jobt
|
||||
check_jobtemplate.execute_role.members.add(joe)
|
||||
|
||||
assert rando not in check_jobtemplate.admin_role
|
||||
assert joe not in check_jobtemplate.execute_role
|
||||
assert joe in check_jobtemplate.execute_role
|
||||
|
||||
res = post(reverse('api:role_users_list', args=(check_jobtemplate.execute_role.id,)), {'disassociate': True, 'id': joe.id}, rando)
|
||||
assert res.status_code == 403
|
||||
@ -380,8 +381,8 @@ def test_role_children(get, team, admin, role):
|
||||
url = reverse('api:role_children_list', args=(team.member_role.id,))
|
||||
response = get(url, admin)
|
||||
assert response.status_code == 200
|
||||
assert response.data['count'] == 1
|
||||
assert response.data['results'][0]['id'] == role.id
|
||||
assert response.data['count'] == 2
|
||||
assert response.data['results'][0]['id'] == role.id or response.data['results'][1]['id'] == role.id
|
||||
|
||||
|
||||
|
||||
@ -417,7 +418,7 @@ def test_ensure_permissions_is_present(organization, get, user):
|
||||
|
||||
assert 'summary_fields' in org
|
||||
assert 'permissions' in org['summary_fields']
|
||||
assert org['summary_fields']['permissions']['read'] > 0
|
||||
assert org['summary_fields']['permissions']['read_role'] > 0
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ensure_role_summary_is_present(organization, get, user):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user