add basic Organization migration

This commit is contained in:
Wayne Witzel III
2016-02-02 14:47:21 -05:00
committed by Akita Noek
parent c6b2e509fd
commit 932b6a4c82
2 changed files with 47 additions and 8 deletions

View File

@@ -53,12 +53,12 @@ class Organization(CommonModel, ResourceMixin):
related_name='organizations', related_name='organizations',
) )
admin_role = ImplicitRoleField( admin_role = ImplicitRoleField(
role_name='Organization Administrator', role_name='Organization Administrator',
resource_field='resource', resource_field='resource',
permissions = { 'all': True } permissions = { 'all': True }
) )
auditor_role = ImplicitRoleField( auditor_role = ImplicitRoleField(
role_name='Organization Auditor', role_name='Organization Auditor',
resource_field='resource', resource_field='resource',
permissions = { 'read': True } permissions = { 'read': True }
) )
@@ -76,6 +76,16 @@ class Organization(CommonModel, ResourceMixin):
script.save() script.save()
super(Organization, self).mark_inactive(save=save) super(Organization, self).mark_inactive(save=save)
def migrate_to_rbac(self):
migrated_users = []
for admin in self.admins.all():
self.admin_role.members.add(admin)
migrated_users.append(admin)
for user in self.users.all():
self.auditor_role.members.add(user)
migrated_user.append(user)
return migrated_users
class Team(CommonModelNameNotUnique, ResourceMixin): class Team(CommonModelNameNotUnique, ResourceMixin):
''' '''
@@ -105,19 +115,19 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
related_name='teams', related_name='teams',
) )
admin_role = ImplicitRoleField( admin_role = ImplicitRoleField(
role_name='Team Administrator', role_name='Team Administrator',
parent_role='organization.admin_role', parent_role='organization.admin_role',
resource_field='resource', resource_field='resource',
permissions = { 'all': True } permissions = { 'all': True }
) )
auditor_role = ImplicitRoleField( auditor_role = ImplicitRoleField(
role_name='Team Auditor', role_name='Team Auditor',
parent_role='organization.auditor_role', parent_role='organization.auditor_role',
resource_field='resource', resource_field='resource',
permissions = { 'read': True } permissions = { 'read': True }
) )
member_role = ImplicitRoleField( member_role = ImplicitRoleField(
role_name='Team Member', role_name='Team Member',
parent_role='admin_role', parent_role='admin_role',
) )
@@ -210,7 +220,7 @@ class Profile(CreatedModifiedModel):
) )
""" """
Since expiration and session expiration is event driven a token could be Since expiration and session expiration is event driven a token could be
invalidated for both reasons. Further, we only support a single reason for a invalidated for both reasons. Further, we only support a single reason for a
session token being invalid. For this case, mark the token as expired. session token being invalid. For this case, mark the token as expired.
@@ -234,7 +244,7 @@ class AuthToken(BaseModel):
class Meta: class Meta:
app_label = 'main' app_label = 'main'
key = models.CharField(max_length=40, primary_key=True) key = models.CharField(max_length=40, primary_key=True)
user = models.ForeignKey('auth.User', related_name='auth_tokens', user = models.ForeignKey('auth.User', related_name='auth_tokens',
on_delete=models.CASCADE) on_delete=models.CASCADE)
@@ -351,7 +361,7 @@ def user_mark_inactive(user, save=True):
user.is_active = False user.is_active = False
if save: if save:
user.save() user.save()
User.add_to_class('mark_inactive', user_mark_inactive) User.add_to_class('mark_inactive', user_mark_inactive)

View File

@@ -0,0 +1,29 @@
import pytest
from awx.main.models.organization import Organization
from django.contrib.auth.models import User
def make_user(name, admin=False):
email = '%s@example.org' % name
if admin == True:
return User.objects.create_superuser(name, email, name)
else:
return User.objects.create_user(name, email, name)
@pytest.fixture
def organization():
return Organization.objects.create(name="test-org", description="test-org-desc")
@pytest.mark.django_db
@pytest.mark.parametrize("username,admin", [
("admin", True),
("user", False),
])
def test_organization_migration(organization, username, admin):
user = make_user(username, admin)
organization.admins.add(user)
migrated_users = organization.migrate_to_rbac()
assert len(migrated_users) == 1
assert migrated_users[0] == user