diff --git a/awx/main/fields.py b/awx/main/fields.py index 513263ed9f..4a0edcb581 100644 --- a/awx/main/fields.py +++ b/awx/main/fields.py @@ -5,6 +5,7 @@ from django.db import connection from django.db.models.signals import ( post_init, + pre_save, post_save, post_delete, ) @@ -83,69 +84,8 @@ def resolve_role_field(obj, field): return ret - class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor): - """Descriptor Implict Role Fields. Auto-creates the appropriate role entry on first access""" - - def __init__(self, role_name, role_description, permissions, parent_role, *args, **kwargs): - self.role_name = role_name - self.role_description = role_description if role_description else "" - self.permissions = permissions - self.parent_role = parent_role - - super(ImplicitRoleDescriptor, self).__init__(*args, **kwargs) - - def __get__(self, instance, instance_type=None): - role = super(ImplicitRoleDescriptor, self).__get__(instance, instance_type) - if role: - return role - - if not self.role_name: - raise FieldError('Implicit role missing `role_name`') - - if connection.needs_rollback: - raise TransactionManagementError('Current transaction has failed, cannot create implicit role') - - - role = Role.objects.create(name=self.role_name, description=self.role_description, content_object=instance) - setattr(instance, self.field.name, role) - if instance.pk: - instance.save(update_fields=[self.field.name,]) - - if self.parent_role: - # Add all non-null parent roles as parents - paths = self.parent_role if type(self.parent_role) is list else [self.parent_role] - for path in paths: - if path.startswith("singleton:"): - parents = [Role.singleton(path[10:])] - else: - parents = resolve_role_field(instance, path) - for parent in parents: - role.parents.add(parent) - - if self.permissions is not None: - permissions = RolePermission( - role=role, - resource=instance, - auto_generated=True - ) - - if 'all' in self.permissions and self.permissions['all']: - del self.permissions['all'] - self.permissions['create'] = True - self.permissions['read'] = True - self.permissions['write'] = True - self.permissions['update'] = True - self.permissions['delete'] = True - self.permissions['scm_update'] = True - self.permissions['use'] = True - self.permissions['execute'] = True - - for k,v in self.permissions.items(): - setattr(permissions, k, v) - permissions.save() - - return role + pass class ImplicitRoleField(models.ForeignKey): @@ -153,7 +93,7 @@ class ImplicitRoleField(models.ForeignKey): def __init__(self, role_name=None, role_description=None, permissions=None, parent_role=None, *args, **kwargs): self.role_name = role_name - self.role_description = role_description + self.role_description = role_description if role_description else "" self.permissions = permissions self.parent_role = parent_role @@ -164,18 +104,15 @@ class ImplicitRoleField(models.ForeignKey): def contribute_to_class(self, cls, name): super(ImplicitRoleField, self).contribute_to_class(cls, name) - setattr(cls, - self.name, - ImplicitRoleDescriptor( - self.role_name, - self.role_description, - self.permissions, - self.parent_role, - self - ) - ) - post_init.connect(self._post_init, cls, True) - post_save.connect(self._post_save, cls, True) + setattr(cls, self.name, ImplicitRoleDescriptor(self)) + + if not hasattr(cls, '__implicit_role_fields'): + setattr(cls, '__implicit_role_fields', []) + getattr(cls, '__implicit_role_fields').append(self) + + post_init.connect(self._post_init, cls, True, dispatch_uid='implicit-role-post-init') + pre_save.connect(self._pre_save, cls, True, dispatch_uid='implicit-role-pre-save') + post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save') post_delete.connect(self._post_delete, cls, True) add_lazy_relation(cls, self, "self", self.bind_m2m_changed) @@ -233,24 +170,82 @@ class ImplicitRoleField(models.ForeignKey): def _post_init(self, instance, *args, **kwargs): - if not self.parent_role: - return + original_parent_roles = dict() + if instance.pk: + for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'): + original_parent_roles[implicit_role_field.name] = implicit_role_field._resolve_parent_roles(instance) - if not instance.pk: - return + setattr(instance, '__original_parent_roles', original_parent_roles) - self._calc_original_parents(instance) + def _create_role_instance_if_not_exists(self, instance): + role = getattr(instance, self.name, None) + if role: + return role + role = Role.objects.create( + name=self.role_name, + description=self.role_description) + setattr(instance, self.name, role) + + def _patch_role_content_object_and_grant_permissions(self, instance): + role = getattr(instance, self.name) + role.content_object = instance + role.save() + + if self.permissions is not None: + permissions = RolePermission( + role=role, + resource=instance, + auto_generated=True + ) + + if 'all' in self.permissions and self.permissions['all']: + del self.permissions['all'] + self.permissions['create'] = True + self.permissions['read'] = True + self.permissions['write'] = True + self.permissions['update'] = True + self.permissions['delete'] = True + self.permissions['scm_update'] = True + self.permissions['use'] = True + self.permissions['execute'] = True + + for k,v in self.permissions.items(): + setattr(permissions, k, v) + permissions.save() + + def _pre_save(self, instance, *args, **kwargs): + for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'): + implicit_role_field._create_role_instance_if_not_exists(instance) + + def _post_save(self, instance, created, *args, **kwargs): + if created: + for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'): + implicit_role_field._patch_role_content_object_and_grant_permissions(instance) + + original_parent_roles = getattr(instance, '__original_parent_roles') + + if created: + for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'): + original_parent_roles[implicit_role_field.name] = set() + + new_parent_roles = dict() + for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'): + new_parent_roles[implicit_role_field.name] = implicit_role_field._resolve_parent_roles(instance) + setattr(instance, '__original_parent_roles', new_parent_roles) + + with batch_role_ancestor_rebuilding(): + for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'): + cur_role = getattr(instance, implicit_role_field.name) + original_parents = original_parent_roles[implicit_role_field.name] + new_parents = new_parent_roles[implicit_role_field.name] + cur_role.parents.remove(*list(original_parents - new_parents)) + cur_role.parents.add(*list(new_parents - original_parents)) - def _calc_original_parents(self, instance): - if not hasattr(self, '__original_parent_roles'): - setattr(self, '__original_parent_roles', set()) # do not just self.__original_parent_roles=[], it's not the same here, apparently. - # NOTE: The above setattr is required to be called bofore - # _resolve_parent_roles because we can end up recursing, so the enclosing - # if not hasattr protects against this. - original_parent_roles = self._resolve_parent_roles(instance) - setattr(self, '__original_parent_roles', original_parent_roles) def _resolve_parent_roles(self, instance): + if not self.parent_role: + return set() + paths = self.parent_role if type(self.parent_role) is list else [self.parent_role] parent_roles = set() for path in paths: @@ -262,35 +257,10 @@ class ImplicitRoleField(models.ForeignKey): parent_roles.add(parent) return parent_roles - - def _post_save(self, instance, created, *args, **kwargs): - # Ensure that our field gets initialized after our first save - this_role = getattr(instance, self.name) - - # As object relations change, the role hierarchy might also change if the relations - # that changed were referenced in our magic parent_role field. This code synchronizes - # these changes. - if not self.parent_role: - return - - if created: - self._calc_original_parents(instance) - return - - original_parents = getattr(self, '__original_parent_roles') - new_parents = self._resolve_parent_roles(instance) - - with batch_role_ancestor_rebuilding(): - for role in original_parents - new_parents: - this_role.parents.remove(role) - for role in new_parents - original_parents: - this_role.parents.add(role) - - setattr(self, '__original_parent_roles', new_parents) - def _post_delete(self, instance, *args, **kwargs): this_role = getattr(instance, self.name) children = [c for c in this_role.children.all()] this_role.delete() - for child in children: - child.rebuild_role_ancestor_list() + with batch_role_ancestor_rebuilding(): + for child in children: + child.rebuild_role_ancestor_list() diff --git a/awx/main/tests/functional/test_rbac_core.py b/awx/main/tests/functional/test_rbac_core.py index d7657344f5..2ad1250f81 100644 --- a/awx/main/tests/functional/test_rbac_core.py +++ b/awx/main/tests/functional/test_rbac_core.py @@ -4,6 +4,7 @@ from awx.main.models import ( Role, RolePermission, Organization, + Project, ) @@ -195,3 +196,48 @@ def test_hierarchy_rebuilding(): assert X.is_ancestor_of(D) is False +@pytest.mark.django_db +def test_auto_parenting(): + org1 = Organization.objects.create(name='org1') + org2 = Organization.objects.create(name='org2') + + prj1 = Project.objects.create(name='prj1') + prj2 = Project.objects.create(name='prj2') + + assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False + assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False + assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False + assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False + + prj1.organization = org1 + prj1.save() + + assert org1.admin_role.is_ancestor_of(prj1.admin_role) + assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False + assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False + assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False + + prj2.organization = org1 + prj2.save() + + assert org1.admin_role.is_ancestor_of(prj1.admin_role) + assert org1.admin_role.is_ancestor_of(prj2.admin_role) + assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False + assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False + + prj1.organization = org2 + prj1.save() + + assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False + assert org1.admin_role.is_ancestor_of(prj2.admin_role) + assert org2.admin_role.is_ancestor_of(prj1.admin_role) + assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False + + prj2.organization = org2 + prj2.save() + + assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False + assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False + assert org2.admin_role.is_ancestor_of(prj1.admin_role) + assert org2.admin_role.is_ancestor_of(prj2.admin_role) +