diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index a8f2854197..93d9d1527d 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -61,7 +61,7 @@ from wsgiref.util import FileWrapper # django-ansible-base from ansible_base.lib.utils.requests import get_remote_hosts -from ansible_base.rbac.models import RoleEvaluation, ObjectRole +from ansible_base.rbac.models import RoleEvaluation from ansible_base.resource_registry.shared_types import OrganizationType, TeamType, UserType # AWX @@ -91,7 +91,6 @@ from awx.api.generics import ( from awx.api.views.labels import LabelSubListCreateAttachDetachView from awx.api.versioning import reverse from awx.main import models -from awx.main.models.rbac import get_role_definition from awx.main.utils import ( camelcase_to_underscore, extract_ansible_vars, @@ -856,17 +855,9 @@ class TeamProjectsList(SubListAPIView): def get_queryset(self): team = self.get_parent_object() self.check_parent_access(team) - model_ct = ContentType.objects.get_for_model(self.model) - parent_ct = ContentType.objects.get_for_model(self.parent_model) - - rd = get_role_definition(team.member_role) - role = ObjectRole.objects.filter(object_id=team.id, content_type=parent_ct, role_definition=rd).first() - if role is None: - # Team has no permissions, therefore team has no projects - return self.model.objects.none() - else: - project_qs = self.model.accessible_objects(self.request.user, 'read_role') - return project_qs.filter(id__in=RoleEvaluation.objects.filter(content_type_id=model_ct.id, role=role).values_list('object_id')) + my_qs = self.model.accessible_objects(self.request.user, 'read_role') + team_qs = models.Project.accessible_objects(team, 'read_role') + return my_qs & team_qs class TeamActivityStreamList(SubListAPIView): @@ -981,13 +972,23 @@ class ProjectTeamsList(ListAPIView): serializer_class = serializers.TeamSerializer def get_queryset(self): - p = get_object_or_404(models.Project, pk=self.kwargs['pk']) - if not self.request.user.can_access(models.Project, 'read', p): + parent = get_object_or_404(models.Project, pk=self.kwargs['pk']) + if not self.request.user.can_access(models.Project, 'read', parent): raise PermissionDenied() - project_ct = ContentType.objects.get_for_model(models.Project) + + project_ct = ContentType.objects.get_for_model(parent) team_ct = ContentType.objects.get_for_model(self.model) - all_roles = models.Role.objects.filter(Q(descendents__content_type=project_ct) & Q(descendents__object_id=p.pk), content_type=team_ct) - return self.model.accessible_objects(self.request.user, 'read_role').filter(pk__in=[t.content_object.pk for t in all_roles]) + + roles_on_project = models.Role.objects.filter( + content_type=project_ct, + object_id=parent.pk, + ) + + team_member_parent_roles = models.Role.objects.filter(children__in=roles_on_project, role_field='member_role', content_type=team_ct).distinct() + + team_ids = team_member_parent_roles.values_list('object_id', flat=True) + my_qs = self.model.accessible_objects(self.request.user, 'read_role').filter(pk__in=team_ids) + return my_qs class ProjectSchedulesList(SubListCreateAPIView): diff --git a/awx/main/tests/functional/test_projects.py b/awx/main/tests/functional/test_projects.py index 1824c888f1..7d389d1e16 100644 --- a/awx/main/tests/functional/test_projects.py +++ b/awx/main/tests/functional/test_projects.py @@ -334,6 +334,69 @@ def test_team_project_list(get, team_project_list): ) +@pytest.mark.django_db +def test_project_teams_list_multiple_roles_distinct(get, organization_factory): + # test projects with multiple roles on the same team + objects = organization_factory( + 'org1', + superusers=['admin'], + teams=['teamA'], + projects=['proj1'], + roles=[ + 'teamA.member_role:proj1.admin_role', + 'teamA.member_role:proj1.use_role', + 'teamA.member_role:proj1.update_role', + 'teamA.member_role:proj1.read_role', + ], + ) + admin = objects.superusers.admin + proj1 = objects.projects.proj1 + + res = get(reverse('api:project_teams_list', kwargs={'pk': proj1.pk}), admin).data + names = [t['name'] for t in res['results']] + assert names == ['teamA'] + + +@pytest.mark.django_db +def test_project_teams_list_multiple_teams(get, organization_factory): + # test projects with multiple teams + objs = organization_factory( + 'org1', + superusers=['admin'], + teams=['teamA', 'teamB', 'teamC', 'teamD'], + projects=['proj1'], + roles=[ + 'teamA.member_role:proj1.admin_role', + 'teamB.member_role:proj1.update_role', + 'teamC.member_role:proj1.use_role', + 'teamD.member_role:proj1.read_role', + ], + ) + admin = objs.superusers.admin + proj1 = objs.projects.proj1 + + res = get(reverse('api:project_teams_list', kwargs={'pk': proj1.pk}), admin).data + names = sorted([t['name'] for t in res['results']]) + assert names == ['teamA', 'teamB', 'teamC', 'teamD'] + + +@pytest.mark.django_db +def test_project_teams_list_no_direct_assignments(get, organization_factory): + # test projects with no direct team assignments + objects = organization_factory( + 'org1', + superusers=['admin'], + teams=['teamA'], + projects=['proj1'], + roles=[], + ) + admin = objects.superusers.admin + proj1 = objects.projects.proj1 + + res = get(reverse('api:project_teams_list', kwargs={'pk': proj1.pk}), admin).data + assert res['count'] == 0 + + @pytest.mark.parametrize("u,expected_status_code", [('rando', 403), ('org_member', 403), ('org_admin', 201), ('admin', 201)]) @pytest.mark.django_db def test_create_project(post, organization, org_admin, org_member, admin, rando, u, expected_status_code):