mirror of
https://github.com/ansible/awx.git
synced 2026-03-04 18:21:03 -03:30
convert Inventory to django migrations
This commit is contained in:
@@ -15,4 +15,5 @@ class Migration(migrations.Migration):
|
|||||||
migrations.RunPython(rbac.migrate_organization),
|
migrations.RunPython(rbac.migrate_organization),
|
||||||
migrations.RunPython(rbac.migrate_credential),
|
migrations.RunPython(rbac.migrate_credential),
|
||||||
migrations.RunPython(rbac.migrate_team),
|
migrations.RunPython(rbac.migrate_team),
|
||||||
|
migrations.RunPython(rbac.migrate_inventory),
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -33,3 +33,46 @@ def migrate_credential(apps, schema_editor):
|
|||||||
cred.usage_role.parents.add(cred.team.member_role)
|
cred.usage_role.parents.add(cred.team.member_role)
|
||||||
migrations[cred.name].append(cred.team)
|
migrations[cred.name].append(cred.team)
|
||||||
return migrations
|
return migrations
|
||||||
|
|
||||||
|
def migrate_inventory(apps, schema_editor):
|
||||||
|
migrations = defaultdict(dict)
|
||||||
|
|
||||||
|
Inventory = apps.get_model('main', 'Inventory')
|
||||||
|
Permission = apps.get_model('main', 'Permission')
|
||||||
|
|
||||||
|
for inventory in Inventory.objects.all():
|
||||||
|
teams, users = [], []
|
||||||
|
for perm in Permission.objects.filter(inventory=inventory):
|
||||||
|
role = None
|
||||||
|
execrole = None
|
||||||
|
if perm.permission_type == 'admin':
|
||||||
|
role = inventory.admin_role
|
||||||
|
pass
|
||||||
|
elif perm.permission_type == 'read':
|
||||||
|
role = inventory.auditor_role
|
||||||
|
pass
|
||||||
|
elif perm.permission_type == 'write':
|
||||||
|
role = inventory.updater_role
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
raise Exception('Unhandled permission type for inventory: %s' % perm.permission_type)
|
||||||
|
if perm.run_ad_hoc_commands:
|
||||||
|
execrole = inventory.executor_role
|
||||||
|
|
||||||
|
if perm.team:
|
||||||
|
if role:
|
||||||
|
perm.team.member_role.children.add(role)
|
||||||
|
if execrole:
|
||||||
|
perm.team.member_role.children.add(execrole)
|
||||||
|
|
||||||
|
teams.append(perm.team)
|
||||||
|
|
||||||
|
if perm.user:
|
||||||
|
if role:
|
||||||
|
role.members.add(perm.user)
|
||||||
|
if execrole:
|
||||||
|
execrole.members.add(perm.user)
|
||||||
|
users.append(perm.user)
|
||||||
|
migrations[inventory.name]['teams'] = teams
|
||||||
|
migrations[inventory.name]['users'] = users
|
||||||
|
return migrations
|
||||||
|
|||||||
@@ -113,48 +113,6 @@ class Inventory(CommonModel, ResourceMixin):
|
|||||||
role_name='Inventory Executor',
|
role_name='Inventory Executor',
|
||||||
)
|
)
|
||||||
|
|
||||||
def migrate_to_rbac(self):
|
|
||||||
migrated_users = []
|
|
||||||
migrated_teams = []
|
|
||||||
|
|
||||||
for perm in Permission.objects.filter(inventory=self):
|
|
||||||
role = None
|
|
||||||
execrole = None
|
|
||||||
if perm.permission_type == 'admin':
|
|
||||||
role = self.admin_role
|
|
||||||
pass
|
|
||||||
elif perm.permission_type == 'read':
|
|
||||||
role = self.auditor_role
|
|
||||||
pass
|
|
||||||
elif perm.permission_type == 'write':
|
|
||||||
role = self.updater_role
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise Exception('Unhandled permission type for inventory: %s' % perm.permission_type)
|
|
||||||
if perm.run_ad_hoc_commands:
|
|
||||||
execrole = self.executor_role
|
|
||||||
|
|
||||||
if perm.team:
|
|
||||||
if role:
|
|
||||||
perm.team.member_role.children.add(role)
|
|
||||||
if execrole:
|
|
||||||
perm.team.member_role.children.add(execrole)
|
|
||||||
|
|
||||||
migrated_teams.append(perm.team)
|
|
||||||
|
|
||||||
if perm.user:
|
|
||||||
if role:
|
|
||||||
role.members.add(perm.user)
|
|
||||||
if execrole:
|
|
||||||
execrole.members.add(perm.user)
|
|
||||||
migrated_users.append(perm.user)
|
|
||||||
|
|
||||||
return {
|
|
||||||
'migrated_users': migrated_users,
|
|
||||||
'migrated_teams': migrated_teams,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def get_absolute_url(self):
|
def get_absolute_url(self):
|
||||||
return reverse('api:inventory_detail', args=(self.pk,))
|
return reverse('api:inventory_detail', args=(self.pk,))
|
||||||
|
|
||||||
|
|||||||
@@ -12,10 +12,10 @@ def test_inventory_admin_user(inventory, permissions, user):
|
|||||||
|
|
||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
|
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(migrations['migrated_users']) == 1
|
assert len(migrations[inventory.name]['users']) == 1
|
||||||
assert len(migrations['migrated_teams']) == 0
|
assert len(migrations[inventory.name]['teams']) == 0
|
||||||
assert inventory.accessible_by(u, permissions['admin'])
|
assert inventory.accessible_by(u, permissions['admin'])
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.updater_role.members.filter(id=u.id).exists() is False
|
assert inventory.updater_role.members.filter(id=u.id).exists() is False
|
||||||
@@ -29,10 +29,10 @@ def test_inventory_auditor_user(inventory, permissions, user):
|
|||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||||
|
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(migrations['migrated_users']) == 1
|
assert len(migrations[inventory.name]['users']) == 1
|
||||||
assert len(migrations['migrated_teams']) == 0
|
assert len(migrations[inventory.name]['teams']) == 0
|
||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
assert inventory.accessible_by(u, permissions['auditor']) is True
|
assert inventory.accessible_by(u, permissions['auditor']) is True
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||||
@@ -47,10 +47,10 @@ def test_inventory_updater_user(inventory, permissions, user):
|
|||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||||
|
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(migrations['migrated_users']) == 1
|
assert len(migrations[inventory.name]['users']) == 1
|
||||||
assert len(migrations['migrated_teams']) == 0
|
assert len(migrations[inventory.name]['teams']) == 0
|
||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.updater_role.members.filter(id=u.id).exists()
|
assert inventory.updater_role.members.filter(id=u.id).exists()
|
||||||
@@ -64,10 +64,10 @@ def test_inventory_executor_user(inventory, permissions, user):
|
|||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||||
|
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(migrations['migrated_users']) == 1
|
assert len(migrations[inventory.name]['users']) == 1
|
||||||
assert len(migrations['migrated_teams']) == 0
|
assert len(migrations[inventory.name]['teams']) == 0
|
||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
assert inventory.accessible_by(u, permissions['auditor']) is True
|
assert inventory.accessible_by(u, permissions['auditor']) is True
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists()
|
assert inventory.executor_role.members.filter(id=u.id).exists()
|
||||||
@@ -85,12 +85,12 @@ def test_inventory_admin_team(inventory, permissions, user, team):
|
|||||||
assert inventory.accessible_by(u, permissions['admin']) is False
|
assert inventory.accessible_by(u, permissions['admin']) is False
|
||||||
|
|
||||||
team_migrations = rbac.migrate_team(apps, None)
|
team_migrations = rbac.migrate_team(apps, None)
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(team_migrations) == 1
|
assert len(team_migrations) == 1
|
||||||
assert team.member_role.members.count() == 1
|
assert team.member_role.members.count() == 1
|
||||||
assert len(migrations['migrated_users']) == 0
|
assert len(migrations[inventory.name]['users']) == 0
|
||||||
assert len(migrations['migrated_teams']) == 1
|
assert len(migrations[inventory.name]['teams']) == 1
|
||||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||||
@@ -110,12 +110,12 @@ def test_inventory_auditor(inventory, permissions, user, team):
|
|||||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||||
|
|
||||||
team_migrations = rbac.migrate_team(apps,None)
|
team_migrations = rbac.migrate_team(apps,None)
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(team_migrations) == 1
|
assert len(team_migrations) == 1
|
||||||
assert team.member_role.members.count() == 1
|
assert team.member_role.members.count() == 1
|
||||||
assert len(migrations['migrated_users']) == 0
|
assert len(migrations[inventory.name]['users']) == 0
|
||||||
assert len(migrations['migrated_teams']) == 1
|
assert len(migrations[inventory.name]['teams']) == 1
|
||||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||||
@@ -134,12 +134,12 @@ def test_inventory_updater(inventory, permissions, user, team):
|
|||||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||||
|
|
||||||
team_migrations = rbac.migrate_team(apps,None)
|
team_migrations = rbac.migrate_team(apps,None)
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(team_migrations) == 1
|
assert len(team_migrations) == 1
|
||||||
assert team.member_role.members.count() == 1
|
assert team.member_role.members.count() == 1
|
||||||
assert len(migrations['migrated_users']) == 0
|
assert len(migrations[inventory.name]['users']) == 0
|
||||||
assert len(migrations['migrated_teams']) == 1
|
assert len(migrations[inventory.name]['teams']) == 1
|
||||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||||
@@ -159,12 +159,12 @@ def test_inventory_executor(inventory, permissions, user, team):
|
|||||||
assert inventory.accessible_by(u, permissions['auditor']) is False
|
assert inventory.accessible_by(u, permissions['auditor']) is False
|
||||||
|
|
||||||
team_migrations = rbac.migrate_team(apps, None)
|
team_migrations = rbac.migrate_team(apps, None)
|
||||||
migrations = inventory.migrate_to_rbac()
|
migrations = rbac.migrate_inventory(apps, None)
|
||||||
|
|
||||||
assert len(team_migrations) == 1
|
assert len(team_migrations) == 1
|
||||||
assert team.member_role.members.count() == 1
|
assert team.member_role.members.count() == 1
|
||||||
assert len(migrations['migrated_users']) == 0
|
assert len(migrations[inventory.name]['users']) == 0
|
||||||
assert len(migrations['migrated_teams']) == 1
|
assert len(migrations[inventory.name]['teams']) == 1
|
||||||
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
assert inventory.admin_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
assert inventory.auditor_role.members.filter(id=u.id).exists() is False
|
||||||
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
assert inventory.executor_role.members.filter(id=u.id).exists() is False
|
||||||
|
|||||||
Reference in New Issue
Block a user