From 0936b28f9b59a4b42bffb2e94125e2a25fb76314 Mon Sep 17 00:00:00 2001 From: jessicamack Date: Fri, 18 Jul 2025 17:09:44 +0200 Subject: [PATCH] Migrate nested team memberships to direct team memberships (#7005) * migrate team on team users add setting to prevent team on team cases. remove tests that should fail now * adjust tests for disallowing team on teams * use RoleUserAssignment to retrieve users * assign users with RoleUserAssignment instead * fix broken test * move methods out to utils file. add tests * add missed positional arg * test old rbac system also consolidates * fix test --- .../migrations/0203_remove_team_of_teams.py | 18 +++ awx/main/migrations/_dab_rbac.py | 113 ++++++++++++++ .../dab_rbac/test_consolidate_teams.py | 147 ++++++++++++++++++ .../functional/dab_rbac/test_dab_rbac_api.py | 10 +- .../dab_rbac/test_translation_layer.py | 14 -- .../functional/test_fixture_factories.py | 6 +- awx/settings/defaults.py | 1 + 7 files changed, 288 insertions(+), 21 deletions(-) create mode 100644 awx/main/migrations/0203_remove_team_of_teams.py create mode 100644 awx/main/tests/functional/dab_rbac/test_consolidate_teams.py diff --git a/awx/main/migrations/0203_remove_team_of_teams.py b/awx/main/migrations/0203_remove_team_of_teams.py new file mode 100644 index 0000000000..c5a3da0340 --- /dev/null +++ b/awx/main/migrations/0203_remove_team_of_teams.py @@ -0,0 +1,18 @@ +import logging + +from django.db import migrations + +from awx.main.migrations._dab_rbac import consolidate_indirect_user_roles + +logger = logging.getLogger('awx.main.migrations') + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0202_convert_controller_role_definitions'), + ] + + operations = [ + migrations.RunPython(consolidate_indirect_user_roles, migrations.RunPython.noop), + ] diff --git a/awx/main/migrations/_dab_rbac.py b/awx/main/migrations/_dab_rbac.py index 20a7e9633a..1d64ba85c9 100644 --- a/awx/main/migrations/_dab_rbac.py +++ b/awx/main/migrations/_dab_rbac.py @@ -1,5 +1,6 @@ import json import logging +from collections import defaultdict from django.apps import apps as global_apps from django.db.models import ForeignKey @@ -403,3 +404,115 @@ def setup_managed_role_definitions(apps, schema_editor): for role_definition in unexpected_role_definitions: logger.info(f'Deleting old managed role definition {role_definition.name}, pk={role_definition.pk}') role_definition.delete() + + +def get_team_to_team_relationships(apps, team_member_role): + """ + Find all team-to-team relationships where one team is a member of another. + Returns a dict mapping parent_team_id -> [child_team_id, ...] + """ + team_to_team_relationships = defaultdict(list) + + # Find all team assignments with the Team Member role + RoleTeamAssignment = apps.get_model('dab_rbac', 'RoleTeamAssignment') + team_assignments = RoleTeamAssignment.objects.filter(role_definition=team_member_role).select_related('team') + + for assignment in team_assignments: + parent_team_id = int(assignment.object_id) + child_team_id = assignment.team.id + team_to_team_relationships[parent_team_id].append(child_team_id) + + return team_to_team_relationships + + +def get_all_user_members_of_team(apps, team_member_role, team_id, team_to_team_map, visited=None): + """ + Recursively find all users who are members of a team, including through nested teams. + """ + if visited is None: + visited = set() + + if team_id in visited: + return set() # Avoid infinite recursion + + visited.add(team_id) + all_users = set() + + # Get direct user assignments to this team + RoleUserAssignment = apps.get_model('dab_rbac', 'RoleUserAssignment') + user_assignments = RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_id).select_related('user') + + for assignment in user_assignments: + all_users.add(assignment.user) + + # Get team-to-team assignments and recursively find their users + child_team_ids = team_to_team_map.get(team_id, []) + for child_team_id in child_team_ids: + nested_users = get_all_user_members_of_team(apps, team_member_role, child_team_id, team_to_team_map, visited.copy()) + all_users.update(nested_users) + + return all_users + + +def remove_team_to_team_assignment(apps, team_member_role, parent_team_id, child_team_id): + """ + Remove team-to-team memberships. + """ + Team = apps.get_model('main', 'Team') + RoleTeamAssignment = apps.get_model('dab_rbac', 'RoleTeamAssignment') + + parent_team = Team.objects.get(id=parent_team_id) + child_team = Team.objects.get(id=child_team_id) + + # Remove all team-to-team RoleTeamAssignments + RoleTeamAssignment.objects.filter(role_definition=team_member_role, object_id=parent_team_id, team=child_team).delete() + + # Check mirroring Team model for children under member_role + parent_team.member_role.children.filter(object_id=child_team_id).delete() + + +def consolidate_indirect_user_roles(apps, schema_editor): + """ + A user should have a member role for every team they were indirectly + a member of. ex. Team A is a member of Team B. All users in Team A + previously were only members of Team A. They should now be members of + Team A and Team B. + """ + + # get models for membership on teams + RoleDefinition = apps.get_model('dab_rbac', 'RoleDefinition') + Team = apps.get_model('main', 'Team') + + team_member_role = RoleDefinition.objects.get(name='Team Member') + + team_to_team_map = get_team_to_team_relationships(apps, team_member_role) + + if not team_to_team_map: + return # No team-to-team relationships to consolidate + + # Get content type for Team - needed for give_permissions + try: + from django.contrib.contenttypes.models import ContentType + + team_content_type = ContentType.objects.get_for_model(Team) + except ImportError: + # Fallback if ContentType is not available + ContentType = apps.get_model('contenttypes', 'ContentType') + team_content_type = ContentType.objects.get_for_model(Team) + + # Get all users who should be direct members of a team + for parent_team_id, child_team_ids in team_to_team_map.items(): + all_users = get_all_user_members_of_team(apps, team_member_role, parent_team_id, team_to_team_map) + + # Create direct RoleUserAssignments for all users + if all_users: + give_permissions(apps=apps, rd=team_member_role, users=list(all_users), object_id=parent_team_id, content_type_id=team_content_type.id) + + # Mirror assignments to Team model + parent_team = Team.objects.get(id=parent_team_id) + for user in all_users: + parent_team.member_role.members.add(user.id) + + # Remove all team-to-team assignments for parent team + for child_team_id in child_team_ids: + remove_team_to_team_assignment(apps, team_member_role, parent_team_id, child_team_id) diff --git a/awx/main/tests/functional/dab_rbac/test_consolidate_teams.py b/awx/main/tests/functional/dab_rbac/test_consolidate_teams.py new file mode 100644 index 0000000000..1e42059e56 --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/test_consolidate_teams.py @@ -0,0 +1,147 @@ +import pytest + +from django.contrib.contenttypes.models import ContentType +from django.test import override_settings +from django.apps import apps + +from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment, RoleTeamAssignment +from ansible_base.rbac.migrations._utils import give_permissions + +from awx.main.models import User, Team +from awx.main.migrations._dab_rbac import consolidate_indirect_user_roles + + +@pytest.mark.django_db +@override_settings(ANSIBLE_BASE_ALLOW_TEAM_PARENTS=True) +def test_consolidate_indirect_user_roles_with_nested_teams(setup_managed_roles, organization): + """ + Test the consolidate_indirect_user_roles function with a nested team hierarchy. + Setup: + - Users: A, B, C, D + - Teams: E, F, G + - Direct assignments: A→(E,F,G), B→E, C→F, D→G + - Team hierarchy: F→E (F is member of E), G→F (G is member of F) + Expected result after consolidation: + - Team E should have users: A, B, C, D (A directly, B directly, C through F, D through G→F) + - Team F should have users: A, C, D (A directly, C directly, D through G) + - Team G should have users: A, D (A directly, D directly) + """ + user_a = User.objects.create_user(username='user_a') + user_b = User.objects.create_user(username='user_b') + user_c = User.objects.create_user(username='user_c') + user_d = User.objects.create_user(username='user_d') + + team_e = Team.objects.create(name='Team E', organization=organization) + team_f = Team.objects.create(name='Team F', organization=organization) + team_g = Team.objects.create(name='Team G', organization=organization) + + # Get role definition and content type for give_permissions + team_member_role = RoleDefinition.objects.get(name='Team Member') + team_content_type = ContentType.objects.get_for_model(Team) + + # Assign users to teams + give_permissions(apps=apps, rd=team_member_role, users=[user_a], object_id=team_e.id, content_type_id=team_content_type.id) + give_permissions(apps=apps, rd=team_member_role, users=[user_a], object_id=team_f.id, content_type_id=team_content_type.id) + give_permissions(apps=apps, rd=team_member_role, users=[user_a], object_id=team_g.id, content_type_id=team_content_type.id) + give_permissions(apps=apps, rd=team_member_role, users=[user_b], object_id=team_e.id, content_type_id=team_content_type.id) + give_permissions(apps=apps, rd=team_member_role, users=[user_c], object_id=team_f.id, content_type_id=team_content_type.id) + give_permissions(apps=apps, rd=team_member_role, users=[user_d], object_id=team_g.id, content_type_id=team_content_type.id) + + # Mirror user assignments in the old RBAC system because signals don't run in tests + team_e.member_role.members.add(user_a.id, user_b.id) + team_f.member_role.members.add(user_a.id, user_c.id) + team_g.member_role.members.add(user_a.id, user_d.id) + + # Setup team-to-team relationships + give_permissions(apps=apps, rd=team_member_role, teams=[team_f], object_id=team_e.id, content_type_id=team_content_type.id) + give_permissions(apps=apps, rd=team_member_role, teams=[team_g], object_id=team_f.id, content_type_id=team_content_type.id) + + # Verify initial direct assignments + team_e_users_before = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_e.id).values_list('user_id', flat=True)) + assert team_e_users_before == {user_a.id, user_b.id} + team_f_users_before = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_f.id).values_list('user_id', flat=True)) + assert team_f_users_before == {user_a.id, user_c.id} + team_g_users_before = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_g.id).values_list('user_id', flat=True)) + assert team_g_users_before == {user_a.id, user_d.id} + + # Verify team-to-team relationships exist + assert RoleTeamAssignment.objects.filter(role_definition=team_member_role, team=team_f, object_id=team_e.id).exists() + assert RoleTeamAssignment.objects.filter(role_definition=team_member_role, team=team_g, object_id=team_f.id).exists() + + # Run the consolidation function + consolidate_indirect_user_roles(apps, None) + + # Verify consolidation + team_e_users_after = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_e.id).values_list('user_id', flat=True)) + assert team_e_users_after == {user_a.id, user_b.id, user_c.id, user_d.id}, f"Team E should have users A, B, C, D but has {team_e_users_after}" + team_f_users_after = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_f.id).values_list('user_id', flat=True)) + assert team_f_users_after == {user_a.id, user_c.id, user_d.id}, f"Team F should have users A, C, D but has {team_f_users_after}" + team_g_users_after = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_g.id).values_list('user_id', flat=True)) + assert team_g_users_after == {user_a.id, user_d.id}, f"Team G should have users A, D but has {team_g_users_after}" + + # Verify team member changes are mirrored to the old RBAC system + assert team_e_users_after == set(team_e.member_role.members.all().values_list('id', flat=True)) + assert team_f_users_after == set(team_f.member_role.members.all().values_list('id', flat=True)) + assert team_g_users_after == set(team_g.member_role.members.all().values_list('id', flat=True)) + + # Verify team-to-team relationships are removed after consolidation + assert not RoleTeamAssignment.objects.filter( + role_definition=team_member_role, team=team_f, object_id=team_e.id + ).exists(), "Team-to-team relationship F→E should be removed" + assert not RoleTeamAssignment.objects.filter( + role_definition=team_member_role, team=team_g, object_id=team_f.id + ).exists(), "Team-to-team relationship G→F should be removed" + + +@pytest.mark.django_db +@override_settings(ANSIBLE_BASE_ALLOW_TEAM_PARENTS=True) +def test_consolidate_indirect_user_roles_no_team_relationships(setup_managed_roles, organization): + """ + Test that the function handles the case where there are no team-to-team relationships. + It should return early without making any changes. + """ + # Create a user and team with direct assignment + user = User.objects.create_user(username='test_user') + team = Team.objects.create(name='Test Team', organization=organization) + + team_member_role = RoleDefinition.objects.get(name='Team Member') + team_content_type = ContentType.objects.get_for_model(Team) + give_permissions(apps=apps, rd=team_member_role, users=[user], object_id=team.id, content_type_id=team_content_type.id) + + # Compare count of assignments before and after consolidation + assignments_before = RoleUserAssignment.objects.filter(role_definition=team_member_role).count() + consolidate_indirect_user_roles(apps, None) + assignments_after = RoleUserAssignment.objects.filter(role_definition=team_member_role).count() + + assert assignments_before == assignments_after, "Number of assignments should not change when there are no team-to-team relationships" + + +@pytest.mark.django_db +@override_settings(ANSIBLE_BASE_ALLOW_TEAM_PARENTS=True) +def test_consolidate_indirect_user_roles_circular_reference(setup_managed_roles, organization): + """ + Test that the function handles circular team references without infinite recursion. + """ + team_a = Team.objects.create(name='Team A', organization=organization) + team_b = Team.objects.create(name='Team B', organization=organization) + + # Create a user assigned to team A + user = User.objects.create_user(username='test_user') + + team_member_role = RoleDefinition.objects.get(name='Team Member') + team_content_type = ContentType.objects.get_for_model(Team) + give_permissions(apps=apps, rd=team_member_role, users=[user], object_id=team_a.id, content_type_id=team_content_type.id) + + # Create circular team relationships: A → B → A + give_permissions(apps=apps, rd=team_member_role, teams=[team_b], object_id=team_a.id, content_type_id=team_content_type.id) + give_permissions(apps=apps, rd=team_member_role, teams=[team_a], object_id=team_b.id, content_type_id=team_content_type.id) + + # Run the consolidation function - should not raise an exception + consolidate_indirect_user_roles(apps, None) + + # Both teams should have the user assigned + team_a_users = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_a.id).values_list('user_id', flat=True)) + team_b_users = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_b.id).values_list('user_id', flat=True)) + + assert user.id in team_a_users, "User should be assigned to team A" + assert user.id in team_b_users, "User should be assigned to team B" diff --git a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py index 84e13c9895..793eeccbf4 100644 --- a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py +++ b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py @@ -167,11 +167,12 @@ def test_adding_user_to_org_member_role(setup_managed_roles, organization, admin @pytest.mark.parametrize('role_name', ['Organization Admin', 'Organization Member', 'Team Admin', 'Team Member']) def test_adding_actor_to_platform_roles(setup_managed_roles, role_name, actor, organization, team, admin, bob, post): ''' - Allow user or team to be added to platform-level roles + Allow user to be added to platform-level roles Exceptions: - Team cannot be added to Organization Member or Admin role + - Team cannot be added to Team Admin or Team Member role ''' - if actor == 'team' and 'Organization' in role_name: + if actor == 'team': expect = 400 else: expect = 201 @@ -184,4 +185,7 @@ def test_adding_actor_to_platform_roles(setup_managed_roles, role_name, actor, o data[actor] = actor_id r = post(url, data=data, user=admin, expect=expect) if expect == 400: - assert 'Assigning organization member permission to teams is not allowed' in str(r.data) + if 'Organization' in role_name: + assert 'Assigning organization member permission to teams is not allowed' in str(r.data) + if 'Team' in role_name: + assert 'Assigning team permissions to other teams is not allowed' in str(r.data) diff --git a/awx/main/tests/functional/dab_rbac/test_translation_layer.py b/awx/main/tests/functional/dab_rbac/test_translation_layer.py index 5134b8ce3e..7972890211 100644 --- a/awx/main/tests/functional/dab_rbac/test_translation_layer.py +++ b/awx/main/tests/functional/dab_rbac/test_translation_layer.py @@ -175,20 +175,6 @@ def test_creator_permission(rando, admin_user, inventory, setup_managed_roles): assert rando in inventory.admin_role.members.all() -@pytest.mark.django_db -def test_team_team_read_role(rando, team, admin_user, post, setup_managed_roles): - orgs = [Organization.objects.create(name=f'foo-{i}') for i in range(2)] - teams = [Team.objects.create(name=f'foo-{i}', organization=orgs[i]) for i in range(2)] - teams[1].member_role.members.add(rando) - - # give second team read permission to first team through the API for regression testing - url = reverse('api:role_teams_list', kwargs={'pk': teams[0].read_role.pk, 'version': 'v2'}) - post(url, {'id': teams[1].id}, user=admin_user) - - # user should be able to view the first team - assert rando in teams[0].read_role - - @pytest.mark.django_db def test_implicit_parents_no_assignments(organization): """Through the normal course of creating models, we should not be changing DAB RBAC permissions""" diff --git a/awx/main/tests/functional/test_fixture_factories.py b/awx/main/tests/functional/test_fixture_factories.py index 5792197177..30fa1247ae 100644 --- a/awx/main/tests/functional/test_fixture_factories.py +++ b/awx/main/tests/functional/test_fixture_factories.py @@ -50,13 +50,11 @@ def test_org_factory_roles(organization_factory): teams=['team1', 'team2'], users=['team1:foo', 'bar'], projects=['baz', 'bang'], - roles=['team2.member_role:foo', 'team1.admin_role:bar', 'team1.member_role:team2.admin_role', 'baz.admin_role:foo'], + roles=['team2.member_role:foo', 'team1.admin_role:bar', 'baz.admin_role:foo'], ) - - assert objects.users.bar in objects.teams.team2.admin_role + assert objects.users.bar in objects.teams.team1.admin_role assert objects.users.foo in objects.projects.baz.admin_role assert objects.users.foo in objects.teams.team1.member_role - assert objects.teams.team2.admin_role in objects.teams.team1.member_role.children.all() @pytest.mark.django_db diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 89f12f07dd..81a6aba5a5 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1213,6 +1213,7 @@ ANSIBLE_BASE_CACHE_PARENT_PERMISSIONS = True # Currently features are enabled to keep compatibility with old system, except custom roles ANSIBLE_BASE_ALLOW_TEAM_ORG_ADMIN = False # ANSIBLE_BASE_ALLOW_CUSTOM_ROLES = True +ANSIBLE_BASE_ALLOW_TEAM_PARENTS = False ANSIBLE_BASE_ALLOW_CUSTOM_TEAM_ROLES = False ANSIBLE_BASE_ALLOW_SINGLETON_USER_ROLES = True ANSIBLE_BASE_ALLOW_SINGLETON_TEAM_ROLES = False # System auditor has always been restricted to users