From 389a729b759100862ff0450ac0fb0f6a45979651 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 2 Apr 2024 15:07:39 -0400 Subject: [PATCH] [RBAC] Fix known issues with backward compatible access_list (#15052) * Remove duplicate access_list entries for direct team access * Revert test changes for superuser in access_list --- awx/api/generics.py | 8 +- awx/api/serializers.py | 63 ++++++++-- .../api/test_resource_access_lists.py | 6 +- .../functional/dab_rbac/test_access_list.py | 111 ++++++++++++++++++ 4 files changed, 175 insertions(+), 13 deletions(-) create mode 100644 awx/main/tests/functional/dab_rbac/test_access_list.py diff --git a/awx/api/generics.py b/awx/api/generics.py index a0c609db26..bc7ae2138d 100644 --- a/awx/api/generics.py +++ b/awx/api/generics.py @@ -33,7 +33,7 @@ from rest_framework.negotiation import DefaultContentNegotiation # django-ansible-base from ansible_base.rest_filters.rest_framework.field_lookup_backend import FieldLookupBackend from ansible_base.lib.utils.models import get_all_field_names -from ansible_base.rbac.models import RoleEvaluation +from ansible_base.rbac.models import RoleEvaluation, RoleDefinition from ansible_base.rbac.permission_registry import permission_registry # AWX @@ -810,7 +810,11 @@ class ResourceAccessList(ParentMixin, ListAPIView): if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED: ancestors = set(RoleEvaluation.objects.filter(content_type_id=content_type.id, object_id=obj.id).values_list('role_id', flat=True)) - return (User.objects.filter(has_roles__in=ancestors) | User.objects.filter(is_superuser=True)).distinct() + qs = User.objects.filter(has_roles__in=ancestors) | User.objects.filter(is_superuser=True) + auditor_role = RoleDefinition.objects.filter(name="System Auditor").first() + if auditor_role: + qs |= User.objects.filter(roleuserassignment__role_definition=auditor_role) + return qs.distinct() roles = set(Role.objects.filter(content_type=content_type, object_id=obj.id)) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index c0577324c2..75844e9d84 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -21,7 +21,7 @@ from jinja2.exceptions import TemplateSyntaxError, UndefinedError, SecurityError # Django from django.conf import settings from django.contrib.auth import update_session_auth_hash -from django.contrib.auth.models import User, Permission +from django.contrib.auth.models import User from django.contrib.auth.password_validation import validate_password as django_validate_password from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ObjectDoesNotExist, ValidationError as DjangoValidationError @@ -45,7 +45,8 @@ from polymorphic.models import PolymorphicModel # django-ansible-base from ansible_base.lib.utils.models import get_type_for_model -from ansible_base.rbac.models import RoleEvaluation +from ansible_base.rbac.models import RoleEvaluation, ObjectRole +from ansible_base.rbac import permission_registry # AWX from awx.main.access import get_user_capabilities @@ -2780,7 +2781,10 @@ class ResourceAccessListElementSerializer(UserSerializer): if action in reversed_role_map: role_names.add(reversed_role_map[action]) elif codename in reversed_org_map: - role_names.add(codename) + if isinstance(obj, Organization): + role_names.add(reversed_org_map[codename]) + if 'view_organization' not in role_names: + role_names.add('read_role') return list(role_names) def format_role_perm(role): @@ -2799,10 +2803,10 @@ class ResourceAccessListElementSerializer(UserSerializer): # Singleton roles should not be managed from this view, as per copy/edit rework spec role_dict['user_capabilities'] = {'unattach': False} - if role.singleton_name: - descendant_perms = list(Permission.objects.filter(content_type=content_type).values_list('codename', flat=True)) + model_name = content_type.model + if isinstance(obj, Organization): + descendant_perms = [codename for codename in get_role_codenames(role) if codename.endswith(model_name) or codename.startswith('add_')] else: - model_name = content_type.model descendant_perms = [codename for codename in get_role_codenames(role) if codename.endswith(model_name)] return {'role': role_dict, 'descendant_roles': get_roles_from_perms(descendant_perms)} @@ -2852,7 +2856,7 @@ class ResourceAccessListElementSerializer(UserSerializer): new_roles_seen = set() all_team_roles = set() all_permissive_role_ids = set() - for evaluation in RoleEvaluation.objects.filter(role__users=user, **gfk_kwargs).prefetch_related('role'): + for evaluation in RoleEvaluation.objects.filter(role__in=user.has_roles.all(), **gfk_kwargs).prefetch_related('role'): new_role = evaluation.role if new_role.id in new_roles_seen: continue @@ -2867,9 +2871,48 @@ class ResourceAccessListElementSerializer(UserSerializer): else: ret['summary_fields']['indirect_access'].append(format_role_perm(old_role)) - ret['summary_fields']['direct_access'].extend( - [y for x in (format_team_role_perm(r, direct_permissive_role_ids) for r in all_team_roles) for y in x] - ) + # Lazy role creation gives us a big problem, where some intermediate roles are not easy to find + # like when a team has indirect permission, so here we get all roles the users teams have + # these contribute to all potential permission-granting roles of the object + user_teams_qs = permission_registry.team_model.objects.filter(member_roles__in=ObjectRole.objects.filter(users=user)) + team_obj_roles = ObjectRole.objects.filter(teams__in=user_teams_qs) + for evaluation in RoleEvaluation.objects.filter(role__in=team_obj_roles, **gfk_kwargs).prefetch_related('role'): + new_role = evaluation.role + if new_role.id in new_roles_seen: + continue + new_roles_seen.add(new_role.id) + old_role = get_role_from_object_role(new_role) + all_permissive_role_ids.add(old_role.id) + + # In DAB RBAC, superuser is strictly a user flag, and global roles are not in the RoleEvaluation table + if user.is_superuser: + ret['summary_fields'].setdefault('indirect_access', []) + all_role_names = [field.name for field in obj._meta.get_fields() if isinstance(field, ImplicitRoleField)] + ret['summary_fields']['indirect_access'].append( + { + "role": { + "id": None, + "name": _("System Administrator"), + "description": _("Can manage all aspects of the system"), + "user_capabilities": {"unattach": False}, + }, + "descendant_roles": all_role_names, + } + ) + elif user.is_system_auditor: + ret['summary_fields'].setdefault('indirect_access', []) + ret['summary_fields']['indirect_access'].append( + { + "role": { + "id": None, + "name": _("System Auditor"), + "description": _("Can view all aspects of the system"), + "user_capabilities": {"unattach": False}, + }, + "descendant_roles": ["read_role"], + } + ) + ret['summary_fields']['direct_access'].extend([y for x in (format_team_role_perm(r, all_permissive_role_ids) for r in all_team_roles) for y in x]) return ret diff --git a/awx/main/tests/functional/api/test_resource_access_lists.py b/awx/main/tests/functional/api/test_resource_access_lists.py index 3b524f50f2..71d107dbda 100644 --- a/awx/main/tests/functional/api/test_resource_access_lists.py +++ b/awx/main/tests/functional/api/test_resource_access_lists.py @@ -1,6 +1,7 @@ import pytest from awx.api.versioning import reverse +from awx.main.models import Role @pytest.mark.django_db @@ -38,7 +39,7 @@ def test_indirect_access_list(get, organization, project, team_factory, user, ad assert len(team_admin_res['summary_fields']['direct_access']) == 1 assert len(team_admin_res['summary_fields']['indirect_access']) == 0 assert len(admin_res['summary_fields']['direct_access']) == 0 - assert len(admin_res['summary_fields']['indirect_access']) == 0 # decreased to 0 because system admin role no longer exists + assert len(admin_res['summary_fields']['indirect_access']) == 1 project_admin_entry = project_admin_res['summary_fields']['direct_access'][0]['role'] assert project_admin_entry['id'] == project.admin_role.id @@ -51,3 +52,6 @@ def test_indirect_access_list(get, organization, project, team_factory, user, ad assert project_admin_team_member_entry['id'] == project.admin_role.id assert project_admin_team_member_entry['team_id'] == project_admin_team.id assert project_admin_team_member_entry['team_name'] == project_admin_team.name + + admin_entry = admin_res['summary_fields']['indirect_access'][0]['role'] + assert admin_entry['name'] == Role.singleton('system_administrator').name diff --git a/awx/main/tests/functional/dab_rbac/test_access_list.py b/awx/main/tests/functional/dab_rbac/test_access_list.py new file mode 100644 index 0000000000..2a88b5b18f --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/test_access_list.py @@ -0,0 +1,111 @@ +import pytest + +from awx.main.models import User +from awx.api.versioning import reverse + + +@pytest.mark.django_db +def test_access_list_superuser(get, admin_user, inventory): + url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id}) + + response = get(url, user=admin_user, expect=200) + by_username = {} + for entry in response.data['results']: + by_username[entry['username']] = entry + assert 'admin' in by_username + + assert len(by_username['admin']['summary_fields']['indirect_access']) == 1 + assert len(by_username['admin']['summary_fields']['direct_access']) == 0 + access_entry = by_username['admin']['summary_fields']['indirect_access'][0] + assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role']) + + +@pytest.mark.django_db +def test_access_list_system_auditor(get, admin_user, inventory): + sys_auditor = User.objects.create(username='sys-aud') + sys_auditor.is_system_auditor = True + assert sys_auditor.is_system_auditor + url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id}) + + response = get(url, user=admin_user, expect=200) + by_username = {} + for entry in response.data['results']: + by_username[entry['username']] = entry + assert 'sys-aud' in by_username + + assert len(by_username['sys-aud']['summary_fields']['indirect_access']) == 1 + assert len(by_username['sys-aud']['summary_fields']['direct_access']) == 0 + access_entry = by_username['sys-aud']['summary_fields']['indirect_access'][0] + assert access_entry['descendant_roles'] == ['read_role'] + + +@pytest.mark.django_db +def test_access_list_direct_access(get, admin_user, inventory): + u1 = User.objects.create(username='u1') + + inventory.admin_role.members.add(u1) + + url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id}) + response = get(url, user=admin_user, expect=200) + by_username = {} + for entry in response.data['results']: + by_username[entry['username']] = entry + assert 'u1' in by_username + + assert len(by_username['u1']['summary_fields']['direct_access']) == 1 + assert len(by_username['u1']['summary_fields']['indirect_access']) == 0 + access_entry = by_username['u1']['summary_fields']['direct_access'][0] + assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role']) + + +@pytest.mark.django_db +def test_access_list_organization_access(get, admin_user, inventory): + u2 = User.objects.create(username='u2') + + inventory.organization.inventory_admin_role.members.add(u2) + + # User has indirect access to the inventory + url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id}) + response = get(url, user=admin_user, expect=200) + by_username = {} + for entry in response.data['results']: + by_username[entry['username']] = entry + assert 'u2' in by_username + + assert len(by_username['u2']['summary_fields']['indirect_access']) == 1 + assert len(by_username['u2']['summary_fields']['direct_access']) == 0 + access_entry = by_username['u2']['summary_fields']['indirect_access'][0] + assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role']) + + # Test that user shows up in the organization access list with direct access of expected roles + url = reverse('api:organization_access_list', kwargs={'pk': inventory.organization_id}) + response = get(url, user=admin_user, expect=200) + by_username = {} + for entry in response.data['results']: + by_username[entry['username']] = entry + assert 'u2' in by_username + + assert len(by_username['u2']['summary_fields']['direct_access']) == 1 + assert len(by_username['u2']['summary_fields']['indirect_access']) == 0 + access_entry = by_username['u2']['summary_fields']['direct_access'][0] + assert sorted(access_entry['descendant_roles']) == sorted(['inventory_admin_role', 'read_role']) + + +@pytest.mark.django_db +def test_team_indirect_access(get, team, admin_user, inventory): + u1 = User.objects.create(username='u1') + team.member_role.members.add(u1) + + inventory.organization.inventory_admin_role.parents.add(team.member_role) + + url = reverse('api:inventory_access_list', kwargs={'pk': inventory.id}) + response = get(url, user=admin_user, expect=200) + by_username = {} + for entry in response.data['results']: + by_username[entry['username']] = entry + assert 'u1' in by_username + + assert len(by_username['u1']['summary_fields']['direct_access']) == 1 + assert len(by_username['u1']['summary_fields']['indirect_access']) == 0 + access_entry = by_username['u1']['summary_fields']['direct_access'][0] + assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role'])