AAP-48392 Handle DAB RBAC either before or after new type model (for merge) (#16045)

* Handle DAB RBAC either before or after new type model

* Translate CT to DAB CT

* Fixes for content type switch

* Use more compatible coding pattern

* Deeper purge of content_type_id

* revert, turns out that did not work

* More content type replacements

* Revert changes to serializer

* Revert another content_type change

* Fix for rearrangement of post_migration methods

* Remove thing I am not going to do

* Revert branch pin that was temporary
This commit is contained in:
Alan Rominger 2025-07-02 14:28:43 -04:00 committed by GitHub
parent d6482d3898
commit bf0567ca41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 47 additions and 26 deletions

View File

@ -56,6 +56,7 @@ from wsgiref.util import FileWrapper
# django-ansible-base
from ansible_base.lib.utils.requests import get_remote_hosts
from ansible_base.rbac.models import RoleEvaluation, ObjectRole
from ansible_base.rbac import permission_registry
# AWX
from awx.main.tasks.system import send_notifications, update_inventory_computed_fields
@ -750,8 +751,8 @@ class TeamProjectsList(SubListAPIView):
def get_queryset(self):
team = self.get_parent_object()
self.check_parent_access(team)
model_ct = ContentType.objects.get_for_model(self.model)
parent_ct = ContentType.objects.get_for_model(self.parent_model)
model_ct = permission_registry.content_type_model.objects.get_for_model(self.model)
parent_ct = permission_registry.content_type_model.objects.get_for_model(self.parent_model)
rd = get_role_definition(team.member_role)
role = ObjectRole.objects.filter(object_id=team.id, content_type=parent_ct, role_definition=rd).first()

View File

@ -17,7 +17,13 @@ logger = logging.getLogger('awx.main.migrations._dab_rbac')
def create_permissions_as_operation(apps, schema_editor):
# NOTE: the DAB ContentType changes adjusted how they fire
# before they would fire on every app config, like contenttypes
create_dab_permissions(global_apps.get_app_config("main"), apps=apps)
# This changed to only fire once and do a global creation
# so we need to call it for specifically the dab_rbac app
# multiple calls will not hurt anything
create_dab_permissions(global_apps.get_app_config("dab_rbac"), apps=apps)
"""
@ -112,7 +118,12 @@ def get_descendents(f, children_map):
def get_permissions_for_role(role_field, children_map, apps):
Permission = apps.get_model('dab_rbac', 'DABPermission')
ContentType = apps.get_model('contenttypes', 'ContentType')
try:
# After migration for remote permissions
ContentType = apps.get_model('dab_rbac', 'DABContentType')
except LookupError:
# If using DAB from before remote permissions are implemented
ContentType = apps.get_model('contenttypes', 'ContentType')
perm_list = []
for child_field in get_descendents(role_field, children_map):
@ -281,7 +292,13 @@ def setup_managed_role_definitions(apps, schema_editor):
'special': '{cls.__name__} {action}',
}
ContentType = apps.get_model('contenttypes', 'ContentType')
try:
# After migration for remote permissions
ContentType = apps.get_model('dab_rbac', 'DABContentType')
except LookupError:
# If using DAB from before remote permissions are implemented
ContentType = apps.get_model('contenttypes', 'ContentType')
Permission = apps.get_model('dab_rbac', 'DABPermission')
RoleDefinition = apps.get_model('dab_rbac', 'RoleDefinition')
Organization = apps.get_model(settings.ANSIBLE_BASE_ORGANIZATION_MODEL)

View File

@ -86,7 +86,7 @@ class ResourceMixin(models.Model):
raise RuntimeError(f'Role filters only valid for users and ancestor role, received {accessor}')
if content_types is None:
ct_kwarg = dict(content_type_id=ContentType.objects.get_for_model(cls).id)
ct_kwarg = dict(content_type=ContentType.objects.get_for_model(cls))
else:
ct_kwarg = dict(content_type_id__in=content_types)

View File

@ -27,6 +27,7 @@ from django.conf import settings
# Ansible_base app
from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment, RoleTeamAssignment
from ansible_base.rbac import permission_registry
from ansible_base.lib.utils.models import get_type_for_model
# AWX
@ -561,7 +562,7 @@ def get_role_definition(role):
model_print = type(obj).__name__
perm_list = get_role_codenames(role)
defaults = {
'content_type_id': role.content_type_id,
'content_type': permission_registry.content_type_model.objects.get_by_natural_key(role.content_type.app_label, role.content_type.model),
'description': f'Has {action_name.title()} permission to {model_print} for backwards API compatibility',
}
# use Controller-specific role definitions for Team/Organization and member/admin

View File

@ -34,6 +34,7 @@ from polymorphic.models import PolymorphicModel
from ansible_base.lib.utils.models import prevent_search, get_type_for_model
from ansible_base.rbac import permission_registry
from ansible_base.rbac.models import RoleEvaluation
# AWX
from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel
@ -218,20 +219,21 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
# do not use this if in a subclass
if cls != UnifiedJobTemplate:
return super(UnifiedJobTemplate, cls).accessible_pk_qs(accessor, role_field)
from ansible_base.rbac.models import RoleEvaluation
action = to_permissions[role_field]
# Special condition for super auditor
role_subclasses = cls._submodels_with_roles()
role_cts = ContentType.objects.get_for_models(*role_subclasses).values()
all_codenames = {f'{action}_{cls._meta.model_name}' for cls in role_subclasses}
if not (all_codenames - accessor.singleton_permissions()):
role_cts = ContentType.objects.get_for_models(*role_subclasses).values()
qs = cls.objects.filter(polymorphic_ctype__in=role_cts)
return qs.values_list('id', flat=True)
dab_role_cts = permission_registry.content_type_model.objects.get_for_models(*role_subclasses).values()
return (
RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__in=all_codenames, content_type_id__in=[ct.id for ct in role_cts])
RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__in=all_codenames, content_type_id__in=[ct.id for ct in dab_role_cts])
.values_list('object_id')
.distinct()
)

View File

@ -1,6 +1,5 @@
import pytest
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse as django_reverse
from awx.api.versioning import reverse
@ -8,13 +7,14 @@ from awx.main.models import JobTemplate, Inventory, Organization
from awx.main.access import JobTemplateAccess, WorkflowJobTemplateAccess
from ansible_base.rbac.models import RoleDefinition
from ansible_base.rbac import permission_registry
@pytest.mark.django_db
def test_managed_roles_created(setup_managed_roles):
"Managed RoleDefinitions are created in post_migration signal, we expect to see them here"
for cls in (JobTemplate, Inventory):
ct = ContentType.objects.get_for_model(cls)
ct = permission_registry.content_type_model.objects.get_for_model(cls)
rds = list(RoleDefinition.objects.filter(content_type=ct))
assert len(rds) > 1
assert f'{cls.__name__} Admin' in [rd.name for rd in rds]
@ -30,7 +30,7 @@ def test_custom_read_role(admin_user, post, setup_managed_roles):
)
rd_id = resp.data['id']
rd = RoleDefinition.objects.get(id=rd_id)
assert rd.content_type == ContentType.objects.get_for_model(Inventory)
assert rd.content_type == permission_registry.content_type_model.objects.get_for_model(Inventory)
@pytest.mark.django_db
@ -71,7 +71,7 @@ def test_assign_custom_delete_role(admin_user, rando, inventory, delete, patch):
rd, _ = RoleDefinition.objects.get_or_create(
name='inventory-delete',
permissions=['delete_inventory', 'view_inventory', 'change_inventory'],
content_type=ContentType.objects.get_for_model(Inventory),
content_type=permission_registry.content_type_model.objects.get_for_model(Inventory),
)
rd.give_permission(rando, inventory)
inv_id = inventory.pk
@ -85,7 +85,9 @@ def test_assign_custom_delete_role(admin_user, rando, inventory, delete, patch):
@pytest.mark.django_db
def test_assign_custom_add_role(admin_user, rando, organization, post, setup_managed_roles):
rd, _ = RoleDefinition.objects.get_or_create(
name='inventory-add', permissions=['add_inventory', 'view_organization'], content_type=ContentType.objects.get_for_model(Organization)
name='inventory-add',
permissions=['add_inventory', 'view_organization'],
content_type=permission_registry.content_type_model.objects.get_for_model(Organization),
)
rd.give_permission(rando, organization)
url = reverse('api:inventory_list')

View File

@ -3,8 +3,6 @@ import json
import pytest
from django.contrib.contenttypes.models import ContentType
from crum import impersonate
from awx.main.fields import ImplicitRoleField
@ -60,7 +58,7 @@ def test_role_migration_matches(request, model, setup_managed_roles):
new_codenames = set(rd.permissions.values_list('codename', flat=True))
# all the old roles should map to a non-Compat role definition
if 'Compat' not in rd.name:
model_rds = RoleDefinition.objects.filter(content_type=ContentType.objects.get_for_model(obj))
model_rds = RoleDefinition.objects.filter(content_type=permission_registry.content_type_model.objects.get_for_model(obj))
rd_data = {}
for rd in model_rds:
rd_data[rd.name] = list(rd.permissions.values_list('codename', flat=True))
@ -76,7 +74,7 @@ def test_role_migration_matches(request, model, setup_managed_roles):
@pytest.mark.django_db
def test_role_naming(setup_managed_roles):
qs = RoleDefinition.objects.filter(content_type=ContentType.objects.get(model='jobtemplate'), name__endswith='dmin')
qs = RoleDefinition.objects.filter(content_type=permission_registry.content_type_model.objects.get(model='jobtemplate'), name__endswith='dmin')
assert qs.count() == 1 # sanity
rd = qs.first()
assert rd.name == 'JobTemplate Admin'
@ -86,7 +84,7 @@ def test_role_naming(setup_managed_roles):
@pytest.mark.django_db
def test_action_role_naming(setup_managed_roles):
qs = RoleDefinition.objects.filter(content_type=ContentType.objects.get(model='jobtemplate'), name__endswith='ecute')
qs = RoleDefinition.objects.filter(content_type=permission_registry.content_type_model.objects.get(model='jobtemplate'), name__endswith='ecute')
assert qs.count() == 1 # sanity
rd = qs.first()
assert rd.name == 'JobTemplate Execute'
@ -98,7 +96,7 @@ def test_action_role_naming(setup_managed_roles):
def test_compat_role_naming(setup_managed_roles, job_template, rando, alice):
with impersonate(alice):
job_template.read_role.members.add(rando)
qs = RoleDefinition.objects.filter(content_type=ContentType.objects.get(model='jobtemplate'), name__endswith='ompat')
qs = RoleDefinition.objects.filter(content_type=permission_registry.content_type_model.objects.get(model='jobtemplate'), name__endswith='ompat')
assert qs.count() == 1 # sanity
rd = qs.first()
assert rd.name == 'JobTemplate Read Compat'

View File

@ -1,7 +1,5 @@
import pytest
from django.contrib.contenttypes.models import ContentType
from awx.main.access import ExecutionEnvironmentAccess
from awx.main.models import ExecutionEnvironment, Organization, Team
from awx.main.models.rbac import get_role_codenames
@ -10,6 +8,7 @@ from awx.api.versioning import reverse
from django.urls import reverse as django_reverse
from ansible_base.rbac.models import RoleDefinition
from ansible_base.rbac import permission_registry
@pytest.fixture
@ -17,7 +16,7 @@ def ee_rd():
return RoleDefinition.objects.create_from_permissions(
name='EE object admin',
permissions=['change_executionenvironment', 'delete_executionenvironment'],
content_type=ContentType.objects.get_for_model(ExecutionEnvironment),
content_type=permission_registry.content_type_model.objects.get_for_model(ExecutionEnvironment),
)
@ -26,7 +25,7 @@ def org_ee_rd():
return RoleDefinition.objects.create_from_permissions(
name='EE org admin',
permissions=['add_executionenvironment', 'change_executionenvironment', 'delete_executionenvironment', 'view_organization'],
content_type=ContentType.objects.get_for_model(Organization),
content_type=permission_registry.content_type_model.objects.get_for_model(Organization),
)

View File

@ -18,6 +18,8 @@ import pytest
from ansible.module_utils.six import raise_from
from ansible_base.rbac.models import RoleDefinition, DABPermission
from ansible_base.rbac import permission_registry
from awx.main.tests.conftest import load_all_credentials # noqa: F401; pylint: disable=unused-import
from awx.main.tests.functional.conftest import _request
from awx.main.tests.functional.conftest import credentialtype_scm, credentialtype_ssh # noqa: F401; pylint: disable=unused-import
@ -37,7 +39,6 @@ from awx.main.models import (
)
from django.db import transaction
from django.contrib.contenttypes.models import ContentType
HAS_TOWER_CLI = False
@ -342,7 +343,7 @@ def notification_template(organization):
@pytest.fixture
def job_template_role_definition():
rd = RoleDefinition.objects.create(name='test_view_jt', content_type=ContentType.objects.get_for_model(JobTemplate))
rd = RoleDefinition.objects.create(name='test_view_jt', content_type=permission_registry.content_type_model.objects.get_for_model(JobTemplate))
permission_codenames = ['view_jobtemplate', 'execute_jobtemplate']
permissions = DABPermission.objects.filter(codename__in=permission_codenames)
rd.permissions.add(*permissions)