Merge pull request #905 from wwitzel3/rbac

RBAC tests and fixes
This commit is contained in:
Akita Noek 2016-02-10 15:22:11 -05:00
commit 4bc9ca2096
7 changed files with 144 additions and 10 deletions

View File

@ -46,8 +46,10 @@ class AutoOneToOneField(models.OneToOneField):
def resolve_field(obj, field):
for f in field.split('.'):
if obj:
if hasattr(obj, f):
obj = getattr(obj, f)
else:
obj = None
return obj
class ResourceFieldDescriptor(ReverseSingleRelatedObjectDescriptor):

View File

@ -544,23 +544,27 @@ class Group(CommonModelNameNotUnique, ResourceMixin):
)
admin_role = ImplicitRoleField(
role_name='Inventory Group Administrator',
parent_role='inventory.admin_role',
parent_role=['inventory.admin_role', 'parents.admin_role'],
resource_field='resource',
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Inventory Group Auditor',
parent_role='inventory.auditor_role',
parent_role=['inventory.auditor_role', 'parents.auditor_role'],
resource_field='resource',
permissions = {'read': True}
)
updater_role = ImplicitRoleField(
role_name='Inventory Group Updater',
parent_role='inventory.updater_role'
parent_role=['inventory.updater_role', 'parents.updater_role'],
resource_field='resource',
permissions = {'read': True, 'write': True, 'create': True, 'use': True},
)
executor_role = ImplicitRoleField(
role_name='Inventory Group Executor',
parent_role='inventory.executor_role'
parent_role=['inventory.executor_role', 'parents.executor_role'],
resource_field='resource',
permissions = {'read':True, 'execute':True},
)
def __unicode__(self):

View File

@ -18,6 +18,7 @@ from crum.signals import current_user_getter
# AWX
from awx.main.models import * # noqa
from awx.api.serializers import * # noqa
from awx.main.fields import ImplicitRoleField
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, emit_websocket_notification
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks import update_inventory_computed_fields
@ -122,6 +123,31 @@ def rebuild_role_hierarchy_cache(sender, reverse, model, pk_set, **kwargs):
else:
kwargs['instance'].rebuild_role_hierarchy_cache()
def rebuild_group_parent_roles(instance, action, reverse, **kwargs):
objects = []
if reverse:
objects = instance.children.all()
else:
objects = instance.parents.all()
for obj in objects:
fields = [f for f in instance._meta.get_fields() if type(f) is ImplicitRoleField]
for field in fields:
role = None
if reverse:
if hasattr(obj, field.name):
parent_role = getattr(instance, field.name)
role = getattr(obj, field.name)
else:
role = getattr(instance, field.name)
parent_role = getattr(obj, field.name)
if role:
if action == 'post_add':
role.parents.add(parent_role)
elif action == 'pre_remove':
role.parents.remove(parent_role)
pre_save.connect(store_initial_active_state, sender=Host)
post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
@ -141,7 +167,7 @@ post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Job)
post_save.connect(emit_job_event_detail, sender=JobEvent)
post_save.connect(emit_ad_hoc_command_event_detail, sender=AdHocCommandEvent)
m2m_changed.connect(rebuild_role_hierarchy_cache, Role.parents.through)
m2m_changed.connect(rebuild_group_parent_roles, Group.parents.through)
# Migrate hosts, groups to parent group(s) whenever a group is deleted or
# marked as inactive.

View File

@ -1,7 +1,10 @@
import pytest
from awx.main.models.credential import Credential
from awx.main.models.inventory import Inventory
from awx.main.models.inventory import (
Inventory,
Group,
)
from awx.main.models.projects import Project
from awx.main.models.organization import (
Organization,
@ -45,6 +48,12 @@ def credential():
def inventory(organization):
return Inventory.objects.create(name="test-inventory", organization=organization)
@pytest.fixture
def group(inventory):
def g(name):
return Group.objects.create(inventory=inventory, name=name)
return g
@pytest.fixture
def permissions():
return {

View File

@ -172,3 +172,26 @@ def test_inventory_executor(inventory, permissions, user, team):
assert team.member_role.is_ancestor_of(inventory.updater_role) is False
assert team.member_role.is_ancestor_of(inventory.executor_role)
@pytest.mark.django_db
def test_group_parent_admin(group, permissions, user):
u = user('admin', False)
parent1 = group('parent-1')
parent2 = group('parent-2')
childA = group('child-1')
parent1.admin_role.members.add(u)
assert parent1.accessible_by(u, permissions['admin'])
assert not parent2.accessible_by(u, permissions['admin'])
assert not childA.accessible_by(u, permissions['admin'])
childA.parents.add(parent1)
assert childA.accessible_by(u, permissions['admin'])
childA.parents.remove(parent1)
assert not childA.accessible_by(u, permissions['admin'])
parent2.children.add(childA)
assert not childA.accessible_by(u, permissions['admin'])
parent2.admin_role.members.add(u)
assert childA.accessible_by(u, permissions['admin'])

View File

@ -32,17 +32,39 @@ def test_organization_migration_user(organization, permissions, user):
@pytest.mark.django_db
def test_organization_access_superuser(organization, user):
access = OrganizationAccess(user('admin', True))
organization.users.add(user('user', False))
assert access.can_change(organization, None)
assert access.can_delete(organization)
org = access.get_queryset()[0]
assert len(org.admins.all()) == 0
assert len(org.users.all()) == 1
@pytest.mark.django_db
def test_organization_access_admin(organization, user):
u = user('admin', False)
organization.admins.add(u)
'''can_change because I am an admin of that org'''
a = user('admin', False)
organization.admins.add(a)
organization.users.add(user('user', False))
access = OrganizationAccess(u)
access = OrganizationAccess(a)
assert access.can_change(organization, None)
assert access.can_delete(organization)
org = access.get_queryset()[0]
assert len(org.admins.all()) == 1
assert len(org.users.all()) == 1
@pytest.mark.django_db
def test_organization_access_user(organization, user):
access = OrganizationAccess(user('user', False))
organization.users.add(user('user', False))
assert not access.can_change(organization, None)
assert not access.can_delete(organization)
org = access.get_queryset()[0]
assert len(org.admins.all()) == 0
assert len(org.users.all()) == 1

View File

@ -1,6 +1,7 @@
import pytest
from awx.main.migrations import _rbac as rbac
from awx.main.access import TeamAccess
from django.apps import apps
@pytest.mark.django_db
@ -15,3 +16,50 @@ def test_team_migration_user(team, user, permissions):
assert len(migrated) == 1
assert team.accessible_by(u, permissions['auditor'])
@pytest.mark.django_db
def test_team_access_superuser(team, user):
team.users.add(user('member', False))
access = TeamAccess(user('admin', True))
assert access.can_add(None)
assert access.can_change(team, None)
assert access.can_delete(team)
t = access.get_queryset()[0]
assert len(t.users.all()) == 1
assert len(t.organization.admins.all()) == 0
@pytest.mark.django_db
def test_team_access_org_admin(organization, team, user):
a = user('admin', False)
organization.admins.add(a)
team.organization = organization
team.save()
access = TeamAccess(a)
assert access.can_add({'organization': organization.pk})
assert access.can_change(team, None)
assert access.can_delete(team)
t = access.get_queryset()[0]
assert len(t.users.all()) == 0
assert len(t.organization.admins.all()) == 1
@pytest.mark.django_db
def test_team_access_member(organization, team, user):
u = user('member', False)
team.users.add(u)
team.organization = organization
team.save()
access = TeamAccess(u)
assert not access.can_add({'organization': organization.pk})
assert not access.can_change(team, None)
assert not access.can_delete(team)
t = access.get_queryset()[0]
assert len(t.users.all()) == 1
assert len(t.organization.admins.all()) == 0