mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 10:00:01 -03:30
Migrate nested team memberships to direct team memberships (#7005)
* migrate team on team users add setting to prevent team on team cases. remove tests that should fail now * adjust tests for disallowing team on teams * use RoleUserAssignment to retrieve users * assign users with RoleUserAssignment instead * fix broken test * move methods out to utils file. add tests * add missed positional arg * test old rbac system also consolidates * fix test
This commit is contained in:
parent
8e58fee49c
commit
0936b28f9b
18
awx/main/migrations/0203_remove_team_of_teams.py
Normal file
18
awx/main/migrations/0203_remove_team_of_teams.py
Normal file
@ -0,0 +1,18 @@
|
||||
import logging
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
from awx.main.migrations._dab_rbac import consolidate_indirect_user_roles
|
||||
|
||||
logger = logging.getLogger('awx.main.migrations')
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0202_convert_controller_role_definitions'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(consolidate_indirect_user_roles, migrations.RunPython.noop),
|
||||
]
|
||||
@ -1,5 +1,6 @@
|
||||
import json
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
|
||||
from django.apps import apps as global_apps
|
||||
from django.db.models import ForeignKey
|
||||
@ -403,3 +404,115 @@ def setup_managed_role_definitions(apps, schema_editor):
|
||||
for role_definition in unexpected_role_definitions:
|
||||
logger.info(f'Deleting old managed role definition {role_definition.name}, pk={role_definition.pk}')
|
||||
role_definition.delete()
|
||||
|
||||
|
||||
def get_team_to_team_relationships(apps, team_member_role):
|
||||
"""
|
||||
Find all team-to-team relationships where one team is a member of another.
|
||||
Returns a dict mapping parent_team_id -> [child_team_id, ...]
|
||||
"""
|
||||
team_to_team_relationships = defaultdict(list)
|
||||
|
||||
# Find all team assignments with the Team Member role
|
||||
RoleTeamAssignment = apps.get_model('dab_rbac', 'RoleTeamAssignment')
|
||||
team_assignments = RoleTeamAssignment.objects.filter(role_definition=team_member_role).select_related('team')
|
||||
|
||||
for assignment in team_assignments:
|
||||
parent_team_id = int(assignment.object_id)
|
||||
child_team_id = assignment.team.id
|
||||
team_to_team_relationships[parent_team_id].append(child_team_id)
|
||||
|
||||
return team_to_team_relationships
|
||||
|
||||
|
||||
def get_all_user_members_of_team(apps, team_member_role, team_id, team_to_team_map, visited=None):
|
||||
"""
|
||||
Recursively find all users who are members of a team, including through nested teams.
|
||||
"""
|
||||
if visited is None:
|
||||
visited = set()
|
||||
|
||||
if team_id in visited:
|
||||
return set() # Avoid infinite recursion
|
||||
|
||||
visited.add(team_id)
|
||||
all_users = set()
|
||||
|
||||
# Get direct user assignments to this team
|
||||
RoleUserAssignment = apps.get_model('dab_rbac', 'RoleUserAssignment')
|
||||
user_assignments = RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_id).select_related('user')
|
||||
|
||||
for assignment in user_assignments:
|
||||
all_users.add(assignment.user)
|
||||
|
||||
# Get team-to-team assignments and recursively find their users
|
||||
child_team_ids = team_to_team_map.get(team_id, [])
|
||||
for child_team_id in child_team_ids:
|
||||
nested_users = get_all_user_members_of_team(apps, team_member_role, child_team_id, team_to_team_map, visited.copy())
|
||||
all_users.update(nested_users)
|
||||
|
||||
return all_users
|
||||
|
||||
|
||||
def remove_team_to_team_assignment(apps, team_member_role, parent_team_id, child_team_id):
|
||||
"""
|
||||
Remove team-to-team memberships.
|
||||
"""
|
||||
Team = apps.get_model('main', 'Team')
|
||||
RoleTeamAssignment = apps.get_model('dab_rbac', 'RoleTeamAssignment')
|
||||
|
||||
parent_team = Team.objects.get(id=parent_team_id)
|
||||
child_team = Team.objects.get(id=child_team_id)
|
||||
|
||||
# Remove all team-to-team RoleTeamAssignments
|
||||
RoleTeamAssignment.objects.filter(role_definition=team_member_role, object_id=parent_team_id, team=child_team).delete()
|
||||
|
||||
# Check mirroring Team model for children under member_role
|
||||
parent_team.member_role.children.filter(object_id=child_team_id).delete()
|
||||
|
||||
|
||||
def consolidate_indirect_user_roles(apps, schema_editor):
|
||||
"""
|
||||
A user should have a member role for every team they were indirectly
|
||||
a member of. ex. Team A is a member of Team B. All users in Team A
|
||||
previously were only members of Team A. They should now be members of
|
||||
Team A and Team B.
|
||||
"""
|
||||
|
||||
# get models for membership on teams
|
||||
RoleDefinition = apps.get_model('dab_rbac', 'RoleDefinition')
|
||||
Team = apps.get_model('main', 'Team')
|
||||
|
||||
team_member_role = RoleDefinition.objects.get(name='Team Member')
|
||||
|
||||
team_to_team_map = get_team_to_team_relationships(apps, team_member_role)
|
||||
|
||||
if not team_to_team_map:
|
||||
return # No team-to-team relationships to consolidate
|
||||
|
||||
# Get content type for Team - needed for give_permissions
|
||||
try:
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
team_content_type = ContentType.objects.get_for_model(Team)
|
||||
except ImportError:
|
||||
# Fallback if ContentType is not available
|
||||
ContentType = apps.get_model('contenttypes', 'ContentType')
|
||||
team_content_type = ContentType.objects.get_for_model(Team)
|
||||
|
||||
# Get all users who should be direct members of a team
|
||||
for parent_team_id, child_team_ids in team_to_team_map.items():
|
||||
all_users = get_all_user_members_of_team(apps, team_member_role, parent_team_id, team_to_team_map)
|
||||
|
||||
# Create direct RoleUserAssignments for all users
|
||||
if all_users:
|
||||
give_permissions(apps=apps, rd=team_member_role, users=list(all_users), object_id=parent_team_id, content_type_id=team_content_type.id)
|
||||
|
||||
# Mirror assignments to Team model
|
||||
parent_team = Team.objects.get(id=parent_team_id)
|
||||
for user in all_users:
|
||||
parent_team.member_role.members.add(user.id)
|
||||
|
||||
# Remove all team-to-team assignments for parent team
|
||||
for child_team_id in child_team_ids:
|
||||
remove_team_to_team_assignment(apps, team_member_role, parent_team_id, child_team_id)
|
||||
|
||||
147
awx/main/tests/functional/dab_rbac/test_consolidate_teams.py
Normal file
147
awx/main/tests/functional/dab_rbac/test_consolidate_teams.py
Normal file
@ -0,0 +1,147 @@
|
||||
import pytest
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.test import override_settings
|
||||
from django.apps import apps
|
||||
|
||||
from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment, RoleTeamAssignment
|
||||
from ansible_base.rbac.migrations._utils import give_permissions
|
||||
|
||||
from awx.main.models import User, Team
|
||||
from awx.main.migrations._dab_rbac import consolidate_indirect_user_roles
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(ANSIBLE_BASE_ALLOW_TEAM_PARENTS=True)
|
||||
def test_consolidate_indirect_user_roles_with_nested_teams(setup_managed_roles, organization):
|
||||
"""
|
||||
Test the consolidate_indirect_user_roles function with a nested team hierarchy.
|
||||
Setup:
|
||||
- Users: A, B, C, D
|
||||
- Teams: E, F, G
|
||||
- Direct assignments: A→(E,F,G), B→E, C→F, D→G
|
||||
- Team hierarchy: F→E (F is member of E), G→F (G is member of F)
|
||||
Expected result after consolidation:
|
||||
- Team E should have users: A, B, C, D (A directly, B directly, C through F, D through G→F)
|
||||
- Team F should have users: A, C, D (A directly, C directly, D through G)
|
||||
- Team G should have users: A, D (A directly, D directly)
|
||||
"""
|
||||
user_a = User.objects.create_user(username='user_a')
|
||||
user_b = User.objects.create_user(username='user_b')
|
||||
user_c = User.objects.create_user(username='user_c')
|
||||
user_d = User.objects.create_user(username='user_d')
|
||||
|
||||
team_e = Team.objects.create(name='Team E', organization=organization)
|
||||
team_f = Team.objects.create(name='Team F', organization=organization)
|
||||
team_g = Team.objects.create(name='Team G', organization=organization)
|
||||
|
||||
# Get role definition and content type for give_permissions
|
||||
team_member_role = RoleDefinition.objects.get(name='Team Member')
|
||||
team_content_type = ContentType.objects.get_for_model(Team)
|
||||
|
||||
# Assign users to teams
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user_a], object_id=team_e.id, content_type_id=team_content_type.id)
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user_a], object_id=team_f.id, content_type_id=team_content_type.id)
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user_a], object_id=team_g.id, content_type_id=team_content_type.id)
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user_b], object_id=team_e.id, content_type_id=team_content_type.id)
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user_c], object_id=team_f.id, content_type_id=team_content_type.id)
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user_d], object_id=team_g.id, content_type_id=team_content_type.id)
|
||||
|
||||
# Mirror user assignments in the old RBAC system because signals don't run in tests
|
||||
team_e.member_role.members.add(user_a.id, user_b.id)
|
||||
team_f.member_role.members.add(user_a.id, user_c.id)
|
||||
team_g.member_role.members.add(user_a.id, user_d.id)
|
||||
|
||||
# Setup team-to-team relationships
|
||||
give_permissions(apps=apps, rd=team_member_role, teams=[team_f], object_id=team_e.id, content_type_id=team_content_type.id)
|
||||
give_permissions(apps=apps, rd=team_member_role, teams=[team_g], object_id=team_f.id, content_type_id=team_content_type.id)
|
||||
|
||||
# Verify initial direct assignments
|
||||
team_e_users_before = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_e.id).values_list('user_id', flat=True))
|
||||
assert team_e_users_before == {user_a.id, user_b.id}
|
||||
team_f_users_before = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_f.id).values_list('user_id', flat=True))
|
||||
assert team_f_users_before == {user_a.id, user_c.id}
|
||||
team_g_users_before = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_g.id).values_list('user_id', flat=True))
|
||||
assert team_g_users_before == {user_a.id, user_d.id}
|
||||
|
||||
# Verify team-to-team relationships exist
|
||||
assert RoleTeamAssignment.objects.filter(role_definition=team_member_role, team=team_f, object_id=team_e.id).exists()
|
||||
assert RoleTeamAssignment.objects.filter(role_definition=team_member_role, team=team_g, object_id=team_f.id).exists()
|
||||
|
||||
# Run the consolidation function
|
||||
consolidate_indirect_user_roles(apps, None)
|
||||
|
||||
# Verify consolidation
|
||||
team_e_users_after = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_e.id).values_list('user_id', flat=True))
|
||||
assert team_e_users_after == {user_a.id, user_b.id, user_c.id, user_d.id}, f"Team E should have users A, B, C, D but has {team_e_users_after}"
|
||||
team_f_users_after = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_f.id).values_list('user_id', flat=True))
|
||||
assert team_f_users_after == {user_a.id, user_c.id, user_d.id}, f"Team F should have users A, C, D but has {team_f_users_after}"
|
||||
team_g_users_after = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_g.id).values_list('user_id', flat=True))
|
||||
assert team_g_users_after == {user_a.id, user_d.id}, f"Team G should have users A, D but has {team_g_users_after}"
|
||||
|
||||
# Verify team member changes are mirrored to the old RBAC system
|
||||
assert team_e_users_after == set(team_e.member_role.members.all().values_list('id', flat=True))
|
||||
assert team_f_users_after == set(team_f.member_role.members.all().values_list('id', flat=True))
|
||||
assert team_g_users_after == set(team_g.member_role.members.all().values_list('id', flat=True))
|
||||
|
||||
# Verify team-to-team relationships are removed after consolidation
|
||||
assert not RoleTeamAssignment.objects.filter(
|
||||
role_definition=team_member_role, team=team_f, object_id=team_e.id
|
||||
).exists(), "Team-to-team relationship F→E should be removed"
|
||||
assert not RoleTeamAssignment.objects.filter(
|
||||
role_definition=team_member_role, team=team_g, object_id=team_f.id
|
||||
).exists(), "Team-to-team relationship G→F should be removed"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(ANSIBLE_BASE_ALLOW_TEAM_PARENTS=True)
|
||||
def test_consolidate_indirect_user_roles_no_team_relationships(setup_managed_roles, organization):
|
||||
"""
|
||||
Test that the function handles the case where there are no team-to-team relationships.
|
||||
It should return early without making any changes.
|
||||
"""
|
||||
# Create a user and team with direct assignment
|
||||
user = User.objects.create_user(username='test_user')
|
||||
team = Team.objects.create(name='Test Team', organization=organization)
|
||||
|
||||
team_member_role = RoleDefinition.objects.get(name='Team Member')
|
||||
team_content_type = ContentType.objects.get_for_model(Team)
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user], object_id=team.id, content_type_id=team_content_type.id)
|
||||
|
||||
# Compare count of assignments before and after consolidation
|
||||
assignments_before = RoleUserAssignment.objects.filter(role_definition=team_member_role).count()
|
||||
consolidate_indirect_user_roles(apps, None)
|
||||
assignments_after = RoleUserAssignment.objects.filter(role_definition=team_member_role).count()
|
||||
|
||||
assert assignments_before == assignments_after, "Number of assignments should not change when there are no team-to-team relationships"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(ANSIBLE_BASE_ALLOW_TEAM_PARENTS=True)
|
||||
def test_consolidate_indirect_user_roles_circular_reference(setup_managed_roles, organization):
|
||||
"""
|
||||
Test that the function handles circular team references without infinite recursion.
|
||||
"""
|
||||
team_a = Team.objects.create(name='Team A', organization=organization)
|
||||
team_b = Team.objects.create(name='Team B', organization=organization)
|
||||
|
||||
# Create a user assigned to team A
|
||||
user = User.objects.create_user(username='test_user')
|
||||
|
||||
team_member_role = RoleDefinition.objects.get(name='Team Member')
|
||||
team_content_type = ContentType.objects.get_for_model(Team)
|
||||
give_permissions(apps=apps, rd=team_member_role, users=[user], object_id=team_a.id, content_type_id=team_content_type.id)
|
||||
|
||||
# Create circular team relationships: A → B → A
|
||||
give_permissions(apps=apps, rd=team_member_role, teams=[team_b], object_id=team_a.id, content_type_id=team_content_type.id)
|
||||
give_permissions(apps=apps, rd=team_member_role, teams=[team_a], object_id=team_b.id, content_type_id=team_content_type.id)
|
||||
|
||||
# Run the consolidation function - should not raise an exception
|
||||
consolidate_indirect_user_roles(apps, None)
|
||||
|
||||
# Both teams should have the user assigned
|
||||
team_a_users = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_a.id).values_list('user_id', flat=True))
|
||||
team_b_users = set(RoleUserAssignment.objects.filter(role_definition=team_member_role, object_id=team_b.id).values_list('user_id', flat=True))
|
||||
|
||||
assert user.id in team_a_users, "User should be assigned to team A"
|
||||
assert user.id in team_b_users, "User should be assigned to team B"
|
||||
@ -167,11 +167,12 @@ def test_adding_user_to_org_member_role(setup_managed_roles, organization, admin
|
||||
@pytest.mark.parametrize('role_name', ['Organization Admin', 'Organization Member', 'Team Admin', 'Team Member'])
|
||||
def test_adding_actor_to_platform_roles(setup_managed_roles, role_name, actor, organization, team, admin, bob, post):
|
||||
'''
|
||||
Allow user or team to be added to platform-level roles
|
||||
Allow user to be added to platform-level roles
|
||||
Exceptions:
|
||||
- Team cannot be added to Organization Member or Admin role
|
||||
- Team cannot be added to Team Admin or Team Member role
|
||||
'''
|
||||
if actor == 'team' and 'Organization' in role_name:
|
||||
if actor == 'team':
|
||||
expect = 400
|
||||
else:
|
||||
expect = 201
|
||||
@ -184,4 +185,7 @@ def test_adding_actor_to_platform_roles(setup_managed_roles, role_name, actor, o
|
||||
data[actor] = actor_id
|
||||
r = post(url, data=data, user=admin, expect=expect)
|
||||
if expect == 400:
|
||||
assert 'Assigning organization member permission to teams is not allowed' in str(r.data)
|
||||
if 'Organization' in role_name:
|
||||
assert 'Assigning organization member permission to teams is not allowed' in str(r.data)
|
||||
if 'Team' in role_name:
|
||||
assert 'Assigning team permissions to other teams is not allowed' in str(r.data)
|
||||
|
||||
@ -175,20 +175,6 @@ def test_creator_permission(rando, admin_user, inventory, setup_managed_roles):
|
||||
assert rando in inventory.admin_role.members.all()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_team_team_read_role(rando, team, admin_user, post, setup_managed_roles):
|
||||
orgs = [Organization.objects.create(name=f'foo-{i}') for i in range(2)]
|
||||
teams = [Team.objects.create(name=f'foo-{i}', organization=orgs[i]) for i in range(2)]
|
||||
teams[1].member_role.members.add(rando)
|
||||
|
||||
# give second team read permission to first team through the API for regression testing
|
||||
url = reverse('api:role_teams_list', kwargs={'pk': teams[0].read_role.pk, 'version': 'v2'})
|
||||
post(url, {'id': teams[1].id}, user=admin_user)
|
||||
|
||||
# user should be able to view the first team
|
||||
assert rando in teams[0].read_role
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_implicit_parents_no_assignments(organization):
|
||||
"""Through the normal course of creating models, we should not be changing DAB RBAC permissions"""
|
||||
|
||||
@ -50,13 +50,11 @@ def test_org_factory_roles(organization_factory):
|
||||
teams=['team1', 'team2'],
|
||||
users=['team1:foo', 'bar'],
|
||||
projects=['baz', 'bang'],
|
||||
roles=['team2.member_role:foo', 'team1.admin_role:bar', 'team1.member_role:team2.admin_role', 'baz.admin_role:foo'],
|
||||
roles=['team2.member_role:foo', 'team1.admin_role:bar', 'baz.admin_role:foo'],
|
||||
)
|
||||
|
||||
assert objects.users.bar in objects.teams.team2.admin_role
|
||||
assert objects.users.bar in objects.teams.team1.admin_role
|
||||
assert objects.users.foo in objects.projects.baz.admin_role
|
||||
assert objects.users.foo in objects.teams.team1.member_role
|
||||
assert objects.teams.team2.admin_role in objects.teams.team1.member_role.children.all()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
||||
@ -1213,6 +1213,7 @@ ANSIBLE_BASE_CACHE_PARENT_PERMISSIONS = True
|
||||
# Currently features are enabled to keep compatibility with old system, except custom roles
|
||||
ANSIBLE_BASE_ALLOW_TEAM_ORG_ADMIN = False
|
||||
# ANSIBLE_BASE_ALLOW_CUSTOM_ROLES = True
|
||||
ANSIBLE_BASE_ALLOW_TEAM_PARENTS = False
|
||||
ANSIBLE_BASE_ALLOW_CUSTOM_TEAM_ROLES = False
|
||||
ANSIBLE_BASE_ALLOW_SINGLETON_USER_ROLES = True
|
||||
ANSIBLE_BASE_ALLOW_SINGLETON_TEAM_ROLES = False # System auditor has always been restricted to users
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user