[RBAC] Fix known issues with backward compatible access_list (#15052)

* Remove duplicate access_list entries for direct team access

* Revert test changes for superuser in access_list
This commit is contained in:
Alan Rominger 2024-04-02 15:07:39 -04:00
parent 2f3c9122fd
commit 389a729b75
4 changed files with 175 additions and 13 deletions

View File

@ -33,7 +33,7 @@ from rest_framework.negotiation import DefaultContentNegotiation
# django-ansible-base
from ansible_base.rest_filters.rest_framework.field_lookup_backend import FieldLookupBackend
from ansible_base.lib.utils.models import get_all_field_names
from ansible_base.rbac.models import RoleEvaluation
from ansible_base.rbac.models import RoleEvaluation, RoleDefinition
from ansible_base.rbac.permission_registry import permission_registry
# AWX
@ -810,7 +810,11 @@ class ResourceAccessList(ParentMixin, ListAPIView):
if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED:
ancestors = set(RoleEvaluation.objects.filter(content_type_id=content_type.id, object_id=obj.id).values_list('role_id', flat=True))
return (User.objects.filter(has_roles__in=ancestors) | User.objects.filter(is_superuser=True)).distinct()
qs = User.objects.filter(has_roles__in=ancestors) | User.objects.filter(is_superuser=True)
auditor_role = RoleDefinition.objects.filter(name="System Auditor").first()
if auditor_role:
qs |= User.objects.filter(roleuserassignment__role_definition=auditor_role)
return qs.distinct()
roles = set(Role.objects.filter(content_type=content_type, object_id=obj.id))

View File

@ -21,7 +21,7 @@ from jinja2.exceptions import TemplateSyntaxError, UndefinedError, SecurityError
# Django
from django.conf import settings
from django.contrib.auth import update_session_auth_hash
from django.contrib.auth.models import User, Permission
from django.contrib.auth.models import User
from django.contrib.auth.password_validation import validate_password as django_validate_password
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist, ValidationError as DjangoValidationError
@ -45,7 +45,8 @@ from polymorphic.models import PolymorphicModel
# django-ansible-base
from ansible_base.lib.utils.models import get_type_for_model
from ansible_base.rbac.models import RoleEvaluation
from ansible_base.rbac.models import RoleEvaluation, ObjectRole
from ansible_base.rbac import permission_registry
# AWX
from awx.main.access import get_user_capabilities
@ -2780,7 +2781,10 @@ class ResourceAccessListElementSerializer(UserSerializer):
if action in reversed_role_map:
role_names.add(reversed_role_map[action])
elif codename in reversed_org_map:
role_names.add(codename)
if isinstance(obj, Organization):
role_names.add(reversed_org_map[codename])
if 'view_organization' not in role_names:
role_names.add('read_role')
return list(role_names)
def format_role_perm(role):
@ -2799,10 +2803,10 @@ class ResourceAccessListElementSerializer(UserSerializer):
# Singleton roles should not be managed from this view, as per copy/edit rework spec
role_dict['user_capabilities'] = {'unattach': False}
if role.singleton_name:
descendant_perms = list(Permission.objects.filter(content_type=content_type).values_list('codename', flat=True))
model_name = content_type.model
if isinstance(obj, Organization):
descendant_perms = [codename for codename in get_role_codenames(role) if codename.endswith(model_name) or codename.startswith('add_')]
else:
model_name = content_type.model
descendant_perms = [codename for codename in get_role_codenames(role) if codename.endswith(model_name)]
return {'role': role_dict, 'descendant_roles': get_roles_from_perms(descendant_perms)}
@ -2852,7 +2856,7 @@ class ResourceAccessListElementSerializer(UserSerializer):
new_roles_seen = set()
all_team_roles = set()
all_permissive_role_ids = set()
for evaluation in RoleEvaluation.objects.filter(role__users=user, **gfk_kwargs).prefetch_related('role'):
for evaluation in RoleEvaluation.objects.filter(role__in=user.has_roles.all(), **gfk_kwargs).prefetch_related('role'):
new_role = evaluation.role
if new_role.id in new_roles_seen:
continue
@ -2867,9 +2871,48 @@ class ResourceAccessListElementSerializer(UserSerializer):
else:
ret['summary_fields']['indirect_access'].append(format_role_perm(old_role))
ret['summary_fields']['direct_access'].extend(
[y for x in (format_team_role_perm(r, direct_permissive_role_ids) for r in all_team_roles) for y in x]
)
# Lazy role creation gives us a big problem, where some intermediate roles are not easy to find
# like when a team has indirect permission, so here we get all roles the users teams have
# these contribute to all potential permission-granting roles of the object
user_teams_qs = permission_registry.team_model.objects.filter(member_roles__in=ObjectRole.objects.filter(users=user))
team_obj_roles = ObjectRole.objects.filter(teams__in=user_teams_qs)
for evaluation in RoleEvaluation.objects.filter(role__in=team_obj_roles, **gfk_kwargs).prefetch_related('role'):
new_role = evaluation.role
if new_role.id in new_roles_seen:
continue
new_roles_seen.add(new_role.id)
old_role = get_role_from_object_role(new_role)
all_permissive_role_ids.add(old_role.id)
# In DAB RBAC, superuser is strictly a user flag, and global roles are not in the RoleEvaluation table
if user.is_superuser:
ret['summary_fields'].setdefault('indirect_access', [])
all_role_names = [field.name for field in obj._meta.get_fields() if isinstance(field, ImplicitRoleField)]
ret['summary_fields']['indirect_access'].append(
{
"role": {
"id": None,
"name": _("System Administrator"),
"description": _("Can manage all aspects of the system"),
"user_capabilities": {"unattach": False},
},
"descendant_roles": all_role_names,
}
)
elif user.is_system_auditor:
ret['summary_fields'].setdefault('indirect_access', [])
ret['summary_fields']['indirect_access'].append(
{
"role": {
"id": None,
"name": _("System Auditor"),
"description": _("Can view all aspects of the system"),
"user_capabilities": {"unattach": False},
},
"descendant_roles": ["read_role"],
}
)
ret['summary_fields']['direct_access'].extend([y for x in (format_team_role_perm(r, all_permissive_role_ids) for r in all_team_roles) for y in x])
return ret

View File

@ -1,6 +1,7 @@
import pytest
from awx.api.versioning import reverse
from awx.main.models import Role
@pytest.mark.django_db
@ -38,7 +39,7 @@ def test_indirect_access_list(get, organization, project, team_factory, user, ad
assert len(team_admin_res['summary_fields']['direct_access']) == 1
assert len(team_admin_res['summary_fields']['indirect_access']) == 0
assert len(admin_res['summary_fields']['direct_access']) == 0
assert len(admin_res['summary_fields']['indirect_access']) == 0 # decreased to 0 because system admin role no longer exists
assert len(admin_res['summary_fields']['indirect_access']) == 1
project_admin_entry = project_admin_res['summary_fields']['direct_access'][0]['role']
assert project_admin_entry['id'] == project.admin_role.id
@ -51,3 +52,6 @@ def test_indirect_access_list(get, organization, project, team_factory, user, ad
assert project_admin_team_member_entry['id'] == project.admin_role.id
assert project_admin_team_member_entry['team_id'] == project_admin_team.id
assert project_admin_team_member_entry['team_name'] == project_admin_team.name
admin_entry = admin_res['summary_fields']['indirect_access'][0]['role']
assert admin_entry['name'] == Role.singleton('system_administrator').name

View File

@ -0,0 +1,111 @@
import pytest
from awx.main.models import User
from awx.api.versioning import reverse
@pytest.mark.django_db
def test_access_list_superuser(get, admin_user, inventory):
url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id})
response = get(url, user=admin_user, expect=200)
by_username = {}
for entry in response.data['results']:
by_username[entry['username']] = entry
assert 'admin' in by_username
assert len(by_username['admin']['summary_fields']['indirect_access']) == 1
assert len(by_username['admin']['summary_fields']['direct_access']) == 0
access_entry = by_username['admin']['summary_fields']['indirect_access'][0]
assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role'])
@pytest.mark.django_db
def test_access_list_system_auditor(get, admin_user, inventory):
sys_auditor = User.objects.create(username='sys-aud')
sys_auditor.is_system_auditor = True
assert sys_auditor.is_system_auditor
url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id})
response = get(url, user=admin_user, expect=200)
by_username = {}
for entry in response.data['results']:
by_username[entry['username']] = entry
assert 'sys-aud' in by_username
assert len(by_username['sys-aud']['summary_fields']['indirect_access']) == 1
assert len(by_username['sys-aud']['summary_fields']['direct_access']) == 0
access_entry = by_username['sys-aud']['summary_fields']['indirect_access'][0]
assert access_entry['descendant_roles'] == ['read_role']
@pytest.mark.django_db
def test_access_list_direct_access(get, admin_user, inventory):
u1 = User.objects.create(username='u1')
inventory.admin_role.members.add(u1)
url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id})
response = get(url, user=admin_user, expect=200)
by_username = {}
for entry in response.data['results']:
by_username[entry['username']] = entry
assert 'u1' in by_username
assert len(by_username['u1']['summary_fields']['direct_access']) == 1
assert len(by_username['u1']['summary_fields']['indirect_access']) == 0
access_entry = by_username['u1']['summary_fields']['direct_access'][0]
assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role'])
@pytest.mark.django_db
def test_access_list_organization_access(get, admin_user, inventory):
u2 = User.objects.create(username='u2')
inventory.organization.inventory_admin_role.members.add(u2)
# User has indirect access to the inventory
url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id})
response = get(url, user=admin_user, expect=200)
by_username = {}
for entry in response.data['results']:
by_username[entry['username']] = entry
assert 'u2' in by_username
assert len(by_username['u2']['summary_fields']['indirect_access']) == 1
assert len(by_username['u2']['summary_fields']['direct_access']) == 0
access_entry = by_username['u2']['summary_fields']['indirect_access'][0]
assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role'])
# Test that user shows up in the organization access list with direct access of expected roles
url = reverse('api:organization_access_list', kwargs={'pk': inventory.organization_id})
response = get(url, user=admin_user, expect=200)
by_username = {}
for entry in response.data['results']:
by_username[entry['username']] = entry
assert 'u2' in by_username
assert len(by_username['u2']['summary_fields']['direct_access']) == 1
assert len(by_username['u2']['summary_fields']['indirect_access']) == 0
access_entry = by_username['u2']['summary_fields']['direct_access'][0]
assert sorted(access_entry['descendant_roles']) == sorted(['inventory_admin_role', 'read_role'])
@pytest.mark.django_db
def test_team_indirect_access(get, team, admin_user, inventory):
u1 = User.objects.create(username='u1')
team.member_role.members.add(u1)
inventory.organization.inventory_admin_role.parents.add(team.member_role)
url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id})
response = get(url, user=admin_user, expect=200)
by_username = {}
for entry in response.data['results']:
by_username[entry['username']] = entry
assert 'u1' in by_username
assert len(by_username['u1']['summary_fields']['direct_access']) == 1
assert len(by_username['u1']['summary_fields']['indirect_access']) == 0
access_entry = by_username['u1']['summary_fields']['direct_access'][0]
assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role'])