Merge pull request #805 from wwitzel3/rbac

Added Credential migration tests and extended organization tests.
This commit is contained in:
Wayne Witzel III
2016-02-05 10:26:13 -05:00
11 changed files with 185 additions and 94 deletions

View File

@@ -2,7 +2,7 @@
# All Rights Reserved.
# Django
from django.db.models.signals import post_save, post_init
from django.db.models.signals import post_save
from django.db import models
from django.db.models.fields.related import SingleRelatedObjectDescriptor
from django.db.models.fields.related import ReverseSingleRelatedObjectDescriptor
@@ -66,7 +66,7 @@ class ResourceFieldDescriptor(ReverseSingleRelatedObjectDescriptor):
# Take first non null parent resource
parent = None
if type(self.parent_resource) is list:
for path in self.parent_resource:
for path in self.parent_resource:
parent = resolve_field(instance, path)
if parent:
break
@@ -123,7 +123,7 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
if self.parent_role:
# Add all non-null parent roles as parents
if type(self.parent_role) is list:
for path in self.parent_role:
for path in self.parent_role:
if path.startswith("singleton:"):
parent = Role.singleton(path[10:])
else:
@@ -142,7 +142,7 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor):
if self.resource_field and self.permissions:
permissions = RolePermission(
role=role,
role=role,
resource=getattr(instance, self.resource_field)
)
@@ -180,13 +180,13 @@ class ImplicitRoleField(models.ForeignKey):
def contribute_to_class(self, cls, name):
super(ImplicitRoleField, self).contribute_to_class(cls, name)
setattr(cls,
self.name,
setattr(cls,
self.name,
ImplicitRoleDescriptor(
self.role_name,
self.resource_field,
self.permissions,
self.parent_role,
self.role_name,
self.resource_field,
self.permissions,
self.parent_role,
self
)
)

View File

@@ -156,19 +156,16 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
help_text=_('Vault password (or "ASK" to prompt the user).'),
)
owner_role = ImplicitRoleField(
role_name='Credential Owner',
parent_role=[
'user.user_role',
'team.admin_role'
],
role_name='Credential Owner',
parent_role='team.admin_role',
resource_field='resource',
permissions = { 'all': True }
permissions = {'all': True}
)
usage_role = ImplicitRoleField(
role_name='Credential User',
role_name='Credential User',
resource_field='resource',
parent_role= 'team.member_role',
permissions = { 'usage': True }
permissions = {'use': True}
)
@property
@@ -366,6 +363,15 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
update_fields.append('cloud')
super(Credential, self).save(*args, **kwargs)
def migrate_to_rbac(self):
if self.user:
self.owner_role.members.add(self.user)
return [self.user]
elif self.team:
self.owner_role.parents.add(self.team.admin_role)
self.usage_role.parents.add(self.team.member_role)
return [self.team]
def validate_ssh_private_key(data):
"""Validate that the given SSH private key or certificate is,
in fact, valid.

View File

@@ -94,22 +94,22 @@ class Inventory(CommonModel, ResourceMixin):
help_text=_('Number of external inventory sources in this inventory with failures.'),
)
admin_role = ImplicitRoleField(
role_name='Inventory Administrator',
role_name='Inventory Administrator',
parent_role='organization.admin_role',
resource_field='resource',
permissions = { 'all': True }
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Inventory Auditor',
role_name='Inventory Auditor',
parent_role='organization.auditor_role',
resource_field='resource',
permissions = { 'read': True }
permissions = {'read': True}
)
updater_role = ImplicitRoleField(
role_name='Inventory Updater',
role_name='Inventory Updater',
)
executor_role = ImplicitRoleField(
role_name='Inventory Executor',
role_name='Inventory Executor',
)
def get_absolute_url(self):
@@ -543,23 +543,23 @@ class Group(CommonModelNameNotUnique, ResourceMixin):
help_text=_('Inventory source(s) that created or modified this group.'),
)
admin_role = ImplicitRoleField(
role_name='Inventory Group Administrator',
role_name='Inventory Group Administrator',
parent_role='inventory.admin_role',
resource_field='resource',
permissions = { 'all': True }
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Inventory Group Auditor',
role_name='Inventory Group Auditor',
parent_role='inventory.auditor_role',
resource_field='resource',
permissions = { 'read': True }
permissions = {'read': True}
)
updater_role = ImplicitRoleField(
role_name='Inventory Group Updater',
role_name='Inventory Group Updater',
parent_role='inventory.updater_role'
)
executor_role = ImplicitRoleField(
role_name='Inventory Group Executor',
role_name='Inventory Group Executor',
parent_role='inventory.executor_role'
)
@@ -1186,7 +1186,7 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions, ResourceMixin)
return 'never updated'
# inherit the child job status
else:
return self.last_job.status
return self.last_job.status
else:
return 'none'

View File

@@ -182,22 +182,22 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
default={},
)
admin_role = ImplicitRoleField(
role_name='Job Template Administrator',
role_name='Job Template Administrator',
parent_role='project.admin_role',
resource_field='resource',
permissions = { 'all': True }
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Job Template Auditor',
role_name='Job Template Auditor',
parent_role='project.auditor_role',
resource_field='resource',
permissions = { 'read': True }
permissions = {'read': True}
)
executor_role = ImplicitRoleField(
role_name='Job Template Executor',
role_name='Job Template Executor',
parent_role='project.auditor_role',
resource_field='resource',
permissions = { 'execute': True }
permissions = {'execute': True}
)
@classmethod

View File

@@ -55,12 +55,12 @@ class Organization(CommonModel, ResourceMixin):
admin_role = ImplicitRoleField(
role_name='Organization Administrator',
resource_field='resource',
permissions = { 'all': True }
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Organization Auditor',
resource_field='resource',
permissions = { 'read': True }
permissions = {'read': True}
)
@@ -118,17 +118,19 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
role_name='Team Administrator',
parent_role='organization.admin_role',
resource_field='resource',
permissions = { 'all': True }
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Team Auditor',
parent_role='organization.auditor_role',
resource_field='resource',
permissions = { 'read': True }
permissions = {'read': True}
)
member_role = ImplicitRoleField(
role_name='Team Member',
parent_role='admin_role',
resource_field='resource',
permissions = {'read':True},
)
def get_absolute_url(self):
@@ -142,6 +144,13 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
cred.mark_inactive()
super(Team, self).mark_inactive(save=save)
def migrate_to_rbac(self):
migrated = []
for user in self.users.all():
self.member_role.members.add(user)
migrated.append(user)
return migrated
class Permission(CommonModelNameNotUnique):
'''
A permission allows a user, project, or team to be able to use an inventory source.

View File

@@ -216,28 +216,28 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
blank=True,
)
admin_role = ImplicitRoleField(
role_name='Project Administrator',
role_name='Project Administrator',
parent_role='organization.admin_role',
resource_field='resource',
permissions = { 'all': True }
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Project Auditor',
role_name='Project Auditor',
parent_role='organization.auditor_role',
resource_field='resource',
permissions = { 'read': True }
permissions = {'read': True}
)
member_role = ImplicitRoleField(
role_name='Project Member',
role_name='Project Member',
parent_role='admin',
resource_field='resource',
permissions = { 'usage': True }
permissions = {'usage': True}
)
scm_update_role = ImplicitRoleField(
role_name='Project Updater',
role_name='Project Updater',
parent_role='admin',
resource_field='resource',
permissions = { 'scm_update': True }
permissions = {'scm_update': True}
)
@classmethod
@@ -333,7 +333,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
if (self.last_job_run + datetime.timedelta(seconds=self.scm_update_cache_timeout)) > now():
return True
return False
@property
def needs_update_on_launch(self):
if self.active and self.scm_type and self.scm_update_on_launch:

View File

@@ -18,7 +18,7 @@ logger = logging.getLogger('awx.main.models.rbac')
class Role(CommonModelNameNotUnique):
'''
Role model
Role model
'''
class Meta:
@@ -82,7 +82,7 @@ class Role(CommonModelNameNotUnique):
except Role.DoesNotExist:
ret = Role(singleton_name=name)
ret.save()
return ret;
return ret
@@ -102,7 +102,7 @@ class RoleHierarchy(CreatedModifiedModel):
class Resource(CommonModelNameNotUnique):
'''
Role model
Role model
'''
class Meta:

View File

@@ -1,11 +1,35 @@
import pytest
from awx.main.models.organization import Organization
from awx.main.models.credential import Credential
from awx.main.models.organization import (
Organization,
Team,
)
from django.contrib.auth.models import User
@pytest.fixture
def user():
def u(name, is_superuser=False):
try:
user = User.objects.get(username=name)
except User.DoesNotExist:
user = User(username=name, is_superuser=is_superuser, password=name)
user.save()
return user
return u
@pytest.fixture
def team(organization):
return Team.objects.create(organization=organization, name='test-team')
@pytest.fixture
def organization():
return Organization.objects.create(name="test-org", description="test-org-desc")
@pytest.fixture
def credential():
return Credential.objects.create(kind='aws', name='test-cred')
@pytest.fixture
def permissions():
return {
@@ -14,5 +38,8 @@ def permissions():
'auditor':{'read':True, 'create':False, 'write':False,
'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':False,},
'usage':{'read':False, 'create':False, 'write':False,
'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,},
}

View File

@@ -0,0 +1,44 @@
import pytest
@pytest.mark.django_db
def test_credential_migration_user(credential, user, permissions):
u = user('user', False)
credential.user = u
migrated = credential.migrate_to_rbac()
assert len(migrated) == 1
assert credential.accessible_by(u, permissions['admin'])
@pytest.mark.django_db
def test_credential_usage_role(credential, user, permissions):
u = user('user', False)
credential.usage_role.members.add(u)
assert credential.accessible_by(u, permissions['usage'])
@pytest.mark.django_db
def test_credential_migration_team_member(credential, team, user, permissions):
u = user('user', False)
team.admin_role.members.add(u)
credential.team = team
# No permissions pre-migration
assert not credential.accessible_by(u, permissions['admin'])
migrated = credential.migrate_to_rbac()
# Admin permissions post migration
assert len(migrated) == 1
assert credential.accessible_by(u, permissions['admin'])
@pytest.mark.django_db
def test_credential_migration_team_admin(credential, team, user, permissions):
u = user('user', False)
team.member_role.members.add(u)
credential.team = team
# No permissions pre-migration
assert not credential.accessible_by(u, permissions['usage'])
# Usage permissions post migration
migrated = credential.migrate_to_rbac()
assert len(migrated) == 1
assert credential.accessible_by(u, permissions['usage'])

View File

@@ -1,51 +1,43 @@
import pytest
from awx.main.access import OrganizationAccess
from django.contrib.auth.models import User
def make_user(name, admin=False):
try:
user = User.objects.get(username=name)
except User.DoesNotExist:
user = User(username=name, is_superuser=admin, password=name)
user.save()
return user
@pytest.mark.django_db
@pytest.mark.parametrize("username,admin", [
("admin", True),
("user", False),
])
def test_organization_migration(organization, permissions, username, admin):
user = make_user(username, admin)
if admin:
organization.admins.add(user)
else:
organization.users.add(user)
def test_organization_migration_admin(organization, permissions, user):
u = user('admin', True)
organization.admins.add(u)
assert not organization.accessible_by(u, permissions['admin'])
migrated_users = organization.migrate_to_rbac()
assert len(migrated_users) == 1
assert migrated_users[0] == user
if admin:
assert organization.accessible_by(user, permissions['admin']) == True
else:
assert organization.accessible_by(user, permissions['auditor']) == True
assert organization.accessible_by(u, permissions['admin'])
@pytest.mark.django_db
@pytest.mark.parametrize("username,admin", [
("admin", True),
("user-admin", False),
("user", False)
])
def test_organization_access(organization, username, admin):
user = make_user(username, admin)
access = OrganizationAccess(user)
if admin:
assert access.can_change(organization, None) == True
elif username == "user-admin":
organization.admins.add(user)
assert access.can_change(organization, None) == True
else:
assert access.can_change(organization, None) == False
def test_organization_migration_user(organization, permissions, user):
u = user('user', False)
organization.users.add(u)
assert not organization.accessible_by(u, permissions['auditor'])
migrated_users = organization.migrate_to_rbac()
assert len(migrated_users) == 1
assert organization.accessible_by(u, permissions['auditor'])
@pytest.mark.django_db
def test_organization_access_superuser(organization, user):
access = OrganizationAccess(user('admin', True))
assert access.can_change(organization, None)
@pytest.mark.django_db
def test_organization_access_admin(organization, user):
u = user('admin', False)
organization.admins.add(u)
access = OrganizationAccess(u)
assert access.can_change(organization, None)
@pytest.mark.django_db
def test_organization_access_user(organization, user):
access = OrganizationAccess(user('user', False))
assert not access.can_change(organization, None)

View File

@@ -0,0 +1,13 @@
import pytest
@pytest.mark.django_db
def test_team_migration_user(team, user, permissions):
u = user('user', False)
team.users.add(u)
assert not team.accessible_by(u, permissions['auditor'])
migrated = team.migrate_to_rbac()
assert len(migrated) == 1
assert team.accessible_by(u, permissions['auditor'])