diff --git a/awx/main/access.py b/awx/main/access.py index cf41ac08af..3870b1012e 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -258,15 +258,14 @@ class OrganizationAccess(BaseAccess): model = Organization def get_queryset(self): - qs = self.model.objects.filter(active=True).distinct() + qs = self.model.accessible_objects(self.user, {'read':True}) qs = qs.select_related('created_by', 'modified_by') - if self.user.is_superuser: - return qs - return qs.filter(Q(admins__in=[self.user]) | Q(users__in=[self.user])) + return qs def can_change(self, obj, data): - return bool(self.user.is_superuser or - self.user in obj.admins.all()) + if self.user.is_superuser: + return True + return obj.accessible_by(self.user, ALL_PERMISSIONS) def can_delete(self, obj): self.check_license(feature='multiple_organizations', check_expiration=False) @@ -567,55 +566,29 @@ class CredentialAccess(BaseAccess): """Return the queryset for credentials, based on what the user is permitted to see. """ - # Create a base queryset. - # If the user is a superuser, and therefore can see everything, this - # is also sufficient, and we are done. - qs = self.model.objects.filter(active=True).distinct() + qs = self.model.accessible_objects(self.user, {'read':True}) qs = qs.select_related('created_by', 'modified_by', 'user', 'team') - if self.user.is_superuser: - return qs - - # Get the list of organizations for which the user is an admin - orgs_as_admin_ids = set(self.user.admin_of_organizations.filter(active=True).values_list('id', flat=True)) - return qs.filter( - Q(user=self.user) | - Q(user__organizations__id__in=orgs_as_admin_ids) | - Q(user__admin_of_organizations__id__in=orgs_as_admin_ids) | - Q(team__organization__id__in=orgs_as_admin_ids, team__active=True) | - Q(team__users__in=[self.user], team__active=True) - ) + return qs def can_add(self, data): if self.user.is_superuser: return True - user_pk = get_pk_from_dict(data, 'user') - if user_pk: - user_obj = get_object_or_400(User, pk=user_pk) - return self.user.can_access(User, 'change', user_obj, None) - team_pk = get_pk_from_dict(data, 'team') - if team_pk: - team_obj = get_object_or_400(Team, pk=team_pk) - return self.user.can_access(Team, 'change', team_obj, None) - return False + + user, team = user_or_team(data) + if user is None and team is None: + return False + + if user is not None: + return user.resource.accessible_by(self.user, {'write': True}) + if team is not None: + return team.accessible_by(self.user, {'write':True}) def can_change(self, obj, data): if self.user.is_superuser: return True if not self.can_add(data): return False - if self.user == obj.created_by: - return True - if obj.user: - if self.user == obj.user: - return True - if obj.user.organizations.filter(active=True, admins__in=[self.user]).exists(): - return True - if obj.user.admin_of_organizations.filter(active=True, admins__in=[self.user]).exists(): - return True - if obj.team: - if self.user in obj.team.organization.admins.filter(is_active=True): - return True - return False + return obj.accessible_by(self.user, {'read':True, 'update': True, 'delete':True}) def can_delete(self, obj): # Unassociated credentials may be marked deleted by anyone, though we diff --git a/awx/main/tests/functional/test_rbac_credential.py b/awx/main/tests/functional/test_rbac_credential.py index e2febb456d..ec990472a1 100644 --- a/awx/main/tests/functional/test_rbac_credential.py +++ b/awx/main/tests/functional/test_rbac_credential.py @@ -69,11 +69,9 @@ def test_credential_access_superuser(): assert access.can_delete(credential) @pytest.mark.django_db -def test_credential_access_admin(user, organization, team, credential): +def test_credential_access_admin(user, team, credential): u = user('org-admin', False) - organization.admins.add(u) - team.organization = organization - team.save() + team.organization.admin_role.members.add(u) access = CredentialAccess(u) @@ -85,10 +83,16 @@ def test_credential_access_admin(user, organization, team, credential): # unowned credential can be deleted assert access.can_delete(credential) - team.users.add(u) - assert not access.can_change(credential, {'user': u.pk}) - + # credential is now part of a team + # that is part of an organization + # that I am an admin for credential.team = team credential.save() + credential.owner_role.rebuild_role_ancestor_list() + cred = Credential.objects.create(kind='aws', name='test-cred') + cred.team = team + cred.save() + + # should have can_change access as org-admin assert access.can_change(credential, {'user': u.pk}) diff --git a/awx/main/tests/functional/test_rbac_inventory.py b/awx/main/tests/functional/test_rbac_inventory.py index 8834545140..d4440f6902 100644 --- a/awx/main/tests/functional/test_rbac_inventory.py +++ b/awx/main/tests/functional/test_rbac_inventory.py @@ -195,3 +195,39 @@ def test_group_parent_admin(group, permissions, user): parent2.admin_role.members.add(u) assert childA.accessible_by(u, permissions['admin']) + +@pytest.mark.django_db +def test_access_admin(organization, inventory, user): + a = user('admin', False) + inventory.organization = organization + organization.admin_role.members.add(a) + + access = InventoryAccess(a) + assert access.can_read(inventory) + assert access.can_add(None) + assert access.can_add({'organization': organization.id}) + assert access.can_change(inventory, None) + assert access.can_change(inventory, {'organization': organization.id}) + assert access.can_admin(inventory, None) + assert access.can_admin(inventory, {'organization': organization.id}) + assert access.can_delete(inventory) + assert access.can_run_ad_hoc_commands(inventory) + +@pytest.mark.django_db +def test_access_auditor(organization, inventory, user): + u = user('admin', False) + inventory.organization = organization + organization.auditor_role.members.add(u) + + access = InventoryAccess(u) + assert access.can_read(inventory) + assert not access.can_add(None) + assert not access.can_add({'organization': organization.id}) + assert not access.can_change(inventory, None) + assert not access.can_change(inventory, {'organization': organization.id}) + assert not access.can_admin(inventory, None) + assert not access.can_admin(inventory, {'organization': organization.id}) + assert not access.can_delete(inventory) + assert not access.can_run_ad_hoc_commands(inventory) + + diff --git a/awx/main/tests/functional/test_rbac_organization.py b/awx/main/tests/functional/test_rbac_organization.py index c8f1d709a2..cf3f5f6cdf 100644 --- a/awx/main/tests/functional/test_rbac_organization.py +++ b/awx/main/tests/functional/test_rbac_organization.py @@ -57,27 +57,27 @@ def test_organization_access_superuser(cl, organization, user): def test_organization_access_admin(cl, organization, user): '''can_change because I am an admin of that org''' a = user('admin', False) - organization.admins.add(a) - organization.users.add(user('user', False)) + organization.admin_role.members.add(a) + organization.member_role.members.add(user('user', False)) access = OrganizationAccess(a) assert access.can_change(organization, None) assert access.can_delete(organization) org = access.get_queryset()[0] - assert len(org.admins.all()) == 1 - assert len(org.users.all()) == 1 + assert len(org.admin_role.members.all()) == 1 + assert len(org.member_role.members.all()) == 1 @mock.patch.object(BaseAccess, 'check_license', return_value=None) @pytest.mark.django_db def test_organization_access_user(cl, organization, user): access = OrganizationAccess(user('user', False)) - organization.users.add(user('user', False)) + organization.member_role.members.add(user('user', False)) assert not access.can_change(organization, None) assert not access.can_delete(organization) org = access.get_queryset()[0] - assert len(org.admins.all()) == 0 - assert len(org.users.all()) == 1 + assert len(org.admin_role.members.all()) == 0 + assert len(org.member_role.members.all()) == 1