Merge pull request #813 from anoek/rbac

This commit is contained in:
Akita Noek 2016-02-05 17:05:30 -05:00
commit 1418e95155
6 changed files with 241 additions and 46 deletions

View File

@ -119,7 +119,6 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
raise FieldError('Implicit role missing `role_name`')
role = Role._default_manager.create(name=self.role_name)
role.save()
if self.parent_role:
# Add all non-null parent roles as parents
if type(self.parent_role) is list:

View File

@ -21,6 +21,7 @@ from awx.main.constants import CLOUD_PROVIDERS
from awx.main.fields import AutoOneToOneField, ImplicitRoleField
from awx.main.managers import HostManager
from awx.main.models.base import * # noqa
from awx.main.models.organization import Permission # for rbac migration
from awx.main.models.jobs import Job
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.mixins import ResourceMixin
@ -112,6 +113,48 @@ class Inventory(CommonModel, ResourceMixin):
role_name='Inventory Executor',
)
def migrate_to_rbac(self):
migrated_users = []
migrated_teams = []
for perm in Permission.objects.filter(inventory=self):
role = None
execrole = None
if perm.permission_type == 'admin':
role = self.admin_role
pass
elif perm.permission_type == 'read':
role = self.auditor_role
pass
elif perm.permission_type == 'write':
role = self.updater_role
pass
else:
raise Exception('Unhandled permission type for inventory: %s' % perm.permission_type)
if perm.run_ad_hoc_commands:
execrole = self.executor_role
if perm.team:
if role:
perm.team.member_role.children.add(role)
if execrole:
perm.team.member_role.children.add(execrole)
migrated_teams.append(perm.team)
if perm.user:
if role:
role.members.add(perm.user)
if execrole:
execrole.members.add(perm.user)
migrated_users.append(perm.user)
return {
'migrated_users': migrated_users,
'migrated_teams': migrated_teams,
}
def get_absolute_url(self):
return reverse('api:inventory_detail', args=(self.pk,))

View File

@ -11,7 +11,7 @@ from django.utils.translation import ugettext_lazy as _
# AWX
from awx.main.models.base import * # noqa
__all__ = ['Role', 'RolePermission', 'Resource', 'RoleHierarchy', 'ResourceHierarchy']
__all__ = ['Role', 'RolePermission', 'Resource', 'RoleHierarchy']
logger = logging.getLogger('awx.main.models.rbac')
@ -84,6 +84,9 @@ class Role(CommonModelNameNotUnique):
ret.save()
return ret
def is_ancestor_of(self, role):
return RoleHierarchy.objects.filter(role_id=role.id, ancestor_id=self.id).count() > 0
class RoleHierarchy(CreatedModifiedModel):
@ -112,47 +115,6 @@ class Resource(CommonModelNameNotUnique):
parent = models.ForeignKey('Resource', related_name='children', null=True, default=None)
def save(self, *args, **kwargs):
super(Resource, self).save(*args, **kwargs)
self.rebuild_resource_hierarchy_cache()
def rebuild_resource_hierarchy_cache(self):
'Rebuilds the associated entries in the ResourceHierarchy model'
# Compute what our hierarchy should be. (Note: this depends on our
# parent's cached hierarchy being correct)
actual_ancestors = set()
if self.parent:
actual_ancestors = set([r.ancestor.id for r in ResourceHierarchy.objects.filter(resource__id=self.parent.id)])
actual_ancestors.add(self.id)
# Compute what we have stored
stored_ancestors = set([r.ancestor.id for r in ResourceHierarchy.objects.filter(resource__id=self.id)])
# If it differs, update, and then update all of our children
if actual_ancestors != stored_ancestors:
ResourceHierarchy.objects.filter(resource__id=self.id).delete()
for id in actual_ancestors:
rh = ResourceHierarchy(resource=self, ancestor=Resource.objects.get(id=id))
rh.save()
for child in self.children.all():
child.rebuild_resource_hierarchy_cache()
class ResourceHierarchy(CreatedModifiedModel):
'''
Stores a flattened relation map of all resources in the system for easy joining
'''
class Meta:
app_label = 'main'
verbose_name_plural = _('resource_ancestors')
db_table = 'main_rbac_resource_hierarchy'
resource = models.ForeignKey('Resource', related_name='+', on_delete=models.CASCADE)
ancestor = models.ForeignKey('Resource', related_name='+', on_delete=models.CASCADE)
class RolePermission(CreatedModifiedModel):
'''
@ -184,4 +146,3 @@ class RolePermission(CreatedModifiedModel):
execute = models.IntegerField(default = 0)
scm_update = models.IntegerField(default = 0)
use = models.IntegerField(default = 0)

View File

@ -115,8 +115,12 @@ def store_initial_active_state(sender, **kwargs):
else:
instance._saved_active_state = True
def rebuild_role_hierarchy_cache(sender, **kwargs):
kwargs['instance'].rebuild_role_hierarchy_cache()
def rebuild_role_hierarchy_cache(sender, reverse, model, pk_set, **kwargs):
if reverse:
for id in pk_set:
model.objects.get(id=id).rebuild_role_hierarchy_cache()
else:
kwargs['instance'].rebuild_role_hierarchy_cache()
pre_save.connect(store_initial_active_state, sender=Host)
post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host)

View File

@ -1,8 +1,10 @@
import pytest
from awx.main.models.credential import Credential
from awx.main.models.inventory import Inventory
from awx.main.models.organization import (
Organization,
Permission,
Team,
)
from django.contrib.auth.models import User
@ -30,6 +32,10 @@ def organization():
def credential():
return Credential.objects.create(kind='aws', name='test-cred')
@pytest.fixture
def inventory(organization):
return Inventory.objects.create(name="test-inventory", organization=organization)
@pytest.fixture
def permissions():
return {

View File

@ -0,0 +1,182 @@
import pytest
from awx.main.access import OrganizationAccess
from awx.main.models import (
Inventory,
Permission,
PERM_INVENTORY_ADMIN,
PERM_INVENTORY_READ,
PERM_INVENTORY_WRITE,
PERM_INVENTORY_DEPLOY,
PERM_INVENTORY_CHECK,
PERM_INVENTORY_SCAN,
)
@pytest.mark.django_db
def test_inventory_admin_user(inventory, permissions, user):
u = user('admin', False)
perm = Permission(user=u, inventory=inventory, permission_type='admin')
perm.save()
assert inventory.accessible_by(u, permissions['admin']) == False
migrations = inventory.migrate_to_rbac()
assert len(migrations['migrated_users']) == 1
assert len(migrations['migrated_teams']) == 0
assert inventory.accessible_by(u, permissions['admin'])
assert not inventory.executor_role.members.filter(id=u.id).exists()
assert not inventory.updater_role.members.filter(id=u.id).exists()
@pytest.mark.django_db
def test_inventory_auditor_user(inventory, permissions, user):
u = user('auditor', False)
perm = Permission(user=u, inventory=inventory, permission_type='read')
perm.save()
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == False
migrations = inventory.migrate_to_rbac()
assert len(migrations['migrated_users']) == 1
assert len(migrations['migrated_teams']) == 0
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == True
assert not inventory.executor_role.members.filter(id=u.id).exists()
assert not inventory.updater_role.members.filter(id=u.id).exists()
@pytest.mark.django_db
def test_inventory_updater_user(inventory, permissions, user):
u = user('updater', False)
perm = Permission(user=u, inventory=inventory, permission_type='write')
perm.save()
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == False
migrations = inventory.migrate_to_rbac()
assert len(migrations['migrated_users']) == 1
assert len(migrations['migrated_teams']) == 0
assert inventory.accessible_by(u, permissions['admin']) == False
assert not inventory.executor_role.members.filter(id=u.id).exists()
assert inventory.updater_role.members.filter(id=u.id).exists()
@pytest.mark.django_db
def test_inventory_executor_user(inventory, permissions, user):
u = user('executor', False)
perm = Permission(user=u, inventory=inventory, permission_type='read', run_ad_hoc_commands=True)
perm.save()
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == False
migrations = inventory.migrate_to_rbac()
assert len(migrations['migrated_users']) == 1
assert len(migrations['migrated_teams']) == 0
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == True
assert inventory.executor_role.members.filter(id=u.id).exists()
assert not inventory.updater_role.members.filter(id=u.id).exists()
@pytest.mark.django_db
def test_inventory_admin_team(inventory, permissions, user, team):
u = user('admin', False)
perm = Permission(team=team, inventory=inventory, permission_type='admin')
perm.save()
team.users.add(u)
assert inventory.accessible_by(u, permissions['admin']) == False
team_migrations = team.migrate_to_rbac()
migrations = inventory.migrate_to_rbac()
assert len(team_migrations) == 1
assert team.member_role.members.count() == 1
assert len(migrations['migrated_users']) == 0
assert len(migrations['migrated_teams']) == 1
assert not inventory.admin_role.members.filter(id=u.id).exists()
assert not inventory.auditor_role.members.filter(id=u.id).exists()
assert not inventory.executor_role.members.filter(id=u.id).exists()
assert not inventory.updater_role.members.filter(id=u.id).exists()
assert inventory.accessible_by(u, permissions['auditor'])
assert inventory.accessible_by(u, permissions['admin'])
@pytest.mark.django_db
def test_inventory_auditor(inventory, permissions, user, team):
u = user('auditor', False)
perm = Permission(team=team, inventory=inventory, permission_type='read')
perm.save()
team.users.add(u)
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == False
team_migrations = team.migrate_to_rbac()
migrations = inventory.migrate_to_rbac()
assert len(team_migrations) == 1
assert team.member_role.members.count() == 1
assert len(migrations['migrated_users']) == 0
assert len(migrations['migrated_teams']) == 1
assert not inventory.admin_role.members.filter(id=u.id).exists()
assert not inventory.auditor_role.members.filter(id=u.id).exists()
assert not inventory.executor_role.members.filter(id=u.id).exists()
assert not inventory.updater_role.members.filter(id=u.id).exists()
assert inventory.accessible_by(u, permissions['auditor'])
assert not inventory.accessible_by(u, permissions['admin'])
@pytest.mark.django_db
def test_inventory_updater(inventory, permissions, user, team):
u = user('updater', False)
perm = Permission(team=team, inventory=inventory, permission_type='write')
perm.save()
team.users.add(u)
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == False
team_migrations = team.migrate_to_rbac()
migrations = inventory.migrate_to_rbac()
assert len(team_migrations) == 1
assert team.member_role.members.count() == 1
assert len(migrations['migrated_users']) == 0
assert len(migrations['migrated_teams']) == 1
assert not inventory.admin_role.members.filter(id=u.id).exists()
assert not inventory.auditor_role.members.filter(id=u.id).exists()
assert not inventory.executor_role.members.filter(id=u.id).exists()
assert not inventory.updater_role.members.filter(id=u.id).exists()
assert team.member_role.is_ancestor_of(inventory.updater_role)
assert not team.member_role.is_ancestor_of(inventory.executor_role)
@pytest.mark.django_db
def test_inventory_executor(inventory, permissions, user, team):
u = user('executor', False)
perm = Permission(team=team, inventory=inventory, permission_type='read', run_ad_hoc_commands=True)
perm.save()
team.users.add(u)
assert inventory.accessible_by(u, permissions['admin']) == False
assert inventory.accessible_by(u, permissions['auditor']) == False
team_migrations = team.migrate_to_rbac()
migrations = inventory.migrate_to_rbac()
assert len(team_migrations) == 1
assert team.member_role.members.count() == 1
assert len(migrations['migrated_users']) == 0
assert len(migrations['migrated_teams']) == 1
assert not inventory.admin_role.members.filter(id=u.id).exists()
assert not inventory.auditor_role.members.filter(id=u.id).exists()
assert not inventory.executor_role.members.filter(id=u.id).exists()
assert not inventory.updater_role.members.filter(id=u.id).exists()
assert not team.member_role.is_ancestor_of(inventory.updater_role)
assert team.member_role.is_ancestor_of(inventory.executor_role)