mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 10:00:01 -03:30
Merge pull request #877 from wwitzel3/rbac-port-migrations
Convert migrate_to_rbac method to Migrations
This commit is contained in:
commit
6df8017cb1
256
awx/main/migrations/0003_rbac_changes.py
Normal file
256
awx/main/migrations/0003_rbac_changes.py
Normal file
@ -0,0 +1,256 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
import taggit.managers
|
||||
import awx.main.fields
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('taggit', '0002_auto_20150616_2121'),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
('main', '0002_v300_changes'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Resource',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('description', models.TextField(default=b'', blank=True)),
|
||||
('active', models.BooleanField(default=True, editable=False)),
|
||||
('name', models.CharField(max_length=512)),
|
||||
('created_by', models.ForeignKey(related_name="{u'class': 'resource', u'app_label': 'main'}(class)s_created+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('modified_by', models.ForeignKey(related_name="{u'class': 'resource', u'app_label': 'main'}(class)s_modified+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('parent', models.ForeignKey(related_name='children', default=None, to='main.Resource', null=True)),
|
||||
('tags', taggit.managers.TaggableManager(to='taggit.Tag', through='taggit.TaggedItem', blank=True, help_text='A comma-separated list of tags.', verbose_name='Tags')),
|
||||
],
|
||||
options={
|
||||
'db_table': 'main_rbac_resources',
|
||||
'verbose_name_plural': 'resources',
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='Role',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('description', models.TextField(default=b'', blank=True)),
|
||||
('active', models.BooleanField(default=True, editable=False)),
|
||||
('name', models.CharField(max_length=512)),
|
||||
('singleton_name', models.TextField(default=None, unique=True, null=True, db_index=True)),
|
||||
('created_by', models.ForeignKey(related_name="{u'class': 'role', u'app_label': 'main'}(class)s_created+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('members', models.ManyToManyField(related_name='roles', to=settings.AUTH_USER_MODEL)),
|
||||
('modified_by', models.ForeignKey(related_name="{u'class': 'role', u'app_label': 'main'}(class)s_modified+", on_delete=django.db.models.deletion.SET_NULL, default=None, editable=False, to=settings.AUTH_USER_MODEL, null=True)),
|
||||
('parents', models.ManyToManyField(related_name='children', to='main.Role')),
|
||||
('tags', taggit.managers.TaggableManager(to='taggit.Tag', through='taggit.TaggedItem', blank=True, help_text='A comma-separated list of tags.', verbose_name='Tags')),
|
||||
],
|
||||
options={
|
||||
'db_table': 'main_rbac_roles',
|
||||
'verbose_name_plural': 'roles',
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='RoleHierarchy',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('ancestor', models.ForeignKey(related_name='+', to='main.Role')),
|
||||
('role', models.ForeignKey(related_name='+', to='main.Role')),
|
||||
],
|
||||
options={
|
||||
'db_table': 'main_rbac_role_hierarchy',
|
||||
'verbose_name_plural': 'role_ancestors',
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='RolePermission',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('created', models.DateTimeField(default=None, editable=False)),
|
||||
('modified', models.DateTimeField(default=None, editable=False)),
|
||||
('create', models.IntegerField(default=0)),
|
||||
('read', models.IntegerField(default=0)),
|
||||
('write', models.IntegerField(default=0)),
|
||||
('update', models.IntegerField(default=0)),
|
||||
('delete', models.IntegerField(default=0)),
|
||||
('execute', models.IntegerField(default=0)),
|
||||
('scm_update', models.IntegerField(default=0)),
|
||||
('use', models.IntegerField(default=0)),
|
||||
('resource', models.ForeignKey(related_name='permissions', to='main.Resource')),
|
||||
('role', models.ForeignKey(related_name='permissions', to='main.Role')),
|
||||
],
|
||||
options={
|
||||
'db_table': 'main_rbac_permissions',
|
||||
'verbose_name_plural': 'permissions',
|
||||
},
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='organization',
|
||||
field=models.ForeignKey(related_name='project_list', on_delete=django.db.models.deletion.SET_NULL, to='main.Organization', null=True),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='credential',
|
||||
name='owner_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='credential',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='credential',
|
||||
name='usage_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='group',
|
||||
name='admin_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='group',
|
||||
name='auditor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='group',
|
||||
name='executor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='group',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='group',
|
||||
name='updater_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='host',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='admin_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='auditor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='executor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventory',
|
||||
name='updater_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='inventorysource',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='jobtemplate',
|
||||
name='admin_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='jobtemplate',
|
||||
name='auditor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='jobtemplate',
|
||||
name='executor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='jobtemplate',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='organization',
|
||||
name='admin_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='organization',
|
||||
name='auditor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='organization',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='admin_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='auditor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='member_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='project',
|
||||
name='scm_update_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='team',
|
||||
name='admin_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='team',
|
||||
name='auditor_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='team',
|
||||
name='member_role',
|
||||
field=awx.main.fields.ImplicitRoleField(related_name='+', to='main.Role', null=b'True'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='team',
|
||||
name='resource',
|
||||
field=awx.main.fields.ImplicitResourceField(related_name='+', to='main.Resource', null=b'True'),
|
||||
),
|
||||
]
|
||||
19
awx/main/migrations/0004_rbac_migrations.py
Normal file
19
awx/main/migrations/0004_rbac_migrations.py
Normal file
@ -0,0 +1,19 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from awx.main.migrations import _rbac as rbac
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0003_rbac_changes'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RunPython(rbac.migrate_organization),
|
||||
migrations.RunPython(rbac.migrate_credential),
|
||||
migrations.RunPython(rbac.migrate_team),
|
||||
migrations.RunPython(rbac.migrate_inventory),
|
||||
]
|
||||
78
awx/main/migrations/_rbac.py
Normal file
78
awx/main/migrations/_rbac.py
Normal file
@ -0,0 +1,78 @@
|
||||
from collections import defaultdict
|
||||
|
||||
def migrate_organization(apps, schema_editor):
|
||||
migrations = defaultdict(list)
|
||||
organization = apps.get_model('main', "Organization")
|
||||
for org in organization.objects.all():
|
||||
for admin in org.admins.all():
|
||||
org.admin_role.members.add(admin)
|
||||
migrations[org.name].append(admin)
|
||||
for user in org.users.all():
|
||||
org.auditor_role.members.add(user)
|
||||
migrations[org.name].append(user)
|
||||
return migrations
|
||||
|
||||
def migrate_team(apps, schema_editor):
|
||||
migrations = defaultdict(list)
|
||||
team = apps.get_model('main', 'Team')
|
||||
for t in team.objects.all():
|
||||
for user in t.users.all():
|
||||
t.member_role.members.add(user)
|
||||
migrations[t.name].append(user)
|
||||
return migrations
|
||||
|
||||
def migrate_credential(apps, schema_editor):
|
||||
migrations = defaultdict(list)
|
||||
credential = apps.get_model('main', "Credential")
|
||||
for cred in credential.objects.all():
|
||||
if cred.user:
|
||||
cred.owner_role.members.add(cred.user)
|
||||
migrations[cred.name].append(cred.user)
|
||||
elif cred.team:
|
||||
cred.owner_role.parents.add(cred.team.admin_role)
|
||||
cred.usage_role.parents.add(cred.team.member_role)
|
||||
migrations[cred.name].append(cred.team)
|
||||
return migrations
|
||||
|
||||
def migrate_inventory(apps, schema_editor):
|
||||
migrations = defaultdict(dict)
|
||||
|
||||
Inventory = apps.get_model('main', 'Inventory')
|
||||
Permission = apps.get_model('main', 'Permission')
|
||||
|
||||
for inventory in Inventory.objects.all():
|
||||
teams, users = [], []
|
||||
for perm in Permission.objects.filter(inventory=inventory):
|
||||
role = None
|
||||
execrole = None
|
||||
if perm.permission_type == 'admin':
|
||||
role = inventory.admin_role
|
||||
pass
|
||||
elif perm.permission_type == 'read':
|
||||
role = inventory.auditor_role
|
||||
pass
|
||||
elif perm.permission_type == 'write':
|
||||
role = inventory.updater_role
|
||||
pass
|
||||
else:
|
||||
raise Exception('Unhandled permission type for inventory: %s' % perm.permission_type)
|
||||
if perm.run_ad_hoc_commands:
|
||||
execrole = inventory.executor_role
|
||||
|
||||
if perm.team:
|
||||
if role:
|
||||
perm.team.member_role.children.add(role)
|
||||
if execrole:
|
||||
perm.team.member_role.children.add(execrole)
|
||||
|
||||
teams.append(perm.team)
|
||||
|
||||
if perm.user:
|
||||
if role:
|
||||
role.members.add(perm.user)
|
||||
if execrole:
|
||||
execrole.members.add(perm.user)
|
||||
users.append(perm.user)
|
||||
migrations[inventory.name]['teams'] = teams
|
||||
migrations[inventory.name]['users'] = users
|
||||
return migrations
|
||||
@ -363,14 +363,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
|
||||
update_fields.append('cloud')
|
||||
super(Credential, self).save(*args, **kwargs)
|
||||
|
||||
def migrate_to_rbac(self):
|
||||
if self.user:
|
||||
self.owner_role.members.add(self.user)
|
||||
return [self.user]
|
||||
elif self.team:
|
||||
self.owner_role.parents.add(self.team.admin_role)
|
||||
self.usage_role.parents.add(self.team.member_role)
|
||||
return [self.team]
|
||||
|
||||
def validate_ssh_private_key(data):
|
||||
"""Validate that the given SSH private key or certificate is,
|
||||
|
||||
@ -113,48 +113,6 @@ class Inventory(CommonModel, ResourceMixin):
|
||||
role_name='Inventory Executor',
|
||||
)
|
||||
|
||||
def migrate_to_rbac(self):
|
||||
migrated_users = []
|
||||
migrated_teams = []
|
||||
|
||||
for perm in Permission.objects.filter(inventory=self):
|
||||
role = None
|
||||
execrole = None
|
||||
if perm.permission_type == 'admin':
|
||||
role = self.admin_role
|
||||
pass
|
||||
elif perm.permission_type == 'read':
|
||||
role = self.auditor_role
|
||||
pass
|
||||
elif perm.permission_type == 'write':
|
||||
role = self.updater_role
|
||||
pass
|
||||
else:
|
||||
raise Exception('Unhandled permission type for inventory: %s' % perm.permission_type)
|
||||
if perm.run_ad_hoc_commands:
|
||||
execrole = self.executor_role
|
||||
|
||||
if perm.team:
|
||||
if role:
|
||||
perm.team.member_role.children.add(role)
|
||||
if execrole:
|
||||
perm.team.member_role.children.add(execrole)
|
||||
|
||||
migrated_teams.append(perm.team)
|
||||
|
||||
if perm.user:
|
||||
if role:
|
||||
role.members.add(perm.user)
|
||||
if execrole:
|
||||
execrole.members.add(perm.user)
|
||||
migrated_users.append(perm.user)
|
||||
|
||||
return {
|
||||
'migrated_users': migrated_users,
|
||||
'migrated_teams': migrated_teams,
|
||||
}
|
||||
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('api:inventory_detail', args=(self.pk,))
|
||||
|
||||
|
||||
@ -76,16 +76,6 @@ class Organization(CommonModel, ResourceMixin):
|
||||
script.save()
|
||||
super(Organization, self).mark_inactive(save=save)
|
||||
|
||||
def migrate_to_rbac(self):
|
||||
migrated_users = []
|
||||
for admin in self.admins.all():
|
||||
self.admin_role.members.add(admin)
|
||||
migrated_users.append(admin)
|
||||
for user in self.users.all():
|
||||
self.auditor_role.members.add(user)
|
||||
migrated_users.append(user)
|
||||
return migrated_users
|
||||
|
||||
|
||||
class Team(CommonModelNameNotUnique, ResourceMixin):
|
||||
'''
|
||||
@ -144,12 +134,6 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
|
||||
cred.mark_inactive()
|
||||
super(Team, self).mark_inactive(save=save)
|
||||
|
||||
def migrate_to_rbac(self):
|
||||
migrated = []
|
||||
for user in self.users.all():
|
||||
self.member_role.members.add(user)
|
||||
migrated.append(user)
|
||||
return migrated
|
||||
|
||||
class Permission(CommonModelNameNotUnique):
|
||||
'''
|
||||
|
||||
@ -1,10 +1,16 @@
|
||||
import pytest
|
||||
|
||||
from awx.main.migrations import _rbac as rbac
|
||||
from django.apps import apps
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_credential_migration_user(credential, user, permissions):
|
||||
u = user('user', False)
|
||||
credential.user = u
|
||||
migrated = credential.migrate_to_rbac()
|
||||
credential.save()
|
||||
|
||||
migrated = rbac.migrate_credential(apps, None)
|
||||
|
||||
assert len(migrated) == 1
|
||||
assert credential.accessible_by(u, permissions['admin'])
|
||||
|
||||
@ -19,11 +25,13 @@ def test_credential_migration_team_member(credential, team, user, permissions):
|
||||
u = user('user', False)
|
||||
team.admin_role.members.add(u)
|
||||
credential.team = team
|
||||
credential.save()
|
||||
|
||||
# No permissions pre-migration
|
||||
assert not credential.accessible_by(u, permissions['admin'])
|
||||
|
||||
migrated = credential.migrate_to_rbac()
|
||||
migrated = rbac.migrate_credential(apps, None)
|
||||
|
||||
# Admin permissions post migration
|
||||
assert len(migrated) == 1
|
||||
assert credential.accessible_by(u, permissions['admin'])
|
||||
@ -33,12 +41,13 @@ def test_credential_migration_team_admin(credential, team, user, permissions):
|
||||
u = user('user', False)
|
||||
team.member_role.members.add(u)
|
||||
credential.team = team
|
||||
credential.save()
|
||||
|
||||
# No permissions pre-migration
|
||||
assert not credential.accessible_by(u, permissions['usage'])
|
||||
|
||||
# Usage permissions post migration
|
||||
migrated = credential.migrate_to_rbac()
|
||||
migrated = rbac.migrate_credential(apps, None)
|
||||
assert len(migrated) == 1
|
||||
assert credential.accessible_by(u, permissions['usage'])
|
||||
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
import pytest
|
||||
|
||||
from awx.main.migrations import _rbac as rbac
|
||||
from awx.main.models import Permission
|
||||
from django.apps import apps
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_inventory_admin_user(inventory, permissions, user):
|
||||
@ -10,10 +12,10 @@ def test_inventory_admin_user(inventory, permissions, user):
|
||||
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(migrations['migrated_users']) == 1
|
||||
assert len(migrations['migrated_teams']) == 0
|
||||
assert len(migrations[inventory.name]['users']) == 1
|
||||
assert len(migrations[inventory.name]['teams']) == 0
|
||||
assert inventory.accessible_by(u, permissions['admin'])
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.updater_role.members.filter(id=u.id).exists() is False
|
||||
@ -27,10 +29,10 @@ def test_inventory_auditor_user(inventory, permissions, user):
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(migrations['migrated_users']) == 1
|
||||
assert len(migrations['migrated_teams']) == 0
|
||||
assert len(migrations[inventory.name]['users']) == 1
|
||||
assert len(migrations[inventory.name]['teams']) == 0
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is True
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||
@ -45,10 +47,10 @@ def test_inventory_updater_user(inventory, permissions, user):
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(migrations['migrated_users']) == 1
|
||||
assert len(migrations['migrated_teams']) == 0
|
||||
assert len(migrations[inventory.name]['users']) == 1
|
||||
assert len(migrations[inventory.name]['teams']) == 0
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.updater_role.members.filter(id=u.id).exists()
|
||||
@ -62,10 +64,10 @@ def test_inventory_executor_user(inventory, permissions, user):
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(migrations['migrated_users']) == 1
|
||||
assert len(migrations['migrated_teams']) == 0
|
||||
assert len(migrations[inventory.name]['users']) == 1
|
||||
assert len(migrations[inventory.name]['teams']) == 0
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is True
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists()
|
||||
@ -82,13 +84,13 @@ def test_inventory_admin_team(inventory, permissions, user, team):
|
||||
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
|
||||
team_migrations = team.migrate_to_rbac()
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
team_migrations = rbac.migrate_team(apps, None)
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(team_migrations) == 1
|
||||
assert team.member_role.members.count() == 1
|
||||
assert len(migrations['migrated_users']) == 0
|
||||
assert len(migrations['migrated_teams']) == 1
|
||||
assert len(migrations[inventory.name]['users']) == 0
|
||||
assert len(migrations[inventory.name]['teams']) == 1
|
||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||
@ -107,13 +109,13 @@ def test_inventory_auditor(inventory, permissions, user, team):
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||
|
||||
team_migrations = team.migrate_to_rbac()
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
team_migrations = rbac.migrate_team(apps,None)
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(team_migrations) == 1
|
||||
assert team.member_role.members.count() == 1
|
||||
assert len(migrations['migrated_users']) == 0
|
||||
assert len(migrations['migrated_teams']) == 1
|
||||
assert len(migrations[inventory.name]['users']) == 0
|
||||
assert len(migrations[inventory.name]['teams']) == 1
|
||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||
@ -131,13 +133,13 @@ def test_inventory_updater(inventory, permissions, user, team):
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||
|
||||
team_migrations = team.migrate_to_rbac()
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
team_migrations = rbac.migrate_team(apps,None)
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(team_migrations) == 1
|
||||
assert team.member_role.members.count() == 1
|
||||
assert len(migrations['migrated_users']) == 0
|
||||
assert len(migrations['migrated_teams']) == 1
|
||||
assert len(migrations[inventory.name]['users']) == 0
|
||||
assert len(migrations[inventory.name]['teams']) == 1
|
||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||
@ -156,13 +158,13 @@ def test_inventory_executor(inventory, permissions, user, team):
|
||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||
|
||||
team_migrations = team.migrate_to_rbac()
|
||||
migrations = inventory.migrate_to_rbac()
|
||||
team_migrations = rbac.migrate_team(apps, None)
|
||||
migrations = rbac.migrate_inventory(apps, None)
|
||||
|
||||
assert len(team_migrations) == 1
|
||||
assert team.member_role.members.count() == 1
|
||||
assert len(migrations['migrated_users']) == 0
|
||||
assert len(migrations['migrated_teams']) == 1
|
||||
assert len(migrations[inventory.name]['users']) == 0
|
||||
assert len(migrations[inventory.name]['teams']) == 1
|
||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
import pytest
|
||||
|
||||
from awx.main.migrations import _rbac as rbac
|
||||
from awx.main.access import OrganizationAccess
|
||||
from django.apps import apps
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_migration_admin(organization, permissions, user):
|
||||
@ -9,8 +12,9 @@ def test_organization_migration_admin(organization, permissions, user):
|
||||
|
||||
assert not organization.accessible_by(u, permissions['admin'])
|
||||
|
||||
migrated_users = organization.migrate_to_rbac()
|
||||
assert len(migrated_users) == 1
|
||||
migrations = rbac.migrate_organization(apps, None)
|
||||
|
||||
assert len(migrations) == 1
|
||||
assert organization.accessible_by(u, permissions['admin'])
|
||||
|
||||
@pytest.mark.django_db
|
||||
@ -20,8 +24,9 @@ def test_organization_migration_user(organization, permissions, user):
|
||||
|
||||
assert not organization.accessible_by(u, permissions['auditor'])
|
||||
|
||||
migrated_users = organization.migrate_to_rbac()
|
||||
assert len(migrated_users) == 1
|
||||
migrations = rbac.migrate_organization(apps, None)
|
||||
|
||||
assert len(migrations) == 1
|
||||
assert organization.accessible_by(u, permissions['auditor'])
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
||||
@ -1,13 +1,17 @@
|
||||
import pytest
|
||||
|
||||
from awx.main.migrations import _rbac as rbac
|
||||
from django.apps import apps
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_team_migration_user(team, user, permissions):
|
||||
u = user('user', False)
|
||||
team.users.add(u)
|
||||
team.save()
|
||||
|
||||
assert not team.accessible_by(u, permissions['auditor'])
|
||||
|
||||
migrated = team.migrate_to_rbac()
|
||||
migrated = rbac.migrate_team(apps, None)
|
||||
|
||||
assert len(migrated) == 1
|
||||
assert team.accessible_by(u, permissions['auditor'])
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user