mirror of
https://github.com/ansible/awx.git
synced 2026-01-10 15:32:07 -03:30
Update ExecutionEnvironment model so object-level roles work with DAB RBAC system (#15289)
* Add initial test for deletion of stale permission * Delete existing EE view permission * Hypothetically complete update of EE model permissions setup * Tests passing locally * Issue with user_capabilities was a test bug, fixed
This commit is contained in:
parent
a70b0c1ddc
commit
b59aff50dc
@ -1387,12 +1387,11 @@ class TeamAccess(BaseAccess):
|
||||
class ExecutionEnvironmentAccess(BaseAccess):
|
||||
"""
|
||||
I can see an execution environment when:
|
||||
- I'm a superuser
|
||||
- I'm a member of the same organization
|
||||
- it is a global ExecutionEnvironment
|
||||
- I can see its organization
|
||||
- It is a global ExecutionEnvironment
|
||||
I can create/change an execution environment when:
|
||||
- I'm a superuser
|
||||
- I'm an admin for the organization(s)
|
||||
- I have an organization or object role that gives access
|
||||
"""
|
||||
|
||||
model = ExecutionEnvironment
|
||||
@ -1416,7 +1415,7 @@ class ExecutionEnvironmentAccess(BaseAccess):
|
||||
raise PermissionDenied
|
||||
if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED:
|
||||
if not self.user.has_obj_perm(obj, 'change'):
|
||||
raise PermissionDenied
|
||||
return False
|
||||
else:
|
||||
if self.user not in obj.organization.execution_environment_admin_role:
|
||||
raise PermissionDenied
|
||||
@ -1424,7 +1423,7 @@ class ExecutionEnvironmentAccess(BaseAccess):
|
||||
new_org = get_object_from_data('organization', Organization, data, obj=obj)
|
||||
if not new_org or self.user not in new_org.execution_environment_admin_role:
|
||||
return False
|
||||
return self.check_related('organization', Organization, data, obj=obj, mandatory=True, role_field='execution_environment_admin_role')
|
||||
return self.check_related('organization', Organization, data, obj=obj, role_field='execution_environment_admin_role')
|
||||
|
||||
def can_delete(self, obj):
|
||||
if obj.managed:
|
||||
|
||||
26
awx/main/migrations/0195_EE_permissions.py
Normal file
26
awx/main/migrations/0195_EE_permissions.py
Normal file
@ -0,0 +1,26 @@
|
||||
# Generated by Django 4.2.6 on 2024-06-20 15:55
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
def delete_execution_environment_read_role(apps, schema_editor):
|
||||
permission_classes = [apps.get_model('auth', 'Permission'), apps.get_model('dab_rbac', 'DABPermission')]
|
||||
for permission_cls in permission_classes:
|
||||
ee_read_perm = permission_cls.objects.filter(codename='view_executionenvironment').first()
|
||||
if ee_read_perm:
|
||||
ee_read_perm.delete()
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0194_alter_inventorysource_source_and_more'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterModelOptions(
|
||||
name='executionenvironment',
|
||||
options={'default_permissions': ('add', 'change', 'delete'), 'ordering': ('-created',)},
|
||||
),
|
||||
migrations.RunPython(delete_execution_environment_read_role, migrations.RunPython.noop),
|
||||
]
|
||||
@ -134,8 +134,7 @@ def get_permissions_for_role(role_field, children_map, apps):
|
||||
|
||||
# more special cases for those same above special org-level roles
|
||||
if role_field.name == 'auditor_role':
|
||||
for codename in ('view_notificationtemplate', 'view_executionenvironment'):
|
||||
perm_list.append(Permission.objects.get(codename=codename))
|
||||
perm_list.append(Permission.objects.get(codename='view_notificationtemplate'))
|
||||
|
||||
return perm_list
|
||||
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
from django.db import models
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from rest_framework.exceptions import ValidationError
|
||||
|
||||
from awx.api.versioning import reverse
|
||||
from awx.main.models.base import CommonModel
|
||||
from awx.main.validators import validate_container_image_name
|
||||
@ -12,6 +14,8 @@ __all__ = ['ExecutionEnvironment']
|
||||
class ExecutionEnvironment(CommonModel):
|
||||
class Meta:
|
||||
ordering = ('-created',)
|
||||
# Remove view permission, as a temporary solution, defer to organization read permission
|
||||
default_permissions = ('add', 'change', 'delete')
|
||||
|
||||
PULL_CHOICES = [
|
||||
('always', _("Always pull container before running.")),
|
||||
@ -53,3 +57,16 @@ class ExecutionEnvironment(CommonModel):
|
||||
|
||||
def get_absolute_url(self, request=None):
|
||||
return reverse('api:execution_environment_detail', kwargs={'pk': self.pk}, request=request)
|
||||
|
||||
def validate_role_assignment(self, actor, role_definition):
|
||||
if self.managed:
|
||||
raise ValidationError({'object_id': _('Can not assign object roles to managed Execution Environments')})
|
||||
if self.organization_id is None:
|
||||
raise ValidationError({'object_id': _('Can not assign object roles to global Execution Environments')})
|
||||
|
||||
if actor._meta.model_name == 'user' and (not actor.has_obj_perm(self.organization, 'view')):
|
||||
raise ValidationError({'user': _('User must have view permission to Execution Environment organization')})
|
||||
if actor._meta.model_name == 'team':
|
||||
organization_cls = self._meta.get_field('organization').related_model
|
||||
if self.orgaanization not in organization_cls.access_qs(actor, 'view'):
|
||||
raise ValidationError({'team': _('Team must have view permission to Execution Environment organization')})
|
||||
|
||||
@ -92,3 +92,10 @@ class TestMigrationSmoke:
|
||||
# Test special cases in managed role creation
|
||||
assert not RoleDefinition.objects.filter(name='Organization Team Admin').exists()
|
||||
assert not RoleDefinition.objects.filter(name='Organization InstanceGroup Admin').exists()
|
||||
|
||||
# Test that a removed EE model permission has been deleted
|
||||
new_state = migrator.apply_tested_migration(
|
||||
('main', '0195_EE_permissions'),
|
||||
)
|
||||
DABPermission = new_state.apps.get_model('dab_rbac', 'DABPermission')
|
||||
assert not DABPermission.objects.filter(codename='view_executionenvironment').exists()
|
||||
|
||||
107
awx/main/tests/functional/test_rbac_execution_environment.py
Normal file
107
awx/main/tests/functional/test_rbac_execution_environment.py
Normal file
@ -0,0 +1,107 @@
|
||||
import pytest
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
from awx.main.access import ExecutionEnvironmentAccess
|
||||
from awx.main.models import ExecutionEnvironment, Organization
|
||||
from awx.main.models.rbac import get_role_codenames
|
||||
|
||||
from awx.api.versioning import reverse
|
||||
from django.urls import reverse as django_reverse
|
||||
|
||||
from ansible_base.rbac.models import RoleDefinition
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ee_rd():
|
||||
return RoleDefinition.objects.create_from_permissions(
|
||||
name='EE object admin',
|
||||
permissions=['change_executionenvironment', 'delete_executionenvironment'],
|
||||
content_type=ContentType.objects.get_for_model(ExecutionEnvironment),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def org_ee_rd():
|
||||
return RoleDefinition.objects.create_from_permissions(
|
||||
name='EE org admin',
|
||||
permissions=['add_executionenvironment', 'change_executionenvironment', 'delete_executionenvironment', 'view_organization'],
|
||||
content_type=ContentType.objects.get_for_model(Organization),
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_old_ee_role_maps_to_correct_permissions(organization):
|
||||
assert set(get_role_codenames(organization.execution_environment_admin_role)) == {
|
||||
'view_organization',
|
||||
'add_executionenvironment',
|
||||
'change_executionenvironment',
|
||||
'delete_executionenvironment',
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def org_ee(organization):
|
||||
return ExecutionEnvironment.objects.create(name='some user ee', organization=organization)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def check_user_capabilities(get, setup_managed_roles):
|
||||
def _rf(user, obj, expected):
|
||||
url = reverse('api:execution_environment_list')
|
||||
r = get(url, user=user, expect=200)
|
||||
for item in r.data['results']:
|
||||
if item['id'] == obj.pk:
|
||||
assert expected == item['summary_fields']['user_capabilities']
|
||||
break
|
||||
else:
|
||||
raise RuntimeError(f'Could not find expected object ({obj}) in EE list result: {r.data}')
|
||||
|
||||
return _rf
|
||||
|
||||
|
||||
# ___ begin tests ___
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_managed_ee_not_assignable(control_plane_execution_environment, ee_rd, rando, admin_user, post):
|
||||
url = django_reverse('roleuserassignment-list')
|
||||
r = post(url, {'role_definition': ee_rd.pk, 'user': rando.id, 'object_id': control_plane_execution_environment.pk}, user=admin_user, expect=400)
|
||||
assert 'Can not assign object roles to managed Execution Environment' in str(r.data)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_org_member_required_for_assignment(org_ee, ee_rd, rando, admin_user, post):
|
||||
url = django_reverse('roleuserassignment-list')
|
||||
r = post(url, {'role_definition': ee_rd.pk, 'user': rando.id, 'object_id': org_ee.pk}, user=admin_user, expect=400)
|
||||
assert 'User must have view permission to Execution Environment organization' in str(r.data)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_give_object_permission_to_ee(org_ee, ee_rd, org_member, check_user_capabilities):
|
||||
access = ExecutionEnvironmentAccess(org_member)
|
||||
assert access.can_read(org_ee) # by virtue of being an org member
|
||||
assert not access.can_change(org_ee, {'name': 'new'})
|
||||
check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False})
|
||||
|
||||
ee_rd.give_permission(org_member, org_ee)
|
||||
assert access.can_change(org_ee, {'name': 'new'})
|
||||
|
||||
check_user_capabilities(org_member, org_ee, {'edit': True, 'delete': True, 'copy': False})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('style', ['new', 'old'])
|
||||
def test_give_org_permission_to_ee(org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd):
|
||||
access = ExecutionEnvironmentAccess(org_member)
|
||||
assert not access.can_change(org_ee, {'name': 'new'})
|
||||
check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False})
|
||||
|
||||
if style == 'new':
|
||||
org_ee_rd.give_permission(org_member, organization)
|
||||
assert org_member.has_obj_perm(org_ee.organization, 'add_executionenvironment') # sanity
|
||||
else:
|
||||
organization.execution_environment_admin_role.members.add(org_member)
|
||||
|
||||
assert access.can_change(org_ee, {'name': 'new'})
|
||||
check_user_capabilities(org_member, org_ee, {'edit': True, 'delete': True, 'copy': True})
|
||||
Loading…
x
Reference in New Issue
Block a user