mirror of
https://github.com/ansible/awx.git
synced 2026-05-12 20:07:37 -02:30
Consolidate validation rules for same-org restrictions (#16427)
* Consolidate implementation of same-org validation rule * Update tests for the simplified validation * Still do validation with deferance to the new callback * Correctly falsy handling in view logic
This commit is contained in:
@@ -801,22 +801,11 @@ class TeamRolesList(SubListAttachDetachAPIView):
|
||||
data = dict(msg=_("You cannot grant system-level permissions to a team."))
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
team = get_object_or_404(models.Team, pk=self.kwargs['pk'])
|
||||
credential_content_type = ContentType.objects.get_for_model(models.Credential)
|
||||
if role.content_type == credential_content_type:
|
||||
if not role.content_object.organization:
|
||||
data = dict(
|
||||
msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)")
|
||||
)
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
elif role.content_object.organization.id != team.organization.id:
|
||||
if not request.user.is_superuser:
|
||||
data = dict(
|
||||
msg=_(
|
||||
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
|
||||
)
|
||||
)
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
if not request.data.get('disassociate'):
|
||||
team = get_object_or_404(models.Team, pk=self.kwargs['pk'])
|
||||
content_object = role.content_object
|
||||
if hasattr(content_object, 'validate_role_assignment'):
|
||||
content_object.validate_role_assignment(team, role_definition=None, requesting_user=request.user)
|
||||
|
||||
return super(TeamRolesList, self).post(request, *args, **kwargs)
|
||||
|
||||
@@ -1275,19 +1264,12 @@ class UserRolesList(SubListAttachDetachAPIView):
|
||||
if not sub_id:
|
||||
return super(UserRolesList, self).post(request)
|
||||
|
||||
user = get_object_or_400(models.User, pk=self.kwargs['pk'])
|
||||
role = get_object_or_400(models.Role, pk=sub_id)
|
||||
|
||||
content_types = ContentType.objects.get_for_models(models.Organization, models.Team, models.Credential) # dict of {model: content_type}
|
||||
credential_content_type = content_types[models.Credential]
|
||||
if role.content_type == credential_content_type:
|
||||
if 'disassociate' not in request.data and role.content_object.organization and user not in role.content_object.organization.member_role:
|
||||
data = dict(msg=_("You cannot grant credential access to a user not in the credentials' organization"))
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
if not role.content_object.organization and not request.user.is_superuser:
|
||||
data = dict(msg=_("You cannot grant private credential access to another user"))
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
if not request.data.get('disassociate'):
|
||||
role = get_object_or_400(models.Role, pk=sub_id)
|
||||
user = get_object_or_400(models.User, pk=self.kwargs['pk'])
|
||||
content_object = role.content_object
|
||||
if hasattr(content_object, 'validate_role_assignment'):
|
||||
content_object.validate_role_assignment(user, role_definition=None, requesting_user=request.user)
|
||||
|
||||
return super(UserRolesList, self).post(request, *args, **kwargs)
|
||||
|
||||
@@ -4888,19 +4870,12 @@ class RoleUsersList(SubListAttachDetachAPIView):
|
||||
if not sub_id:
|
||||
return super(RoleUsersList, self).post(request)
|
||||
|
||||
user = get_object_or_400(models.User, pk=sub_id)
|
||||
role = self.get_parent_object()
|
||||
|
||||
content_types = ContentType.objects.get_for_models(models.Organization, models.Team, models.Credential) # dict of {model: content_type}
|
||||
credential_content_type = content_types[models.Credential]
|
||||
if role.content_type == credential_content_type:
|
||||
if 'disassociate' not in request.data and role.content_object.organization and user not in role.content_object.organization.member_role:
|
||||
data = dict(msg=_("You cannot grant credential access to a user not in the credentials' organization"))
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
if not role.content_object.organization and not request.user.is_superuser:
|
||||
data = dict(msg=_("You cannot grant private credential access to another user"))
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
if not request.data.get('disassociate'):
|
||||
user = get_object_or_400(models.User, pk=sub_id)
|
||||
role = self.get_parent_object()
|
||||
content_object = role.content_object
|
||||
if hasattr(content_object, 'validate_role_assignment'):
|
||||
content_object.validate_role_assignment(user, role_definition=None, requesting_user=request.user)
|
||||
|
||||
return super(RoleUsersList, self).post(request, *args, **kwargs)
|
||||
|
||||
@@ -4933,24 +4908,6 @@ class RoleTeamsList(SubListAttachDetachAPIView):
|
||||
data = dict(msg=_("You cannot assign an Organization participation role as a child role for a Team."))
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
credential_content_type = ContentType.objects.get_for_model(models.Credential)
|
||||
if role.content_type == credential_content_type:
|
||||
# Private credentials (no organization) are never allowed for teams
|
||||
if not role.content_object.organization:
|
||||
data = dict(
|
||||
msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)")
|
||||
)
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
# Cross-organization credentials are only allowed for superusers
|
||||
elif role.content_object.organization.id != team.organization.id:
|
||||
if not request.user.is_superuser:
|
||||
data = dict(
|
||||
msg=_(
|
||||
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
|
||||
)
|
||||
)
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
action = 'attach'
|
||||
if request.data.get('disassociate', None):
|
||||
action = 'unattach'
|
||||
@@ -4959,6 +4916,11 @@ class RoleTeamsList(SubListAttachDetachAPIView):
|
||||
data = dict(msg=_("You cannot grant system-level permissions to a team."))
|
||||
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
if action == 'attach':
|
||||
content_object = role.content_object
|
||||
if hasattr(content_object, 'validate_role_assignment'):
|
||||
content_object.validate_role_assignment(team, role_definition=None, requesting_user=request.user)
|
||||
|
||||
if not request.user.can_access(self.parent_model, action, role, team, self.relationship, request.data, skip_sub_obj_read_check=False):
|
||||
raise PermissionDenied()
|
||||
if request.data.get('disassociate', None):
|
||||
|
||||
@@ -49,10 +49,6 @@ from awx.main.models import Team, Organization
|
||||
from awx.main.utils import encrypt_field
|
||||
from awx_plugins.interfaces._temporary_private_licensing_api import detect_server_product_name
|
||||
|
||||
# DAB
|
||||
from ansible_base.resource_registry.tasks.sync import get_resource_server_client
|
||||
from ansible_base.resource_registry.utils.settings import resource_server_defined
|
||||
|
||||
__all__ = ['Credential', 'CredentialType', 'CredentialInputSource', 'build_safe_env']
|
||||
|
||||
logger = logging.getLogger('awx.main.models.credential')
|
||||
@@ -80,46 +76,6 @@ def build_safe_env(env):
|
||||
return safe_env
|
||||
|
||||
|
||||
def check_resource_server_for_user_in_organization(user, organization, requesting_user):
|
||||
if not resource_server_defined():
|
||||
return False
|
||||
|
||||
if not requesting_user:
|
||||
return False
|
||||
|
||||
client = get_resource_server_client(settings.RESOURCE_SERVICE_PATH, jwt_user_id=str(requesting_user.resource.ansible_id), raise_if_bad_request=False)
|
||||
# need to get the organization object_id in resource server, by querying with ansible_id
|
||||
response = client._make_request(path=f'resources/?ansible_id={str(organization.resource.ansible_id)}', method='GET')
|
||||
response_json = response.json()
|
||||
if response.status_code != 200:
|
||||
logger.error(f'Failed to get organization object_id in resource server: {response_json.get("detail", "")}')
|
||||
return False
|
||||
|
||||
if response_json.get('count', 0) == 0:
|
||||
return False
|
||||
org_id_in_resource_server = response_json['results'][0]['object_id']
|
||||
|
||||
client.base_url = client.base_url.replace('/api/gateway/v1/service-index/', '/api/gateway/v1/')
|
||||
# find role assignments with:
|
||||
# - roles Organization Member or Organization Admin
|
||||
# - user ansible id
|
||||
# - organization object id
|
||||
|
||||
response = client._make_request(
|
||||
path=f'role_user_assignments/?role_definition__name__in=Organization Member,Organization Admin&user__resource__ansible_id={str(user.resource.ansible_id)}&object_id={org_id_in_resource_server}',
|
||||
method='GET',
|
||||
)
|
||||
response_json = response.json()
|
||||
if response.status_code != 200:
|
||||
logger.error(f'Failed to get role user assignments in resource server: {response_json.get("detail", "")}')
|
||||
return False
|
||||
|
||||
if response_json.get('count', 0) > 0:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
||||
"""
|
||||
A credential contains information about how to talk to a remote resource
|
||||
@@ -396,16 +352,15 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
||||
raise ValueError('{} is not a dynamic input field'.format(field_name))
|
||||
|
||||
def validate_role_assignment(self, actor, role_definition, **kwargs):
|
||||
requesting_user = kwargs.get('requesting_user', None)
|
||||
if requesting_user and requesting_user.is_superuser:
|
||||
return
|
||||
if self.organization:
|
||||
if isinstance(actor, User):
|
||||
if actor.is_superuser:
|
||||
return
|
||||
if Organization.access_qs(actor, 'member').filter(id=self.organization.id).exists():
|
||||
return
|
||||
|
||||
requesting_user = kwargs.get('requesting_user', None)
|
||||
if check_resource_server_for_user_in_organization(actor, self.organization, requesting_user):
|
||||
return
|
||||
if isinstance(actor, Team):
|
||||
if actor.organization == self.organization:
|
||||
return
|
||||
|
||||
@@ -58,8 +58,6 @@ class ExecutionEnvironment(CommonModel):
|
||||
return reverse('api:execution_environment_detail', kwargs={'pk': self.pk}, request=request)
|
||||
|
||||
def validate_role_assignment(self, actor, role_definition, **kwargs):
|
||||
from awx.main.models.credential import check_resource_server_for_user_in_organization
|
||||
|
||||
if self.managed:
|
||||
raise ValidationError({'object_id': _('Can not assign object roles to managed Execution Environments')})
|
||||
if self.organization_id is None:
|
||||
@@ -69,8 +67,4 @@ class ExecutionEnvironment(CommonModel):
|
||||
if actor.has_obj_perm(self.organization, 'view'):
|
||||
return
|
||||
|
||||
requesting_user = kwargs.get('requesting_user', None)
|
||||
if check_resource_server_for_user_in_organization(actor, self.organization, requesting_user):
|
||||
return
|
||||
|
||||
raise ValidationError({'user': _('User must have view permission to Execution Environment organization')})
|
||||
|
||||
@@ -200,6 +200,7 @@ def test_grant_org_credential_to_org_user_through_user_roles(post, credential, o
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_org_credential_to_non_org_user_through_role_users(post, credential, organization, org_admin, alice):
|
||||
# NOTE: this endpoint is going away soon
|
||||
credential.organization = organization
|
||||
credential.save()
|
||||
response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {'id': alice.id}, org_admin)
|
||||
@@ -208,6 +209,7 @@ def test_grant_org_credential_to_non_org_user_through_role_users(post, credentia
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_org_credential_to_non_org_user_through_user_roles(post, credential, organization, org_admin, alice):
|
||||
# NOTE: this endpoint is going away soon
|
||||
credential.organization = organization
|
||||
credential.save()
|
||||
response = post(reverse('api:user_roles_list', kwargs={'pk': alice.id}), {'id': credential.use_role.id}, org_admin)
|
||||
@@ -216,18 +218,18 @@ def test_grant_org_credential_to_non_org_user_through_user_roles(post, credentia
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_private_credential_to_user_through_role_users(post, credential, alice, bob):
|
||||
# normal users can't do this
|
||||
# NOTE: this endpoint is going away soon
|
||||
credential.admin_role.members.add(alice)
|
||||
response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {'id': bob.id}, alice)
|
||||
assert response.status_code == 400
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_private_credential_to_org_user_through_role_users(post, credential, org_admin, org_member):
|
||||
# org admins can't either
|
||||
# NOTE: this endpoint is going away soon
|
||||
credential.admin_role.members.add(org_admin)
|
||||
response = post(reverse('api:role_users_list', kwargs={'pk': credential.use_role.id}), {'id': org_member.id}, org_admin)
|
||||
assert response.status_code == 400
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -239,18 +241,18 @@ def test_sa_grant_private_credential_to_user_through_role_users(post, credential
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_private_credential_to_user_through_user_roles(post, credential, alice, bob):
|
||||
# normal users can't do this
|
||||
# NOTE: this endpoint is going away soon
|
||||
credential.admin_role.members.add(alice)
|
||||
response = post(reverse('api:user_roles_list', kwargs={'pk': bob.id}), {'id': credential.use_role.id}, alice)
|
||||
assert response.status_code == 400
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_private_credential_to_org_user_through_user_roles(post, credential, org_admin, org_member):
|
||||
# org admins can't either
|
||||
# NOTE: this endpoint is going away soon
|
||||
credential.admin_role.members.add(org_admin)
|
||||
response = post(reverse('api:user_roles_list', kwargs={'pk': org_member.id}), {'id': credential.use_role.id}, org_admin)
|
||||
assert response.status_code == 400
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@@ -282,14 +284,14 @@ def test_grant_org_credential_to_team_through_team_roles(post, credential, organ
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_sa_grant_private_credential_to_team_through_role_teams(post, credential, admin, team):
|
||||
# not even a system admin can grant a private cred to a team though
|
||||
# NOTE: this endpoint is going away soon
|
||||
response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, admin)
|
||||
assert response.status_code == 400
|
||||
assert response.status_code == 204
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_credential_to_team_different_organization_through_role_teams(post, get, credential, organizations, admin, org_admin, team, team_member):
|
||||
# # Test that credential from different org can be assigned to team by a superuser through role_teams_list endpoint
|
||||
# NOTE: this endpoint is going away soon
|
||||
orgs = organizations(2)
|
||||
credential.organization = orgs[0]
|
||||
credential.save()
|
||||
@@ -299,10 +301,7 @@ def test_grant_credential_to_team_different_organization_through_role_teams(post
|
||||
# Non-superuser (org_admin) trying cross-org assignment should be denied
|
||||
response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, org_admin)
|
||||
assert response.status_code == 400
|
||||
assert (
|
||||
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
|
||||
in response.data['msg']
|
||||
)
|
||||
assert "You cannot grant credential access to a Team not in the credentials' organization" in str(response.data['detail'])
|
||||
|
||||
# Superuser (admin) can do cross-org assignment
|
||||
response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, admin)
|
||||
@@ -316,20 +315,17 @@ def test_grant_credential_to_team_different_organization_through_role_teams(post
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_grant_credential_to_team_different_organization(post, get, credential, organizations, admin, org_admin, team, team_member):
|
||||
# Test that credential from different org can be assigned to team by a superuser
|
||||
# NOTE: this endpoint is going away soon
|
||||
orgs = organizations(2)
|
||||
credential.organization = orgs[0]
|
||||
credential.save()
|
||||
team.organization = orgs[1]
|
||||
team.save()
|
||||
|
||||
# Non-superuser (org_admin, ...) trying cross-org assignment should be denied
|
||||
# Non-superuser (org_admin) trying cross-org assignment should be denied
|
||||
response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, org_admin)
|
||||
assert response.status_code == 400
|
||||
assert (
|
||||
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
|
||||
in response.data['msg']
|
||||
)
|
||||
assert "You cannot grant credential access to a Team not in the credentials' organization" in str(response.data['detail'])
|
||||
|
||||
# Superuser (system admin) can do cross-org assignment
|
||||
response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, admin)
|
||||
|
||||
@@ -131,14 +131,18 @@ def test_workflow_creation_permissions(setup_managed_roles, organization, workfl
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_assign_credential_to_user_of_another_org(setup_managed_roles, credential, admin_user, rando, org_admin, organization, post):
|
||||
'''Test that a credential can only be assigned to a user in the same organization'''
|
||||
# cannot assign credential to rando, as rando is not in the same org as the credential
|
||||
'''Test that a credential can only be assigned to a user in the same organization by non-superusers'''
|
||||
rd = RoleDefinition.objects.get(name="Credential Admin")
|
||||
credential.organization = organization
|
||||
credential.save(update_fields=['organization'])
|
||||
assert credential.organization not in Organization.access_qs(rando, 'member')
|
||||
url = django_reverse('roleuserassignment-list')
|
||||
resp = post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=400)
|
||||
|
||||
# superuser can assign cross-org
|
||||
post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=201)
|
||||
|
||||
# non-superuser (org_admin) cannot assign cross-org
|
||||
resp = post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=org_admin, expect=400)
|
||||
assert "You cannot grant credential access to a User not in the credentials' organization" in str(resp.data)
|
||||
|
||||
# can assign credential to superuser
|
||||
@@ -146,7 +150,7 @@ def test_assign_credential_to_user_of_another_org(setup_managed_roles, credentia
|
||||
rando.save()
|
||||
post(url=url, data={"user": rando.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=201)
|
||||
|
||||
# can assign credential to org_admin
|
||||
# can assign credential to org_admin (same org)
|
||||
assert credential.organization in Organization.access_qs(org_admin, 'member')
|
||||
post(url=url, data={"user": org_admin.id, "role_definition": rd.id, "object_id": credential.id}, user=admin_user, expect=201)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user