diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 8e127357a3..522bf2836f 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -801,22 +801,11 @@ class TeamRolesList(SubListAttachDetachAPIView): data = dict(msg=_("You cannot grant system-level permissions to a team.")) return Response(data, status=status.HTTP_400_BAD_REQUEST) - team = get_object_or_404(models.Team, pk=self.kwargs['pk']) - credential_content_type = ContentType.objects.get_for_model(models.Credential) - if role.content_type == credential_content_type: - if not role.content_object.organization: - data = dict( - msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)") - ) - return Response(data, status=status.HTTP_400_BAD_REQUEST) - elif role.content_object.organization.id != team.organization.id: - if not request.user.is_superuser: - data = dict( - msg=_( - "You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams" - ) - ) - return Response(data, status=status.HTTP_400_BAD_REQUEST) + if not request.data.get('disassociate'): + team = get_object_or_404(models.Team, pk=self.kwargs['pk']) + content_object = role.content_object + if hasattr(content_object, 'validate_role_assignment'): + content_object.validate_role_assignment(team, role_definition=None, requesting_user=request.user) return super(TeamRolesList, self).post(request, *args, **kwargs) @@ -1275,19 +1264,12 @@ class UserRolesList(SubListAttachDetachAPIView): if not sub_id: return super(UserRolesList, self).post(request) - user = get_object_or_400(models.User, pk=self.kwargs['pk']) - role = get_object_or_400(models.Role, pk=sub_id) - - content_types = ContentType.objects.get_for_models(models.Organization, models.Team, models.Credential) # dict of {model: content_type} - credential_content_type = content_types[models.Credential] - if role.content_type == credential_content_type: - if 'disassociate' not in request.data and role.content_object.organization and user not in role.content_object.organization.member_role: - data = dict(msg=_("You cannot grant credential access to a user not in the credentials' organization")) - return Response(data, status=status.HTTP_400_BAD_REQUEST) - - if not role.content_object.organization and not request.user.is_superuser: - data = dict(msg=_("You cannot grant private credential access to another user")) - return Response(data, status=status.HTTP_400_BAD_REQUEST) + if not request.data.get('disassociate'): + role = get_object_or_400(models.Role, pk=sub_id) + user = get_object_or_400(models.User, pk=self.kwargs['pk']) + content_object = role.content_object + if hasattr(content_object, 'validate_role_assignment'): + content_object.validate_role_assignment(user, role_definition=None, requesting_user=request.user) return super(UserRolesList, self).post(request, *args, **kwargs) @@ -4888,19 +4870,12 @@ class RoleUsersList(SubListAttachDetachAPIView): if not sub_id: return super(RoleUsersList, self).post(request) - user = get_object_or_400(models.User, pk=sub_id) - role = self.get_parent_object() - - content_types = ContentType.objects.get_for_models(models.Organization, models.Team, models.Credential) # dict of {model: content_type} - credential_content_type = content_types[models.Credential] - if role.content_type == credential_content_type: - if 'disassociate' not in request.data and role.content_object.organization and user not in role.content_object.organization.member_role: - data = dict(msg=_("You cannot grant credential access to a user not in the credentials' organization")) - return Response(data, status=status.HTTP_400_BAD_REQUEST) - - if not role.content_object.organization and not request.user.is_superuser: - data = dict(msg=_("You cannot grant private credential access to another user")) - return Response(data, status=status.HTTP_400_BAD_REQUEST) + if not request.data.get('disassociate'): + user = get_object_or_400(models.User, pk=sub_id) + role = self.get_parent_object() + content_object = role.content_object + if hasattr(content_object, 'validate_role_assignment'): + content_object.validate_role_assignment(user, role_definition=None, requesting_user=request.user) return super(RoleUsersList, self).post(request, *args, **kwargs) @@ -4933,24 +4908,6 @@ class RoleTeamsList(SubListAttachDetachAPIView): data = dict(msg=_("You cannot assign an Organization participation role as a child role for a Team.")) return Response(data, status=status.HTTP_400_BAD_REQUEST) - credential_content_type = ContentType.objects.get_for_model(models.Credential) - if role.content_type == credential_content_type: - # Private credentials (no organization) are never allowed for teams - if not role.content_object.organization: - data = dict( - msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)") - ) - return Response(data, status=status.HTTP_400_BAD_REQUEST) - # Cross-organization credentials are only allowed for superusers - elif role.content_object.organization.id != team.organization.id: - if not request.user.is_superuser: - data = dict( - msg=_( - "You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams" - ) - ) - return Response(data, status=status.HTTP_400_BAD_REQUEST) - action = 'attach' if request.data.get('disassociate', None): action = 'unattach' @@ -4959,6 +4916,11 @@ class RoleTeamsList(SubListAttachDetachAPIView): data = dict(msg=_("You cannot grant system-level permissions to a team.")) return Response(data, status=status.HTTP_400_BAD_REQUEST) + if action == 'attach': + content_object = role.content_object + if hasattr(content_object, 'validate_role_assignment'): + content_object.validate_role_assignment(team, role_definition=None, requesting_user=request.user) + if not request.user.can_access(self.parent_model, action, role, team, self.relationship, request.data, skip_sub_obj_read_check=False): raise PermissionDenied() if request.data.get('disassociate', None): diff --git a/awx/main/models/credential.py b/awx/main/models/credential.py index a4bd1061bf..9959694384 100644 --- a/awx/main/models/credential.py +++ b/awx/main/models/credential.py @@ -49,10 +49,6 @@ from awx.main.models import Team, Organization from awx.main.utils import encrypt_field from awx_plugins.interfaces._temporary_private_licensing_api import detect_server_product_name -# DAB -from ansible_base.resource_registry.tasks.sync import get_resource_server_client -from ansible_base.resource_registry.utils.settings import resource_server_defined - __all__ = ['Credential', 'CredentialType', 'CredentialInputSource', 'build_safe_env'] logger = logging.getLogger('awx.main.models.credential') @@ -80,46 +76,6 @@ def build_safe_env(env): return safe_env -def check_resource_server_for_user_in_organization(user, organization, requesting_user): - if not resource_server_defined(): - return False - - if not requesting_user: - return False - - client = get_resource_server_client(settings.RESOURCE_SERVICE_PATH, jwt_user_id=str(requesting_user.resource.ansible_id), raise_if_bad_request=False) - # need to get the organization object_id in resource server, by querying with ansible_id - response = client._make_request(path=f'resources/?ansible_id={str(organization.resource.ansible_id)}', method='GET') - response_json = response.json() - if response.status_code != 200: - logger.error(f'Failed to get organization object_id in resource server: {response_json.get("detail", "")}') - return False - - if response_json.get('count', 0) == 0: - return False - org_id_in_resource_server = response_json['results'][0]['object_id'] - - client.base_url = client.base_url.replace('/api/gateway/v1/service-index/', '/api/gateway/v1/') - # find role assignments with: - # - roles Organization Member or Organization Admin - # - user ansible id - # - organization object id - - response = client._make_request( - path=f'role_user_assignments/?role_definition__name__in=Organization Member,Organization Admin&user__resource__ansible_id={str(user.resource.ansible_id)}&object_id={org_id_in_resource_server}', - method='GET', - ) - response_json = response.json() - if response.status_code != 200: - logger.error(f'Failed to get role user assignments in resource server: {response_json.get("detail", "")}') - return False - - if response_json.get('count', 0) > 0: - return True - - return False - - class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): """ A credential contains information about how to talk to a remote resource @@ -396,16 +352,15 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): raise ValueError('{} is not a dynamic input field'.format(field_name)) def validate_role_assignment(self, actor, role_definition, **kwargs): + requesting_user = kwargs.get('requesting_user', None) + if requesting_user and requesting_user.is_superuser: + return if self.organization: if isinstance(actor, User): if actor.is_superuser: return if Organization.access_qs(actor, 'member').filter(id=self.organization.id).exists(): return - - requesting_user = kwargs.get('requesting_user', None) - if check_resource_server_for_user_in_organization(actor, self.organization, requesting_user): - return if isinstance(actor, Team): if actor.organization == self.organization: return diff --git a/awx/main/models/execution_environments.py b/awx/main/models/execution_environments.py index 925b634a73..3636387a21 100644 --- a/awx/main/models/execution_environments.py +++ b/awx/main/models/execution_environments.py @@ -58,8 +58,6 @@ class ExecutionEnvironment(CommonModel): return reverse('api:execution_environment_detail', kwargs={'pk': self.pk}, request=request) def validate_role_assignment(self, actor, role_definition, **kwargs): - from awx.main.models.credential import check_resource_server_for_user_in_organization - if self.managed: raise ValidationError({'object_id': _('Can not assign object roles to managed Execution Environments')}) if self.organization_id is None: @@ -69,8 +67,4 @@ class ExecutionEnvironment(CommonModel): if actor.has_obj_perm(self.organization, 'view'): return - requesting_user = kwargs.get('requesting_user', None) - if check_resource_server_for_user_in_organization(actor, self.organization, requesting_user): - return - raise ValidationError({'user': _('User must have view permission to Execution Environment organization')}) diff --git a/awx/main/tests/functional/api/test_credential.py b/awx/main/tests/functional/api/test_credential.py index 7956ebed4f..8d2f6dc0bb 100644 --- a/awx/main/tests/functional/api/test_credential.py +++ b/awx/main/tests/functional/api/test_credential.py @@ -200,6 +200,7 @@ def test_grant_org_credential_to_org_user_through_user_roles(post, credential, o @pytest.mark.django_db def test_grant_org_credential_to_non_org_user_through_role_users(post, credential, organization, org_admin, alice): + # NOTE: this endpoint is going away soon credential.organization = organization credential.save() response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {'id': alice.id}, org_admin) @@ -208,6 +209,7 @@ def test_grant_org_credential_to_non_org_user_through_role_users(post, credentia @pytest.mark.django_db def test_grant_org_credential_to_non_org_user_through_user_roles(post, credential, organization, org_admin, alice): + # NOTE: this endpoint is going away soon credential.organization = organization credential.save() response = post(reverse('api:user_roles_list', kwargs={'pk': alice.id}), {'id': credential.use_role.id}, org_admin) @@ -216,18 +218,18 @@ def test_grant_org_credential_to_non_org_user_through_user_roles(post, credentia @pytest.mark.django_db def test_grant_private_credential_to_user_through_role_users(post, credential, alice, bob): - # normal users can't do this + # NOTE: this endpoint is going away soon credential.admin_role.members.add(alice) response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {'id': bob.id}, alice) - assert response.status_code == 400 + assert response.status_code == 403 @pytest.mark.django_db def test_grant_private_credential_to_org_user_through_role_users(post, credential, org_admin, org_member): - # org admins can't either + # NOTE: this endpoint is going away soon credential.admin_role.members.add(org_admin) response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {'id': org_member.id}, org_admin) - assert response.status_code == 400 + assert response.status_code == 204 @pytest.mark.django_db @@ -239,18 +241,18 @@ def test_sa_grant_private_credential_to_user_through_role_users(post, credential @pytest.mark.django_db def test_grant_private_credential_to_user_through_user_roles(post, credential, alice, bob): - # normal users can't do this + # NOTE: this endpoint is going away soon credential.admin_role.members.add(alice) response = post(reverse('api:user_roles_list', kwargs={'pk': bob.id}), {'id': credential.use_role.id}, alice) - assert response.status_code == 400 + assert response.status_code == 403 @pytest.mark.django_db def test_grant_private_credential_to_org_user_through_user_roles(post, credential, org_admin, org_member): - # org admins can't either + # NOTE: this endpoint is going away soon credential.admin_role.members.add(org_admin) response = post(reverse('api:user_roles_list', kwargs={'pk': org_member.id}), {'id': credential.use_role.id}, org_admin) - assert response.status_code == 400 + assert response.status_code == 204 @pytest.mark.django_db @@ -282,14 +284,14 @@ def test_grant_org_credential_to_team_through_team_roles(post, credential, organ @pytest.mark.django_db def test_sa_grant_private_credential_to_team_through_role_teams(post, credential, admin, team): - # not even a system admin can grant a private cred to a team though + # NOTE: this endpoint is going away soon response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, admin) - assert response.status_code == 400 + assert response.status_code == 204 @pytest.mark.django_db def test_grant_credential_to_team_different_organization_through_role_teams(post, get, credential, organizations, admin, org_admin, team, team_member): - # # Test that credential from different org can be assigned to team by a superuser through role_teams_list endpoint + # NOTE: this endpoint is going away soon orgs = organizations(2) credential.organization = orgs[0] credential.save() @@ -299,10 +301,7 @@ def test_grant_credential_to_team_different_organization_through_role_teams(post # Non-superuser (org_admin) trying cross-org assignment should be denied response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, org_admin) assert response.status_code == 400 - assert ( - "You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams" - in response.data['msg'] - ) + assert "You cannot grant credential access to a Team not in the credentials' organization" in str(response.data['detail']) # Superuser (admin) can do cross-org assignment response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, admin) @@ -316,20 +315,17 @@ def test_grant_credential_to_team_different_organization_through_role_teams(post @pytest.mark.django_db def test_grant_credential_to_team_different_organization(post, get, credential, organizations, admin, org_admin, team, team_member): - # Test that credential from different org can be assigned to team by a superuser + # NOTE: this endpoint is going away soon orgs = organizations(2) credential.organization = orgs[0] credential.save() team.organization = orgs[1] team.save() - # Non-superuser (org_admin, ...) trying cross-org assignment should be denied + # Non-superuser (org_admin) trying cross-org assignment should be denied response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, org_admin) assert response.status_code == 400 - assert ( - "You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams" - in response.data['msg'] - ) + assert "You cannot grant credential access to a Team not in the credentials' organization" in str(response.data['detail']) # Superuser (system admin) can do cross-org assignment response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, admin) diff --git a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py index 3b09272d8c..be87861b35 100644 --- a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py +++ b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py @@ -131,14 +131,18 @@ def test_workflow_creation_permissions(setup_managed_roles, organization, workfl @pytest.mark.django_db def test_assign_credential_to_user_of_another_org(setup_managed_roles, credential, admin_user, rando, org_admin, organization, post): - '''Test that a credential can only be assigned to a user in the same organization''' - # cannot assign credential to rando, as rando is not in the same org as the credential + '''Test that a credential can only be assigned to a user in the same organization by non-superusers''' rd = RoleDefinition.objects.get(name="Credential Admin") credential.organization = organization credential.save(update_fields=['organization']) assert credential.organization not in Organization.access_qs(rando, 'member') url = django_reverse('roleuserassignment-list') - resp = post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=400) + + # superuser can assign cross-org + post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=201) + + # non-superuser (org_admin) cannot assign cross-org + resp = post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=org_admin, expect=400) assert "You cannot grant credential access to a User not in the credentials' organization" in str(resp.data) # can assign credential to superuser @@ -146,7 +150,7 @@ def test_assign_credential_to_user_of_another_org(setup_managed_roles, credentia rando.save() post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=201) - # can assign credential to org_admin + # can assign credential to org_admin (same org) assert credential.organization in Organization.access_qs(org_admin, 'member') post(url=url, data={"user": org_admin.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=201)