Cache organization child evaluations and remove hacks

This commit is contained in:
Alan Rominger
2024-02-24 22:23:39 -05:00
parent f50e597548
commit eb93660b36
5 changed files with 48 additions and 24 deletions

View File

@@ -1805,15 +1805,7 @@ class JobAccess(BaseAccess):
return True
# Standard permissions model without job template involved
# NOTE: this is the best we can do without caching way more permissions
from django.contrib.contenttypes.models import ContentType
filter_kwargs = dict(
content_type_id=ContentType.objects.get_for_model(Organization),
object_id=obj.organization_id,
role_definition__permissions__codename='execute_jobtemplate',
)
if self.user.has_roles.filter(**filter_kwargs).exists():
if obj.organization and self.user in obj.organization.execute_role:
return True
elif not (obj.job_template or obj.organization):
raise PermissionDenied(_('Job has been orphaned from its job template and organization.'))

View File

@@ -13,7 +13,6 @@ from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models.query import QuerySet
from django.db.models.functions import Cast
from django.utils.crypto import get_random_string
from django.utils.translation import gettext_lazy as _
@@ -67,21 +66,16 @@ class ResourceMixin(models.Model):
@staticmethod
def _accessible_pk_qs(cls, accessor, role_field, content_types=None):
if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED:
if role_field not in to_permissions and cls._meta.model_name == 'organization':
# superficial alternative for narrow exceptions with org roles
# I think this mostly applies to organization members, which is not fully defined yet
if cls._meta.model_name == 'organization' and role_field in org_role_to_permission:
# Organization roles can not use the DAB RBAC shortcuts
# like Organization.access_qs(user, 'change_jobtemplate') is needed
# not just Organization.access_qs(user, 'change') is needed
if accessor.is_superuser:
return cls.objects.values_list('id')
from ansible_base.rbac.models import ObjectRole
codename = org_role_to_permission[role_field]
return (
ObjectRole.objects.filter(role_definition__permissions__codename=codename, content_type=ContentType.objects.get_for_model(cls))
.annotate(int_object_id=Cast('object_id', models.IntegerField()))
.values_list('int_object_id')
.distinct()
)
return cls.access_ids_qs(accessor, codename, content_types=content_types)
return cls.access_ids_qs(accessor, to_permissions[role_field], content_types=content_types)
if accessor._meta.model_name == 'user':
ancestor_roles = accessor.roles.all()

View File

@@ -192,10 +192,7 @@ class Role(models.Model):
return True
if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED:
if self.role_field not in to_permissions and self.content_object and self.content_object._meta.model_name == 'organization':
# valid alternative for narrow exceptions with org roles
if self.role_field not in org_role_to_permission:
raise Exception(f'org {self.role_field} evaluated but not a translatable permission')
if self.content_object and self.content_object._meta.model_name == 'organization' and self.role_field in org_role_to_permission:
codename = org_role_to_permission[self.role_field]
return accessor.has_obj_perm(self.content_object, codename)

View File

@@ -1,6 +1,7 @@
import pytest
from awx.main.models.rbac import get_role_from_object_role
from awx.main.models import User, Organization
from ansible_base.rbac.models import RoleUserAssignment
@@ -21,3 +22,40 @@ def test_round_trip_roles(organization, rando, role_name):
print(assignment.role_definition.name)
old_role = get_role_from_object_role(assignment.object_role)
assert old_role.id == getattr(organization, role_name).id
@pytest.mark.django_db
def test_organization_level_permissions(organization, inventory):
u1 = User.objects.create(username='alice')
u2 = User.objects.create(username='bob')
organization.inventory_admin_role.members.add(u1)
organization.workflow_admin_role.members.add(u2)
assert u1 in inventory.admin_role
assert u1 in organization.inventory_admin_role
assert u2 in organization.workflow_admin_role
assert u2 not in organization.inventory_admin_role
assert u1 not in organization.workflow_admin_role
assert not (set(u1.has_roles.all()) & set(u2.has_roles.all())) # user have no roles in common
# Old style
assert set(Organization.accessible_objects(u1, 'inventory_admin_role')) == set([organization])
assert set(Organization.accessible_objects(u2, 'inventory_admin_role')) == set()
assert set(Organization.accessible_objects(u1, 'workflow_admin_role')) == set()
assert set(Organization.accessible_objects(u2, 'workflow_admin_role')) == set([organization])
# New style
assert set(Organization.access_qs(u1, 'add_inventory')) == set([organization])
assert set(Organization.access_qs(u1, 'change_inventory')) == set([organization])
assert set(Organization.access_qs(u2, 'add_inventory')) == set()
assert set(Organization.access_qs(u1, 'add_workflowjobtemplate')) == set()
assert set(Organization.access_qs(u2, 'add_workflowjobtemplate')) == set([organization])
@pytest.mark.django_db
def test_organization_execute_role(organization, rando):
organization.execute_role.members.add(rando)
assert rando in organization.execute_role
assert set(Organization.accessible_objects(rando, 'execute_role')) == set([organization])

View File

@@ -1161,5 +1161,8 @@ ANSIBLE_BASE_CREATOR_DEFAULTS = ['change', 'delete', 'execute', 'use', 'adhoc',
# This is a stopgap, will delete after resource registry integration
ANSIBLE_BASE_SERVICE_PREFIX = "awx"
# Temporary, for old roles API compatibility, save child permissions at organization level
ANSIBLE_BASE_CACHE_PARENT_PERMISSIONS = True
# system username for django-ansible-base
SYSTEM_USERNAME = None