mirror of
https://github.com/ansible/awx.git
synced 2026-02-24 14:36:00 -03:30
Merge pull request #5 from wwitzel3/11th-hour
fix User.accessible_objects
This commit is contained in:
@@ -34,7 +34,6 @@ def init_rbac_migration(apps, schema_editor):
|
|||||||
def migrate_users(apps, schema_editor):
|
def migrate_users(apps, schema_editor):
|
||||||
User = apps.get_model('auth', "User")
|
User = apps.get_model('auth', "User")
|
||||||
Role = apps.get_model('main', "Role")
|
Role = apps.get_model('main', "Role")
|
||||||
RolePermission = apps.get_model('main', "RolePermission")
|
|
||||||
ContentType = apps.get_model('contenttypes', "ContentType")
|
ContentType = apps.get_model('contenttypes', "ContentType")
|
||||||
user_content_type = ContentType.objects.get_for_model(User)
|
user_content_type = ContentType.objects.get_for_model(User)
|
||||||
|
|
||||||
@@ -52,15 +51,6 @@ def migrate_users(apps, schema_editor):
|
|||||||
object_id = user.id
|
object_id = user.id
|
||||||
)
|
)
|
||||||
role.members.add(user)
|
role.members.add(user)
|
||||||
RolePermission.objects.create(
|
|
||||||
created=now(),
|
|
||||||
modified=now(),
|
|
||||||
role = role,
|
|
||||||
content_type = user_content_type,
|
|
||||||
object_id = user.id,
|
|
||||||
create=1, read=1, write=1, delete=1, update=1,
|
|
||||||
execute=1, scm_update=1, use=1,
|
|
||||||
)
|
|
||||||
logger.info(smart_text(u"migrating to new role for user: {}".format(user.username)))
|
logger.info(smart_text(u"migrating to new role for user: {}".format(user.username)))
|
||||||
|
|
||||||
if user.is_superuser:
|
if user.is_superuser:
|
||||||
|
|||||||
@@ -33,6 +33,23 @@ class ResourceMixin(models.Model):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _accessible_objects(cls, accessor, role_name):
|
def _accessible_objects(cls, accessor, role_name):
|
||||||
|
if type(cls()) == User:
|
||||||
|
cls_type = ContentType.objects.get_for_model(cls)
|
||||||
|
roles = Role.objects.filter(content_type__pk=cls_type.id)
|
||||||
|
|
||||||
|
if type(accessor) == User:
|
||||||
|
roles = roles.filter(ancestors__members = accessor)
|
||||||
|
elif type(accessor) == Role:
|
||||||
|
roles = roles.filter(ancestors = accessor)
|
||||||
|
else:
|
||||||
|
accessor_type = ContentType.objects.get_for_model(accessor)
|
||||||
|
accessor_roles = Role.objects.filter(content_type__pk=accessor_type.id,
|
||||||
|
object_id=accessor.id)
|
||||||
|
roles = roles.filter(ancestors__in=accessor_roles)
|
||||||
|
|
||||||
|
kwargs = {'id__in':roles.values_list('object_id', flat=True)}
|
||||||
|
return cls.objects.filter(**kwargs)
|
||||||
|
|
||||||
if type(accessor) == User:
|
if type(accessor) == User:
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
kwargs[role_name + '__ancestors__members'] = accessor
|
kwargs[role_name + '__ancestors__members'] = accessor
|
||||||
@@ -49,7 +66,6 @@ class ResourceMixin(models.Model):
|
|||||||
kwargs[role_name + '__ancestors__in'] = roles
|
kwargs[role_name + '__ancestors__in'] = roles
|
||||||
qs = cls.objects.filter(**kwargs)
|
qs = cls.objects.filter(**kwargs)
|
||||||
|
|
||||||
#return cls.objects.filter(resource__in=qs)
|
|
||||||
return qs
|
return qs
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -40,14 +40,14 @@ def test_user_queryset(user):
|
|||||||
def test_user_accessible_objects(user, organization):
|
def test_user_accessible_objects(user, organization):
|
||||||
admin = user('admin', False)
|
admin = user('admin', False)
|
||||||
u = user('john', False)
|
u = user('john', False)
|
||||||
assert User.accessible_objects(admin, {'read':True}).count() == 1
|
assert User.accessible_objects(admin, 'admin_role').count() == 1
|
||||||
|
|
||||||
organization.member_role.members.add(u)
|
organization.member_role.members.add(u)
|
||||||
organization.admin_role.members.add(admin)
|
organization.admin_role.members.add(admin)
|
||||||
assert User.accessible_objects(admin, {'read':True}).count() == 2
|
assert User.accessible_objects(admin, 'admin_role').count() == 2
|
||||||
|
|
||||||
organization.member_role.members.remove(u)
|
organization.member_role.members.remove(u)
|
||||||
assert User.accessible_objects(admin, {'read':True}).count() == 1
|
assert User.accessible_objects(admin, 'admin_role').count() == 1
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_org_user_admin(user, organization):
|
def test_org_user_admin(user, organization):
|
||||||
|
|||||||
Reference in New Issue
Block a user