mirror of
https://github.com/ansible/awx.git
synced 2026-01-09 23:12:08 -03:30
Rewrite more access logic in terms of permissions instead of roles (#15453)
* Rewrite more access logic in terms of permissions instead of roles * Cut down supported logic because that would not work anyway * Remove methods not needed anymore * Create managed roles in test before delegating permissions
This commit is contained in:
parent
500b1c47ba
commit
af537b5261
@ -242,9 +242,10 @@ class BaseAccess(object):
|
||||
return qs
|
||||
|
||||
def filtered_queryset(self):
|
||||
# Override in subclasses
|
||||
# filter objects according to user's read access
|
||||
return self.model.objects.none()
|
||||
if permission_registry.is_registered(self.model):
|
||||
return self.model.access_qs(self.user, 'view')
|
||||
else:
|
||||
raise NotImplementedError('Filtered queryset for model is not written')
|
||||
|
||||
def can_read(self, obj):
|
||||
return bool(obj and self.get_queryset().filter(pk=obj.pk).exists())
|
||||
@ -606,9 +607,6 @@ class InstanceGroupAccess(BaseAccess):
|
||||
model = InstanceGroup
|
||||
prefetch_related = ('instances',)
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.accessible_objects(self.user, 'read_role')
|
||||
|
||||
@check_superuser
|
||||
def can_use(self, obj):
|
||||
return self.user in obj.use_role
|
||||
@ -654,7 +652,7 @@ class UserAccess(BaseAccess):
|
||||
qs = User.objects.all()
|
||||
else:
|
||||
qs = (
|
||||
User.objects.filter(pk__in=Organization.accessible_objects(self.user, 'read_role').values('member_role__members'))
|
||||
User.objects.filter(pk__in=Organization.access_qs(self.user, 'view').values('member_role__members'))
|
||||
| User.objects.filter(pk=self.user.id)
|
||||
| User.objects.filter(is_superuser=True)
|
||||
).distinct()
|
||||
@ -671,7 +669,7 @@ class UserAccess(BaseAccess):
|
||||
return True
|
||||
if not settings.MANAGE_ORGANIZATION_AUTH:
|
||||
return False
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'change').exists()
|
||||
|
||||
def can_change(self, obj, data):
|
||||
if data is not None and ('is_superuser' in data or 'is_system_auditor' in data):
|
||||
@ -691,7 +689,7 @@ class UserAccess(BaseAccess):
|
||||
"""
|
||||
Returns all organizations that count `u` as a member
|
||||
"""
|
||||
return Organization.accessible_objects(u, 'member_role')
|
||||
return Organization.access_qs(u, 'member')
|
||||
|
||||
def is_all_org_admin(self, u):
|
||||
"""
|
||||
@ -774,7 +772,7 @@ class OAuth2ApplicationAccess(BaseAccess):
|
||||
prefetch_related = ('organization', 'oauth2accesstoken_set')
|
||||
|
||||
def filtered_queryset(self):
|
||||
org_access_qs = Organization.accessible_objects(self.user, 'member_role')
|
||||
org_access_qs = Organization.access_qs(self.user, 'member')
|
||||
return self.model.objects.filter(organization__in=org_access_qs)
|
||||
|
||||
def can_change(self, obj, data):
|
||||
@ -787,7 +785,7 @@ class OAuth2ApplicationAccess(BaseAccess):
|
||||
if self.user.is_superuser:
|
||||
return True
|
||||
if not data:
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'change').exists()
|
||||
return self.check_related('organization', Organization, data, role_field='admin_role', mandatory=True)
|
||||
|
||||
|
||||
@ -855,9 +853,6 @@ class OrganizationAccess(NotificationAttachMixin, BaseAccess):
|
||||
# organization admin_role is not a parent of organization auditor_role
|
||||
notification_attach_roles = ['admin_role', 'auditor_role']
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.accessible_objects(self.user, 'read_role')
|
||||
|
||||
@check_superuser
|
||||
def can_change(self, obj, data):
|
||||
if data and data.get('default_environment'):
|
||||
@ -925,9 +920,6 @@ class InventoryAccess(BaseAccess):
|
||||
Prefetch('labels', queryset=Label.objects.all().order_by('name')),
|
||||
)
|
||||
|
||||
def filtered_queryset(self, allowed=None, ad_hoc=None):
|
||||
return self.model.accessible_objects(self.user, 'read_role')
|
||||
|
||||
@check_superuser
|
||||
def can_use(self, obj):
|
||||
return self.user in obj.use_role
|
||||
@ -936,7 +928,7 @@ class InventoryAccess(BaseAccess):
|
||||
def can_add(self, data):
|
||||
# If no data is specified, just checking for generic add permission?
|
||||
if not data:
|
||||
return Organization.accessible_objects(self.user, 'inventory_admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'add_inventory').exists()
|
||||
return self.check_related('organization', Organization, data, role_field='inventory_admin_role')
|
||||
|
||||
@check_superuser
|
||||
@ -998,7 +990,7 @@ class HostAccess(BaseAccess):
|
||||
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Inventory.accessible_objects(self.user, 'admin_role').exists()
|
||||
return Inventory.access_qs(self.user, 'change').exists()
|
||||
|
||||
# Checks for admin or change permission on inventory.
|
||||
if not self.check_related('inventory', Inventory, data):
|
||||
@ -1060,7 +1052,7 @@ class GroupAccess(BaseAccess):
|
||||
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Inventory.accessible_objects(self.user, 'admin_role').exists()
|
||||
return Inventory.access_qs(self.user, 'change').exists()
|
||||
if 'inventory' not in data:
|
||||
return False
|
||||
# Checks for admin or change permission on inventory.
|
||||
@ -1102,7 +1094,7 @@ class InventorySourceAccess(NotificationAttachMixin, UnifiedCredentialsMixin, Ba
|
||||
|
||||
def can_add(self, data):
|
||||
if not data or 'inventory' not in data:
|
||||
return Inventory.accessible_objects(self.user, 'admin_role').exists()
|
||||
return Inventory.access_qs(self.user, 'change').exists()
|
||||
|
||||
if not self.check_related('source_project', Project, data, role_field='use_role'):
|
||||
return False
|
||||
@ -1216,9 +1208,6 @@ class CredentialAccess(BaseAccess):
|
||||
)
|
||||
prefetch_related = ('admin_role', 'use_role', 'read_role', 'admin_role__parents', 'admin_role__members', 'credential_type', 'organization')
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.accessible_objects(self.user, 'read_role')
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
@ -1329,7 +1318,7 @@ class TeamAccess(BaseAccess):
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'view').exists()
|
||||
if not settings.MANAGE_ORGANIZATION_AUTH:
|
||||
return False
|
||||
return self.check_related('organization', Organization, data)
|
||||
@ -1400,7 +1389,7 @@ class ExecutionEnvironmentAccess(BaseAccess):
|
||||
|
||||
def filtered_queryset(self):
|
||||
return ExecutionEnvironment.objects.filter(
|
||||
Q(organization__in=Organization.accessible_pk_qs(self.user, 'read_role'))
|
||||
Q(organization__in=Organization.access_ids_qs(self.user, 'view'))
|
||||
| Q(organization__isnull=True)
|
||||
| Q(id__in=ExecutionEnvironment.access_ids_qs(self.user, 'change'))
|
||||
).distinct()
|
||||
@ -1408,7 +1397,7 @@ class ExecutionEnvironmentAccess(BaseAccess):
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Organization.accessible_objects(self.user, 'execution_environment_admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'add_executionenvironment').exists()
|
||||
return self.check_related('organization', Organization, data, mandatory=True, role_field='execution_environment_admin_role')
|
||||
|
||||
@check_superuser
|
||||
@ -1457,13 +1446,10 @@ class ProjectAccess(NotificationAttachMixin, BaseAccess):
|
||||
prefetch_related = ('modified_by', 'created_by', 'organization', 'last_job', 'current_job')
|
||||
notification_attach_roles = ['admin_role']
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.accessible_objects(self.user, 'read_role')
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Organization.accessible_objects(self.user, 'project_admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'add_project').exists()
|
||||
|
||||
if data.get('default_environment'):
|
||||
ee = get_object_from_data('default_environment', ExecutionEnvironment, data)
|
||||
@ -1559,9 +1545,6 @@ class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAc
|
||||
Prefetch('last_job', queryset=UnifiedJob.objects.non_polymorphic()),
|
||||
)
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.accessible_objects(self.user, 'read_role')
|
||||
|
||||
def can_add(self, data):
|
||||
"""
|
||||
a user can create a job template if
|
||||
@ -1574,7 +1557,7 @@ class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAc
|
||||
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
|
||||
"""
|
||||
if not data: # So the browseable API will work
|
||||
return Project.accessible_objects(self.user, 'use_role').exists()
|
||||
return Project.access_qs(self.user, 'use_project').exists()
|
||||
|
||||
# if reference_obj is provided, determine if it can be copied
|
||||
reference_obj = data.get('reference_obj', None)
|
||||
@ -1765,13 +1748,13 @@ class JobAccess(BaseAccess):
|
||||
def filtered_queryset(self):
|
||||
qs = self.model.objects
|
||||
|
||||
qs_jt = qs.filter(job_template__in=JobTemplate.accessible_objects(self.user, 'read_role'))
|
||||
qs_jt = qs.filter(job_template__in=JobTemplate.access_qs(self.user, 'view'))
|
||||
|
||||
org_access_qs = Organization.objects.filter(Q(admin_role__members=self.user) | Q(auditor_role__members=self.user))
|
||||
if not org_access_qs.exists():
|
||||
return qs_jt
|
||||
|
||||
return qs.filter(Q(job_template__in=JobTemplate.accessible_objects(self.user, 'read_role')) | Q(organization__in=org_access_qs)).distinct()
|
||||
return qs.filter(Q(job_template__in=JobTemplate.access_qs(self.user, 'view')) | Q(organization__in=org_access_qs)).distinct()
|
||||
|
||||
def can_add(self, data, validate_license=True):
|
||||
raise NotImplementedError('Direct job creation not possible in v2 API')
|
||||
@ -1972,7 +1955,7 @@ class WorkflowJobTemplateNodeAccess(UnifiedCredentialsMixin, BaseAccess):
|
||||
prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'workflow_job_template')
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.accessible_objects(self.user, 'read_role'))
|
||||
return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.access_qs(self.user, 'view'))
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
@ -2087,9 +2070,6 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
|
||||
'read_role',
|
||||
)
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.accessible_objects(self.user, 'read_role')
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
"""
|
||||
@ -2100,7 +2080,7 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
|
||||
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
|
||||
"""
|
||||
if not data: # So the browseable API will work
|
||||
return Organization.accessible_objects(self.user, 'workflow_admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'add_workflowjobtemplate').exists()
|
||||
|
||||
if not self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True):
|
||||
if data.get('organization', None) is None:
|
||||
@ -2660,13 +2640,13 @@ class NotificationTemplateAccess(BaseAccess):
|
||||
if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED:
|
||||
return self.model.access_qs(self.user, 'view')
|
||||
return self.model.objects.filter(
|
||||
Q(organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) | Q(organization__in=self.user.auditor_of_organizations)
|
||||
Q(organization__in=Organization.access_qs(self.user, 'add_notificationtemplate')) | Q(organization__in=self.user.auditor_of_organizations)
|
||||
).distinct()
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
if not data:
|
||||
return Organization.accessible_objects(self.user, 'notification_admin_role').exists()
|
||||
return Organization.access_qs(self.user, 'add_notificationtemplate').exists()
|
||||
return self.check_related('organization', Organization, data, role_field='notification_admin_role', mandatory=True)
|
||||
|
||||
@check_superuser
|
||||
@ -2694,7 +2674,7 @@ class NotificationAccess(BaseAccess):
|
||||
|
||||
def filtered_queryset(self):
|
||||
return self.model.objects.filter(
|
||||
Q(notification_template__organization__in=Organization.accessible_objects(self.user, 'notification_admin_role'))
|
||||
Q(notification_template__organization__in=Organization.access_qs(self.user, 'add_notificationtemplate'))
|
||||
| Q(notification_template__organization__in=self.user.auditor_of_organizations)
|
||||
).distinct()
|
||||
|
||||
@ -2810,11 +2790,7 @@ class ActivityStreamAccess(BaseAccess):
|
||||
if credential_set:
|
||||
q |= Q(credential__in=credential_set)
|
||||
|
||||
auditing_orgs = (
|
||||
(Organization.accessible_objects(self.user, 'admin_role') | Organization.accessible_objects(self.user, 'auditor_role'))
|
||||
.distinct()
|
||||
.values_list('id', flat=True)
|
||||
)
|
||||
auditing_orgs = (Organization.access_qs(self.user, 'change') | Organization.access_qs(self.user, 'audit')).distinct().values_list('id', flat=True)
|
||||
if auditing_orgs:
|
||||
q |= (
|
||||
Q(user__in=auditing_orgs.values('member_role__members'))
|
||||
|
||||
@ -98,7 +98,7 @@ def test_team_can_have_permission(org_ee, ee_rd, rando, admin_user, post):
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_give_object_permission_to_ee(org_ee, ee_rd, org_member, check_user_capabilities):
|
||||
def test_give_object_permission_to_ee(setup_managed_roles, org_ee, ee_rd, org_member, check_user_capabilities):
|
||||
access = ExecutionEnvironmentAccess(org_member)
|
||||
assert access.can_read(org_ee) # by virtue of being an org member
|
||||
assert not access.can_change(org_ee, {'name': 'new'})
|
||||
@ -130,7 +130,7 @@ def test_need_related_organization_access(org_ee, ee_rd, org_member):
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('style', ['new', 'old'])
|
||||
def test_give_org_permission_to_ee(org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd):
|
||||
def test_give_org_permission_to_ee(setup_managed_roles, org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd):
|
||||
access = ExecutionEnvironmentAccess(org_member)
|
||||
assert not access.can_change(org_ee, {'name': 'new'})
|
||||
check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user