Rewrite more access logic in terms of permissions instead of roles (#15453) (#6661)

* Rewrite more access logic in terms of permissions instead of roles

* Cut down supported logic because that would not work anyway

* Remove methods not needed anymore

* Create managed roles in test before delegating permissions
This commit is contained in:
Alan Rominger
2024-08-26 14:37:52 -04:00
committed by GitHub
parent 01aa760510
commit 77e999f7c8
2 changed files with 28 additions and 52 deletions

View File

@@ -242,9 +242,10 @@ class BaseAccess(object):
return qs return qs
def filtered_queryset(self): def filtered_queryset(self):
# Override in subclasses if permission_registry.is_registered(self.model):
# filter objects according to user's read access return self.model.access_qs(self.user, 'view')
return self.model.objects.none() else:
raise NotImplementedError('Filtered queryset for model is not written')
def can_read(self, obj): def can_read(self, obj):
return bool(obj and self.get_queryset().filter(pk=obj.pk).exists()) return bool(obj and self.get_queryset().filter(pk=obj.pk).exists())
@@ -606,9 +607,6 @@ class InstanceGroupAccess(BaseAccess):
model = InstanceGroup model = InstanceGroup
prefetch_related = ('instances',) prefetch_related = ('instances',)
def filtered_queryset(self):
return self.model.accessible_objects(self.user, 'read_role')
@check_superuser @check_superuser
def can_use(self, obj): def can_use(self, obj):
return self.user in obj.use_role return self.user in obj.use_role
@@ -654,7 +652,7 @@ class UserAccess(BaseAccess):
qs = User.objects.all() qs = User.objects.all()
else: else:
qs = ( qs = (
User.objects.filter(pk__in=Organization.accessible_objects(self.user, 'read_role').values('member_role__members')) User.objects.filter(pk__in=Organization.access_qs(self.user, 'view').values('member_role__members'))
| User.objects.filter(pk=self.user.id) | User.objects.filter(pk=self.user.id)
| User.objects.filter(is_superuser=True) | User.objects.filter(is_superuser=True)
).distinct() ).distinct()
@@ -671,7 +669,7 @@ class UserAccess(BaseAccess):
return True return True
if not settings.MANAGE_ORGANIZATION_AUTH: if not settings.MANAGE_ORGANIZATION_AUTH:
return False return False
return Organization.accessible_objects(self.user, 'admin_role').exists() return Organization.access_qs(self.user, 'change').exists()
def can_change(self, obj, data): def can_change(self, obj, data):
if data is not None and ('is_superuser' in data or 'is_system_auditor' in data): if data is not None and ('is_superuser' in data or 'is_system_auditor' in data):
@@ -691,7 +689,7 @@ class UserAccess(BaseAccess):
""" """
Returns all organizations that count `u` as a member Returns all organizations that count `u` as a member
""" """
return Organization.accessible_objects(u, 'member_role') return Organization.access_qs(u, 'member')
def is_all_org_admin(self, u): def is_all_org_admin(self, u):
""" """
@@ -774,7 +772,7 @@ class OAuth2ApplicationAccess(BaseAccess):
prefetch_related = ('organization', 'oauth2accesstoken_set') prefetch_related = ('organization', 'oauth2accesstoken_set')
def filtered_queryset(self): def filtered_queryset(self):
org_access_qs = Organization.accessible_objects(self.user, 'member_role') org_access_qs = Organization.access_qs(self.user, 'member')
return self.model.objects.filter(organization__in=org_access_qs) return self.model.objects.filter(organization__in=org_access_qs)
def can_change(self, obj, data): def can_change(self, obj, data):
@@ -787,7 +785,7 @@ class OAuth2ApplicationAccess(BaseAccess):
if self.user.is_superuser: if self.user.is_superuser:
return True return True
if not data: if not data:
return Organization.accessible_objects(self.user, 'admin_role').exists() return Organization.access_qs(self.user, 'change').exists()
return self.check_related('organization', Organization, data, role_field='admin_role', mandatory=True) return self.check_related('organization', Organization, data, role_field='admin_role', mandatory=True)
@@ -855,9 +853,6 @@ class OrganizationAccess(NotificationAttachMixin, BaseAccess):
# organization admin_role is not a parent of organization auditor_role # organization admin_role is not a parent of organization auditor_role
notification_attach_roles = ['admin_role', 'auditor_role'] notification_attach_roles = ['admin_role', 'auditor_role']
def filtered_queryset(self):
return self.model.accessible_objects(self.user, 'read_role')
@check_superuser @check_superuser
def can_change(self, obj, data): def can_change(self, obj, data):
if data and data.get('default_environment'): if data and data.get('default_environment'):
@@ -925,9 +920,6 @@ class InventoryAccess(BaseAccess):
Prefetch('labels', queryset=Label.objects.all().order_by('name')), Prefetch('labels', queryset=Label.objects.all().order_by('name')),
) )
def filtered_queryset(self, allowed=None, ad_hoc=None):
return self.model.accessible_objects(self.user, 'read_role')
@check_superuser @check_superuser
def can_use(self, obj): def can_use(self, obj):
return self.user in obj.use_role return self.user in obj.use_role
@@ -936,7 +928,7 @@ class InventoryAccess(BaseAccess):
def can_add(self, data): def can_add(self, data):
# If no data is specified, just checking for generic add permission? # If no data is specified, just checking for generic add permission?
if not data: if not data:
return Organization.accessible_objects(self.user, 'inventory_admin_role').exists() return Organization.access_qs(self.user, 'add_inventory').exists()
return self.check_related('organization', Organization, data, role_field='inventory_admin_role') return self.check_related('organization', Organization, data, role_field='inventory_admin_role')
@check_superuser @check_superuser
@@ -998,7 +990,7 @@ class HostAccess(BaseAccess):
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Inventory.accessible_objects(self.user, 'admin_role').exists() return Inventory.access_qs(self.user, 'change').exists()
# Checks for admin or change permission on inventory. # Checks for admin or change permission on inventory.
if not self.check_related('inventory', Inventory, data): if not self.check_related('inventory', Inventory, data):
@@ -1060,7 +1052,7 @@ class GroupAccess(BaseAccess):
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Inventory.accessible_objects(self.user, 'admin_role').exists() return Inventory.access_qs(self.user, 'change').exists()
if 'inventory' not in data: if 'inventory' not in data:
return False return False
# Checks for admin or change permission on inventory. # Checks for admin or change permission on inventory.
@@ -1102,7 +1094,7 @@ class InventorySourceAccess(NotificationAttachMixin, UnifiedCredentialsMixin, Ba
def can_add(self, data): def can_add(self, data):
if not data or 'inventory' not in data: if not data or 'inventory' not in data:
return Inventory.accessible_objects(self.user, 'admin_role').exists() return Inventory.access_qs(self.user, 'change').exists()
if not self.check_related('source_project', Project, data, role_field='use_role'): if not self.check_related('source_project', Project, data, role_field='use_role'):
return False return False
@@ -1216,9 +1208,6 @@ class CredentialAccess(BaseAccess):
) )
prefetch_related = ('admin_role', 'use_role', 'read_role', 'admin_role__parents', 'admin_role__members', 'credential_type', 'organization') prefetch_related = ('admin_role', 'use_role', 'read_role', 'admin_role__parents', 'admin_role__members', 'credential_type', 'organization')
def filtered_queryset(self):
return self.model.accessible_objects(self.user, 'read_role')
@check_superuser @check_superuser
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
@@ -1329,7 +1318,7 @@ class TeamAccess(BaseAccess):
@check_superuser @check_superuser
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'admin_role').exists() return Organization.access_qs(self.user, 'view').exists()
if not settings.MANAGE_ORGANIZATION_AUTH: if not settings.MANAGE_ORGANIZATION_AUTH:
return False return False
return self.check_related('organization', Organization, data) return self.check_related('organization', Organization, data)
@@ -1400,7 +1389,7 @@ class ExecutionEnvironmentAccess(BaseAccess):
def filtered_queryset(self): def filtered_queryset(self):
return ExecutionEnvironment.objects.filter( return ExecutionEnvironment.objects.filter(
Q(organization__in=Organization.accessible_pk_qs(self.user, 'read_role')) Q(organization__in=Organization.access_ids_qs(self.user, 'view'))
| Q(organization__isnull=True) | Q(organization__isnull=True)
| Q(id__in=ExecutionEnvironment.access_ids_qs(self.user, 'change')) | Q(id__in=ExecutionEnvironment.access_ids_qs(self.user, 'change'))
).distinct() ).distinct()
@@ -1408,7 +1397,7 @@ class ExecutionEnvironmentAccess(BaseAccess):
@check_superuser @check_superuser
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'execution_environment_admin_role').exists() return Organization.access_qs(self.user, 'add_executionenvironment').exists()
return self.check_related('organization', Organization, data, mandatory=True, role_field='execution_environment_admin_role') return self.check_related('organization', Organization, data, mandatory=True, role_field='execution_environment_admin_role')
@check_superuser @check_superuser
@@ -1457,13 +1446,10 @@ class ProjectAccess(NotificationAttachMixin, BaseAccess):
prefetch_related = ('modified_by', 'created_by', 'organization', 'last_job', 'current_job') prefetch_related = ('modified_by', 'created_by', 'organization', 'last_job', 'current_job')
notification_attach_roles = ['admin_role'] notification_attach_roles = ['admin_role']
def filtered_queryset(self):
return self.model.accessible_objects(self.user, 'read_role')
@check_superuser @check_superuser
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'project_admin_role').exists() return Organization.access_qs(self.user, 'add_project').exists()
if data.get('default_environment'): if data.get('default_environment'):
ee = get_object_from_data('default_environment', ExecutionEnvironment, data) ee = get_object_from_data('default_environment', ExecutionEnvironment, data)
@@ -1559,9 +1545,6 @@ class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAc
Prefetch('last_job', queryset=UnifiedJob.objects.non_polymorphic()), Prefetch('last_job', queryset=UnifiedJob.objects.non_polymorphic()),
) )
def filtered_queryset(self):
return self.model.accessible_objects(self.user, 'read_role')
def can_add(self, data): def can_add(self, data):
""" """
a user can create a job template if a user can create a job template if
@@ -1574,7 +1557,7 @@ class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAc
Users who are able to create deploy jobs can also run normal and check (dry run) jobs. Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
""" """
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Project.accessible_objects(self.user, 'use_role').exists() return Project.access_qs(self.user, 'use_project').exists()
# if reference_obj is provided, determine if it can be copied # if reference_obj is provided, determine if it can be copied
reference_obj = data.get('reference_obj', None) reference_obj = data.get('reference_obj', None)
@@ -1765,13 +1748,13 @@ class JobAccess(BaseAccess):
def filtered_queryset(self): def filtered_queryset(self):
qs = self.model.objects qs = self.model.objects
qs_jt = qs.filter(job_template__in=JobTemplate.accessible_objects(self.user, 'read_role')) qs_jt = qs.filter(job_template__in=JobTemplate.access_qs(self.user, 'view'))
org_access_qs = Organization.objects.filter(Q(admin_role__members=self.user) | Q(auditor_role__members=self.user)) org_access_qs = Organization.objects.filter(Q(admin_role__members=self.user) | Q(auditor_role__members=self.user))
if not org_access_qs.exists(): if not org_access_qs.exists():
return qs_jt return qs_jt
return qs.filter(Q(job_template__in=JobTemplate.accessible_objects(self.user, 'read_role')) | Q(organization__in=org_access_qs)).distinct() return qs.filter(Q(job_template__in=JobTemplate.access_qs(self.user, 'view')) | Q(organization__in=org_access_qs)).distinct()
def can_add(self, data, validate_license=True): def can_add(self, data, validate_license=True):
raise NotImplementedError('Direct job creation not possible in v2 API') raise NotImplementedError('Direct job creation not possible in v2 API')
@@ -1972,7 +1955,7 @@ class WorkflowJobTemplateNodeAccess(UnifiedCredentialsMixin, BaseAccess):
prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'workflow_job_template') prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'workflow_job_template')
def filtered_queryset(self): def filtered_queryset(self):
return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.accessible_objects(self.user, 'read_role')) return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.access_qs(self.user, 'view'))
@check_superuser @check_superuser
def can_add(self, data): def can_add(self, data):
@@ -2087,9 +2070,6 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
'read_role', 'read_role',
) )
def filtered_queryset(self):
return self.model.accessible_objects(self.user, 'read_role')
@check_superuser @check_superuser
def can_add(self, data): def can_add(self, data):
""" """
@@ -2100,7 +2080,7 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
Users who are able to create deploy jobs can also run normal and check (dry run) jobs. Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
""" """
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'workflow_admin_role').exists() return Organization.access_qs(self.user, 'add_workflowjobtemplate').exists()
if not self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True): if not self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True):
if data.get('organization', None) is None: if data.get('organization', None) is None:
@@ -2660,13 +2640,13 @@ class NotificationTemplateAccess(BaseAccess):
if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED: if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED:
return self.model.access_qs(self.user, 'view') return self.model.access_qs(self.user, 'view')
return self.model.objects.filter( return self.model.objects.filter(
Q(organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) | Q(organization__in=self.user.auditor_of_organizations) Q(organization__in=Organization.access_qs(self.user, 'add_notificationtemplate')) | Q(organization__in=self.user.auditor_of_organizations)
).distinct() ).distinct()
@check_superuser @check_superuser
def can_add(self, data): def can_add(self, data):
if not data: if not data:
return Organization.accessible_objects(self.user, 'notification_admin_role').exists() return Organization.access_qs(self.user, 'add_notificationtemplate').exists()
return self.check_related('organization', Organization, data, role_field='notification_admin_role', mandatory=True) return self.check_related('organization', Organization, data, role_field='notification_admin_role', mandatory=True)
@check_superuser @check_superuser
@@ -2694,7 +2674,7 @@ class NotificationAccess(BaseAccess):
def filtered_queryset(self): def filtered_queryset(self):
return self.model.objects.filter( return self.model.objects.filter(
Q(notification_template__organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) Q(notification_template__organization__in=Organization.access_qs(self.user, 'add_notificationtemplate'))
| Q(notification_template__organization__in=self.user.auditor_of_organizations) | Q(notification_template__organization__in=self.user.auditor_of_organizations)
).distinct() ).distinct()
@@ -2810,11 +2790,7 @@ class ActivityStreamAccess(BaseAccess):
if credential_set: if credential_set:
q |= Q(credential__in=credential_set) q |= Q(credential__in=credential_set)
auditing_orgs = ( auditing_orgs = (Organization.access_qs(self.user, 'change') | Organization.access_qs(self.user, 'audit')).distinct().values_list('id', flat=True)
(Organization.accessible_objects(self.user, 'admin_role') | Organization.accessible_objects(self.user, 'auditor_role'))
.distinct()
.values_list('id', flat=True)
)
if auditing_orgs: if auditing_orgs:
q |= ( q |= (
Q(user__in=auditing_orgs.values('member_role__members')) Q(user__in=auditing_orgs.values('member_role__members'))

View File

@@ -98,7 +98,7 @@ def test_team_can_have_permission(org_ee, ee_rd, rando, admin_user, post):
@pytest.mark.django_db @pytest.mark.django_db
def test_give_object_permission_to_ee(org_ee, ee_rd, org_member, check_user_capabilities): def test_give_object_permission_to_ee(setup_managed_roles, org_ee, ee_rd, org_member, check_user_capabilities):
access = ExecutionEnvironmentAccess(org_member) access = ExecutionEnvironmentAccess(org_member)
assert access.can_read(org_ee) # by virtue of being an org member assert access.can_read(org_ee) # by virtue of being an org member
assert not access.can_change(org_ee, {'name': 'new'}) assert not access.can_change(org_ee, {'name': 'new'})
@@ -130,7 +130,7 @@ def test_need_related_organization_access(org_ee, ee_rd, org_member):
@pytest.mark.django_db @pytest.mark.django_db
@pytest.mark.parametrize('style', ['new', 'old']) @pytest.mark.parametrize('style', ['new', 'old'])
def test_give_org_permission_to_ee(org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd): def test_give_org_permission_to_ee(setup_managed_roles, org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd):
access = ExecutionEnvironmentAccess(org_member) access = ExecutionEnvironmentAccess(org_member)
assert not access.can_change(org_ee, {'name': 'new'}) assert not access.can_change(org_ee, {'name': 'new'})
check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False}) check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False})