AAP-53980 Disconnect logic to fill in role parents (#15462)

* Disconnect logic to fill in role parents

Get tests passing hopefully

Whatever SonarCloud

* remove role parents/children endpoints and related views

* remove duplicate get_queryset method from RoleTeamsList

---------

Co-authored-by: Peter Braun <pbraun@redhat.com>
This commit is contained in:
Alan Rominger 2025-10-02 07:06:37 -04:00 committed by GitHub
parent 2729076f7f
commit 622f6ea166
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 23 additions and 356 deletions

View File

@ -3,7 +3,7 @@
from django.urls import re_path
from awx.api.views import RoleList, RoleDetail, RoleUsersList, RoleTeamsList, RoleParentsList, RoleChildrenList
from awx.api.views import RoleList, RoleDetail, RoleUsersList, RoleTeamsList
urls = [
@ -11,8 +11,6 @@ urls = [
re_path(r'^(?P<pk>[0-9]+)/$', RoleDetail.as_view(), name='role_detail'),
re_path(r'^(?P<pk>[0-9]+)/users/$', RoleUsersList.as_view(), name='role_users_list'),
re_path(r'^(?P<pk>[0-9]+)/teams/$', RoleTeamsList.as_view(), name='role_teams_list'),
re_path(r'^(?P<pk>[0-9]+)/parents/$', RoleParentsList.as_view(), name='role_parents_list'),
re_path(r'^(?P<pk>[0-9]+)/children/$', RoleChildrenList.as_view(), name='role_children_list'),
]
__all__ = ['urls']

View File

@ -4247,34 +4247,6 @@ class RoleTeamsList(SubListAttachDetachAPIView):
return Response(status=status.HTTP_204_NO_CONTENT)
class RoleParentsList(SubListAPIView):
deprecated = True
model = models.Role
serializer_class = serializers.RoleSerializer
parent_model = models.Role
relationship = 'parents'
permission_classes = (IsAuthenticated,)
search_fields = ('role_field', 'content_type__model')
def get_queryset(self):
role = models.Role.objects.get(pk=self.kwargs['pk'])
return models.Role.filter_visible_roles(self.request.user, role.parents.all())
class RoleChildrenList(SubListAPIView):
deprecated = True
model = models.Role
serializer_class = serializers.RoleSerializer
parent_model = models.Role
relationship = 'children'
permission_classes = (IsAuthenticated,)
search_fields = ('role_field', 'content_type__model')
def get_queryset(self):
role = models.Role.objects.get(pk=self.kwargs['pk'])
return models.Role.filter_visible_roles(self.request.user, role.children.all())
# Create view functions for all of the class-based views to simplify inclusion
# in URL patterns and reverse URL lookups, converting CamelCase names to
# lowercase_with_underscore (e.g. MyView.as_view() becomes my_view).

View File

@ -14,21 +14,14 @@ from jinja2.exceptions import UndefinedError, TemplateSyntaxError, SecurityError
# Django
from django.core import exceptions as django_exceptions
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models.signals import (
post_save,
post_delete,
)
from django.db.models.signals import m2m_changed
from django.db.models.signals import m2m_changed, post_save
from django.db import models
from django.db.models.fields.related import lazy_related_operation
from django.db.models.fields.related_descriptors import (
ReverseOneToOneDescriptor,
ForwardManyToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
create_forward_many_to_many_manager,
)
from django.utils.encoding import smart_str
from django.db.models import JSONField
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
@ -54,7 +47,6 @@ __all__ = [
'ImplicitRoleField',
'SmartFilterField',
'OrderedManyToManyField',
'update_role_parentage_for_instance',
'is_implicit_parent',
]
@ -146,34 +138,6 @@ class AutoOneToOneField(models.OneToOneField):
setattr(cls, related.get_accessor_name(), AutoSingleRelatedObjectDescriptor(related))
def resolve_role_field(obj, field):
ret = []
field_components = field.split('.', 1)
if hasattr(obj, field_components[0]):
obj = getattr(obj, field_components[0])
else:
return []
if obj is None:
return []
if len(field_components) == 1:
# use extremely generous duck typing to accomidate all possible forms
# of the model that may be used during various migrations
if obj._meta.model_name != 'role' or obj._meta.app_label != 'main':
raise Exception(smart_str('{} refers to a {}, not a Role'.format(field, type(obj))))
ret.append(obj.id)
else:
if type(obj) is ManyToManyDescriptor:
for o in obj.all():
ret += resolve_role_field(o, field_components[1])
else:
ret += resolve_role_field(obj, field_components[1])
return ret
def is_implicit_parent(parent_role, child_role):
"""
Determine if the parent_role is an implicit parent as defined by
@ -210,34 +174,6 @@ def is_implicit_parent(parent_role, child_role):
return False
def update_role_parentage_for_instance(instance):
"""update_role_parentage_for_instance
updates the parents listing for all the roles
of a given instance if they have changed
"""
parents_removed = set()
parents_added = set()
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = set(json.loads(cur_role.implicit_parents))
new_parents = implicit_role_field._resolve_parent_roles(instance)
removals = original_parents - new_parents
if removals:
cur_role.parents.remove(*list(removals))
parents_removed.add(cur_role.pk)
additions = new_parents - original_parents
if additions:
cur_role.parents.add(*list(additions))
parents_added.add(cur_role.pk)
new_parents_list = list(new_parents)
new_parents_list.sort()
new_parents_json = json.dumps(new_parents_list)
if cur_role.implicit_parents != new_parents_json:
cur_role.implicit_parents = new_parents_json
cur_role.save(update_fields=['implicit_parents'])
return (parents_added, parents_removed)
class ImplicitRoleDescriptor(ForwardManyToOneDescriptor):
pass
@ -269,65 +205,6 @@ class ImplicitRoleField(models.ForeignKey):
getattr(cls, '__implicit_role_fields').append(self)
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True, dispatch_uid='implicit-role-post-delete')
function = lambda local, related, field: self.bind_m2m_changed(field, related, local)
lazy_related_operation(function, cls, "self", field=self)
def bind_m2m_changed(self, _self, _role_class, cls):
if not self.parent_role:
return
field_names = self.parent_role
if type(field_names) is not list:
field_names = [field_names]
for field_name in field_names:
if field_name.startswith('singleton:'):
continue
field_name, sep, field_attr = field_name.partition('.')
# Non existent fields will occur if ever a parent model is
# moved inside a migration, needed for job_template_organization_field
# migration in particular
# consistency is assured by unit test awx.main.tests.functional
field = getattr(cls, field_name, None)
if field and type(field) is ReverseManyToOneDescriptor or type(field) is ManyToManyDescriptor:
if '.' in field_attr:
raise Exception('Referencing deep roles through ManyToMany fields is unsupported.')
if type(field) is ReverseManyToOneDescriptor:
sender = field.through
else:
sender = field.related.through
reverse = type(field) is ManyToManyDescriptor
m2m_changed.connect(self.m2m_update(field_attr, reverse), sender, weak=False)
def m2m_update(self, field_attr, _reverse):
def _m2m_update(instance, action, model, pk_set, reverse, **kwargs):
if action == 'post_add' or action == 'pre_remove':
if _reverse:
reverse = not reverse
if reverse:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, field_attr).children.add(getattr(obj, self.name))
if action == 'pre_remove':
getattr(instance, field_attr).children.remove(getattr(obj, self.name))
else:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, self.name).parents.add(getattr(obj, field_attr))
if action == 'pre_remove':
getattr(instance, self.name).parents.remove(getattr(obj, field_attr))
return _m2m_update
def _post_save(self, instance, created, *args, **kwargs):
Role_ = utils.get_current_apps().get_model('main', 'Role')
@ -337,10 +214,6 @@ class ImplicitRoleField(models.ForeignKey):
Model = utils.get_current_apps().get_model('main', instance.__class__.__name__)
latest_instance = Model.objects.get(pk=instance.pk)
# Avoid circular import
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
with batch_role_ancestor_rebuilding():
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(latest_instance.__class__, '__implicit_role_fields'):
@ -357,49 +230,9 @@ class ImplicitRoleField(models.ForeignKey):
updates[role.role_field] = role.id
role_ids.append(role.id)
type(latest_instance).objects.filter(pk=latest_instance.pk).update(**updates)
Role.rebuild_role_ancestor_list(role_ids, [])
update_role_parentage_for_instance(latest_instance)
instance.refresh_from_db()
def _resolve_parent_roles(self, instance):
if not self.parent_role:
return set()
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
parent_roles = set()
for path in paths:
if path.startswith("singleton:"):
singleton_name = path[10:]
Role_ = utils.get_current_apps().get_model('main', 'Role')
qs = Role_.objects.filter(singleton_name=singleton_name)
if qs.count() >= 1:
role = qs[0]
else:
role = Role_.objects.create(singleton_name=singleton_name, role_field=singleton_name)
parents = [role.id]
else:
parents = resolve_role_field(instance, path)
for parent in parents:
parent_roles.add(parent)
return parent_roles
def _post_delete(self, instance, *args, **kwargs):
role_ids = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
role_ids.append(getattr(instance, implicit_role_field.name + '_id'))
Role_ = utils.get_current_apps().get_model('main', 'Role')
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role_.objects.filter(id__in=role_ids).delete()
# Avoid circular import
from awx.main.models.rbac import Role
Role.rebuild_role_ancestor_list([], child_ids)
class SmartFilterField(models.TextField):
def get_prep_value(self, value):

View File

@ -3,7 +3,6 @@ from time import time
from django.db.models import Subquery, OuterRef, F
from awx.main.fields import update_role_parentage_for_instance
from awx.main.models.rbac import Role, batch_role_ancestor_rebuilding
logger = logging.getLogger('rbac_migrations')
@ -238,85 +237,10 @@ def restore_inventory_admins_backward(apps, schema_editor):
def rebuild_role_hierarchy(apps, schema_editor):
"""
This should be called in any migration when ownerships are changed.
Ex. I remove a user from the admin_role of a credential.
Ancestors are cached from parents for performance, this re-computes ancestors.
"""
logger.info('Computing role roots..')
start = time()
roots = Role.objects.all().values_list('id', flat=True)
stop = time()
logger.info('Found %d roots in %f seconds, rebuilding ancestry map' % (len(roots), stop - start))
start = time()
Role.rebuild_role_ancestor_list(roots, [])
stop = time()
logger.info('Rebuild ancestors completed in %f seconds' % (stop - start))
logger.info('Done.')
"""Not used after DAB RBAC migration"""
pass
def rebuild_role_parentage(apps, schema_editor, models=None):
"""
This should be called in any migration when any parent_role entry
is modified so that the cached parent fields will be updated. Ex:
foo_role = ImplicitRoleField(
parent_role=['bar_role'] # change to parent_role=['admin_role']
)
This is like rebuild_role_hierarchy, but that method updates ancestors,
whereas this method updates parents.
"""
start = time()
seen_models = set()
model_ct = 0
noop_ct = 0
ContentType = apps.get_model('contenttypes', "ContentType")
additions = set()
removals = set()
role_qs = Role.objects
if models:
# update_role_parentage_for_instance is expensive
# if the models have been downselected, ignore those which are not in the list
ct_ids = list(ContentType.objects.filter(model__in=[name.lower() for name in models]).values_list('id', flat=True))
role_qs = role_qs.filter(content_type__in=ct_ids)
for role in role_qs.iterator():
if not role.object_id:
continue
model_tuple = (role.content_type_id, role.object_id)
if model_tuple in seen_models:
continue
seen_models.add(model_tuple)
# The GenericForeignKey does not work right in migrations
# with the usage as role.content_object
# so we do the lookup ourselves with current migration models
ct = role.content_type
app = ct.app_label
ct_model = apps.get_model(app, ct.model)
content_object = ct_model.objects.get(pk=role.object_id)
parents_added, parents_removed = update_role_parentage_for_instance(content_object)
additions.update(parents_added)
removals.update(parents_removed)
if parents_added:
model_ct += 1
logger.debug('Added to parents of roles {} of {}'.format(parents_added, content_object))
if parents_removed:
model_ct += 1
logger.debug('Removed from parents of roles {} of {}'.format(parents_removed, content_object))
else:
noop_ct += 1
logger.debug('No changes to role parents for {} resources'.format(noop_ct))
logger.debug('Added parents to {} roles'.format(len(additions)))
logger.debug('Removed parents from {} roles'.format(len(removals)))
if model_ct:
logger.info('Updated implicit parents of {} resources'.format(model_ct))
logger.info('Rebuild parentage completed in %f seconds' % (time() - start))
# this is ran because the ordinary signals for
# Role.parents.add and Role.parents.remove not called in migration
Role.rebuild_role_ancestor_list(list(additions), list(removals))
"""Not used after DAB RBAC migration"""
pass

View File

@ -38,7 +38,6 @@ from awx.main.models import (
InventorySource,
Job,
JobHostSummary,
JobTemplate,
Organization,
Project,
Role,
@ -56,10 +55,7 @@ from awx.main.models import (
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks.system import update_inventory_computed_fields, handle_removed_image
from awx.main.fields import (
is_implicit_parent,
update_role_parentage_for_instance,
)
from awx.main.fields import is_implicit_parent
from awx.main import consumers
@ -192,31 +188,6 @@ def cleanup_detached_labels_on_deleted_parent(sender, instance, **kwargs):
label.delete()
def save_related_job_templates(sender, instance, **kwargs):
"""save_related_job_templates loops through all of the
job templates that use an Inventory that have had their
Organization updated. This triggers the rebuilding of the RBAC hierarchy
and ensures the proper access restrictions.
"""
if sender is not Inventory:
raise ValueError('This signal callback is only intended for use with Project or Inventory')
update_fields = kwargs.get('update_fields', None)
if (update_fields and not ('organization' in update_fields or 'organization_id' in update_fields)) or kwargs.get('created', False):
return
if instance._prior_values_store.get('organization_id') != instance.organization_id:
jtq = JobTemplate.objects.filter(**{sender.__name__.lower(): instance})
for jt in jtq:
parents_added, parents_removed = update_role_parentage_for_instance(jt)
if parents_added or parents_removed:
logger.info(
'Permissions on JT {} changed due to inventory {} organization change from {} to {}.'.format(
jt.pk, instance.pk, instance._prior_values_store.get('organization_id'), instance.organization_id
)
)
def connect_computed_field_signals():
post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
@ -230,7 +201,6 @@ def connect_computed_field_signals():
connect_computed_field_signals()
post_save.connect(save_related_job_templates, sender=Inventory)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through)

View File

@ -387,36 +387,6 @@ def test_remove_team_from_role(post, team, admin, role):
assert role.parents.filter(id=team.member_role.id).count() == 0
#
# /roles/<id>/parents/
#
@pytest.mark.django_db
def test_role_parents(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_parents_list', kwargs={'pk': role.id})
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.member_role.id
#
# /roles/<id>/children/
#
@pytest.mark.django_db
def test_role_children(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_children_list', kwargs={'pk': team.member_role.id})
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 2
assert response.data['results'][0]['id'] == role.id or response.data['results'][1]['id'] == role.id
#
# Generics
#