Merge pull request #1280 from anoek/rbac

Refactored ImplicitRoleField
This commit is contained in:
Akita Noek 2016-03-18 15:41:54 -04:00
commit 9dc4f3e07d
2 changed files with 132 additions and 116 deletions

View File

@ -5,6 +5,7 @@
from django.db import connection
from django.db.models.signals import (
post_init,
pre_save,
post_save,
post_delete,
)
@ -83,69 +84,8 @@ def resolve_role_field(obj, field):
return ret
class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
"""Descriptor Implict Role Fields. Auto-creates the appropriate role entry on first access"""
def __init__(self, role_name, role_description, permissions, parent_role, *args, **kwargs):
self.role_name = role_name
self.role_description = role_description if role_description else ""
self.permissions = permissions
self.parent_role = parent_role
super(ImplicitRoleDescriptor, self).__init__(*args, **kwargs)
def __get__(self, instance, instance_type=None):
role = super(ImplicitRoleDescriptor, self).__get__(instance, instance_type)
if role:
return role
if not self.role_name:
raise FieldError('Implicit role missing `role_name`')
if connection.needs_rollback:
raise TransactionManagementError('Current transaction has failed, cannot create implicit role')
role = Role.objects.create(name=self.role_name, description=self.role_description, content_object=instance)
setattr(instance, self.field.name, role)
if instance.pk:
instance.save(update_fields=[self.field.name,])
if self.parent_role:
# Add all non-null parent roles as parents
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
for path in paths:
if path.startswith("singleton:"):
parents = [Role.singleton(path[10:])]
else:
parents = resolve_role_field(instance, path)
for parent in parents:
role.parents.add(parent)
if self.permissions is not None:
permissions = RolePermission(
role=role,
resource=instance,
auto_generated=True
)
if 'all' in self.permissions and self.permissions['all']:
del self.permissions['all']
self.permissions['create'] = True
self.permissions['read'] = True
self.permissions['write'] = True
self.permissions['update'] = True
self.permissions['delete'] = True
self.permissions['scm_update'] = True
self.permissions['use'] = True
self.permissions['execute'] = True
for k,v in self.permissions.items():
setattr(permissions, k, v)
permissions.save()
return role
pass
class ImplicitRoleField(models.ForeignKey):
@ -153,7 +93,7 @@ class ImplicitRoleField(models.ForeignKey):
def __init__(self, role_name=None, role_description=None, permissions=None, parent_role=None, *args, **kwargs):
self.role_name = role_name
self.role_description = role_description
self.role_description = role_description if role_description else ""
self.permissions = permissions
self.parent_role = parent_role
@ -164,18 +104,15 @@ class ImplicitRoleField(models.ForeignKey):
def contribute_to_class(self, cls, name):
super(ImplicitRoleField, self).contribute_to_class(cls, name)
setattr(cls,
self.name,
ImplicitRoleDescriptor(
self.role_name,
self.role_description,
self.permissions,
self.parent_role,
self
)
)
post_init.connect(self._post_init, cls, True)
post_save.connect(self._post_save, cls, True)
setattr(cls, self.name, ImplicitRoleDescriptor(self))
if not hasattr(cls, '__implicit_role_fields'):
setattr(cls, '__implicit_role_fields', [])
getattr(cls, '__implicit_role_fields').append(self)
post_init.connect(self._post_init, cls, True, dispatch_uid='implicit-role-post-init')
pre_save.connect(self._pre_save, cls, True, dispatch_uid='implicit-role-pre-save')
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True)
add_lazy_relation(cls, self, "self", self.bind_m2m_changed)
@ -233,24 +170,82 @@ class ImplicitRoleField(models.ForeignKey):
def _post_init(self, instance, *args, **kwargs):
if not self.parent_role:
return
original_parent_roles = dict()
if instance.pk:
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
original_parent_roles[implicit_role_field.name] = implicit_role_field._resolve_parent_roles(instance)
if not instance.pk:
return
setattr(instance, '__original_parent_roles', original_parent_roles)
self._calc_original_parents(instance)
def _create_role_instance_if_not_exists(self, instance):
role = getattr(instance, self.name, None)
if role:
return role
role = Role.objects.create(
name=self.role_name,
description=self.role_description)
setattr(instance, self.name, role)
def _patch_role_content_object_and_grant_permissions(self, instance):
role = getattr(instance, self.name)
role.content_object = instance
role.save()
if self.permissions is not None:
permissions = RolePermission(
role=role,
resource=instance,
auto_generated=True
)
if 'all' in self.permissions and self.permissions['all']:
del self.permissions['all']
self.permissions['create'] = True
self.permissions['read'] = True
self.permissions['write'] = True
self.permissions['update'] = True
self.permissions['delete'] = True
self.permissions['scm_update'] = True
self.permissions['use'] = True
self.permissions['execute'] = True
for k,v in self.permissions.items():
setattr(permissions, k, v)
permissions.save()
def _pre_save(self, instance, *args, **kwargs):
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
implicit_role_field._create_role_instance_if_not_exists(instance)
def _post_save(self, instance, created, *args, **kwargs):
if created:
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
implicit_role_field._patch_role_content_object_and_grant_permissions(instance)
original_parent_roles = getattr(instance, '__original_parent_roles')
if created:
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
original_parent_roles[implicit_role_field.name] = set()
new_parent_roles = dict()
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
new_parent_roles[implicit_role_field.name] = implicit_role_field._resolve_parent_roles(instance)
setattr(instance, '__original_parent_roles', new_parent_roles)
with batch_role_ancestor_rebuilding():
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = original_parent_roles[implicit_role_field.name]
new_parents = new_parent_roles[implicit_role_field.name]
cur_role.parents.remove(*list(original_parents - new_parents))
cur_role.parents.add(*list(new_parents - original_parents))
def _calc_original_parents(self, instance):
if not hasattr(self, '__original_parent_roles'):
setattr(self, '__original_parent_roles', set()) # do not just self.__original_parent_roles=[], it's not the same here, apparently.
# NOTE: The above setattr is required to be called bofore
# _resolve_parent_roles because we can end up recursing, so the enclosing
# if not hasattr protects against this.
original_parent_roles = self._resolve_parent_roles(instance)
setattr(self, '__original_parent_roles', original_parent_roles)
def _resolve_parent_roles(self, instance):
if not self.parent_role:
return set()
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
parent_roles = set()
for path in paths:
@ -262,35 +257,10 @@ class ImplicitRoleField(models.ForeignKey):
parent_roles.add(parent)
return parent_roles
def _post_save(self, instance, created, *args, **kwargs):
# Ensure that our field gets initialized after our first save
this_role = getattr(instance, self.name)
# As object relations change, the role hierarchy might also change if the relations
# that changed were referenced in our magic parent_role field. This code synchronizes
# these changes.
if not self.parent_role:
return
if created:
self._calc_original_parents(instance)
return
original_parents = getattr(self, '__original_parent_roles')
new_parents = self._resolve_parent_roles(instance)
with batch_role_ancestor_rebuilding():
for role in original_parents - new_parents:
this_role.parents.remove(role)
for role in new_parents - original_parents:
this_role.parents.add(role)
setattr(self, '__original_parent_roles', new_parents)
def _post_delete(self, instance, *args, **kwargs):
this_role = getattr(instance, self.name)
children = [c for c in this_role.children.all()]
this_role.delete()
for child in children:
child.rebuild_role_ancestor_list()
with batch_role_ancestor_rebuilding():
for child in children:
child.rebuild_role_ancestor_list()

View File

@ -4,6 +4,7 @@ from awx.main.models import (
Role,
RolePermission,
Organization,
Project,
)
@ -195,3 +196,48 @@ def test_hierarchy_rebuilding():
assert X.is_ancestor_of(D) is False
@pytest.mark.django_db
def test_auto_parenting():
org1 = Organization.objects.create(name='org1')
org2 = Organization.objects.create(name='org2')
prj1 = Project.objects.create(name='prj1')
prj2 = Project.objects.create(name='prj2')
assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj1.organization = org1
prj1.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role)
assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj2.organization = org1
prj2.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role)
assert org1.admin_role.is_ancestor_of(prj2.admin_role)
assert org2.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj1.organization = org2
prj1.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org1.admin_role.is_ancestor_of(prj2.admin_role)
assert org2.admin_role.is_ancestor_of(prj1.admin_role)
assert org2.admin_role.is_ancestor_of(prj2.admin_role) is False
prj2.organization = org2
prj2.save()
assert org1.admin_role.is_ancestor_of(prj1.admin_role) is False
assert org1.admin_role.is_ancestor_of(prj2.admin_role) is False
assert org2.admin_role.is_ancestor_of(prj1.admin_role)
assert org2.admin_role.is_ancestor_of(prj2.admin_role)