mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 01:57:35 -03:30
[RBAC] Rename managed role definitions, and move migration logic here (#15087)
* Rename managed role definitions, and move migration logic here * Fix naming capitalization
This commit is contained in:
parent
c98727d83e
commit
818c326160
@ -2,9 +2,7 @@
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
from awx.main.migrations._dab_rbac import migrate_to_new_rbac, create_permissions_as_operation
|
||||
|
||||
from ansible_base.rbac.migrations._managed_definitions import setup_managed_role_definitions
|
||||
from awx.main.migrations._dab_rbac import migrate_to_new_rbac, create_permissions_as_operation, setup_managed_role_definitions
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
@ -3,12 +3,15 @@ import logging
|
||||
|
||||
from django.apps import apps as global_apps
|
||||
from django.db.models import ForeignKey
|
||||
from django.conf import settings
|
||||
from ansible_base.rbac.migrations._utils import give_permissions
|
||||
from ansible_base.rbac.management import create_dab_permissions
|
||||
|
||||
from awx.main.fields import ImplicitRoleField
|
||||
from awx.main.constants import role_name_to_perm_mapping
|
||||
|
||||
from ansible_base.rbac.permission_registry import permission_registry
|
||||
|
||||
|
||||
logger = logging.getLogger('awx.main.migrations._dab_rbac')
|
||||
|
||||
@ -194,7 +197,7 @@ def migrate_to_new_rbac(apps, schema_editor):
|
||||
role_definition = managed_definitions[permissions]
|
||||
else:
|
||||
action = role.role_field.rsplit('_', 1)[0] # remove the _field ending of the name
|
||||
role_definition_name = f'{role.content_type.model}-{action}'
|
||||
role_definition_name = f'{role.content_type.model_class().__name__} {action.title()}'
|
||||
|
||||
description = role_descriptions[role.role_field]
|
||||
if type(description) == dict:
|
||||
@ -241,3 +244,100 @@ def migrate_to_new_rbac(apps, schema_editor):
|
||||
ct += 1
|
||||
if ct:
|
||||
logger.info(f'Migrated {ct} users to new system auditor flag')
|
||||
|
||||
|
||||
def get_or_create_managed(name, description, ct, permissions, RoleDefinition):
|
||||
role_definition, created = RoleDefinition.objects.get_or_create(name=name, defaults={'managed': True, 'description': description, 'content_type': ct})
|
||||
role_definition.permissions.set(list(permissions))
|
||||
|
||||
if not role_definition.managed:
|
||||
role_definition.managed = True
|
||||
role_definition.save(update_fields=['managed'])
|
||||
|
||||
if created:
|
||||
logger.info(f'Created RoleDefinition {role_definition.name} pk={role_definition} with {len(permissions)} permissions')
|
||||
|
||||
return role_definition
|
||||
|
||||
|
||||
def setup_managed_role_definitions(apps, schema_editor):
|
||||
"""
|
||||
Idepotent method to create or sync the managed role definitions
|
||||
"""
|
||||
to_create = settings.ANSIBLE_BASE_ROLE_PRECREATE
|
||||
|
||||
ContentType = apps.get_model('contenttypes', 'ContentType')
|
||||
Permission = apps.get_model('dab_rbac', 'DABPermission')
|
||||
RoleDefinition = apps.get_model('dab_rbac', 'RoleDefinition')
|
||||
Organization = apps.get_model(settings.ANSIBLE_BASE_ORGANIZATION_MODEL)
|
||||
org_ct = ContentType.objects.get_for_model(Organization)
|
||||
managed_role_definitions = []
|
||||
|
||||
org_perms = set()
|
||||
for cls in permission_registry._registry:
|
||||
ct = ContentType.objects.get_for_model(cls)
|
||||
object_perms = set(Permission.objects.filter(content_type=ct))
|
||||
# Special case for InstanceGroup which has an organiation field, but is not an organization child object
|
||||
if cls._meta.model_name != 'instancegroup':
|
||||
org_perms.update(object_perms)
|
||||
|
||||
if 'object_admin' in to_create and cls != Organization:
|
||||
indiv_perms = object_perms.copy()
|
||||
add_perms = [perm for perm in indiv_perms if perm.codename.startswith('add_')]
|
||||
if add_perms:
|
||||
for perm in add_perms:
|
||||
indiv_perms.remove(perm)
|
||||
|
||||
managed_role_definitions.append(
|
||||
get_or_create_managed(
|
||||
to_create['object_admin'].format(cls=cls), f'Has all permissions to a single {cls._meta.verbose_name}', ct, indiv_perms, RoleDefinition
|
||||
)
|
||||
)
|
||||
|
||||
if 'org_children' in to_create and cls != Organization:
|
||||
org_child_perms = object_perms.copy()
|
||||
org_child_perms.add(Permission.objects.get(codename='view_organization'))
|
||||
|
||||
managed_role_definitions.append(
|
||||
get_or_create_managed(
|
||||
to_create['org_children'].format(cls=cls),
|
||||
f'Has all permissions to {cls._meta.verbose_name_plural} within an organization',
|
||||
org_ct,
|
||||
org_child_perms,
|
||||
RoleDefinition,
|
||||
)
|
||||
)
|
||||
|
||||
if 'special' in to_create:
|
||||
special_perms = []
|
||||
for perm in object_perms:
|
||||
if perm.codename.split('_')[0] not in ('add', 'change', 'update', 'delete', 'view'):
|
||||
special_perms.append(perm)
|
||||
for perm in special_perms:
|
||||
action = perm.codename.split('_')[0]
|
||||
view_perm = Permission.objects.get(content_type=ct, codename__startswith='view_')
|
||||
managed_role_definitions.append(
|
||||
get_or_create_managed(
|
||||
to_create['special'].format(cls=cls, action=action.title()),
|
||||
f'Has {action} permissions to a single {cls._meta.verbose_name}',
|
||||
ct,
|
||||
[perm, view_perm],
|
||||
RoleDefinition,
|
||||
)
|
||||
)
|
||||
|
||||
if 'org_admin' in to_create:
|
||||
managed_role_definitions.append(
|
||||
get_or_create_managed(
|
||||
to_create['org_admin'].format(cls=Organization),
|
||||
'Has all permissions to a single organization and all objects inside of it',
|
||||
org_ct,
|
||||
org_perms,
|
||||
RoleDefinition,
|
||||
)
|
||||
)
|
||||
|
||||
unexpected_role_definitions = RoleDefinition.objects.filter(managed=True).exclude(pk__in=[rd.pk for rd in managed_role_definitions])
|
||||
for role_definition in unexpected_role_definitions:
|
||||
logger.info(f'Deleting old managed role definition {role_definition.name}, pk={role_definition.pk}')
|
||||
role_definition.delete()
|
||||
|
||||
@ -553,7 +553,7 @@ def get_role_definition(role):
|
||||
return
|
||||
f = obj._meta.get_field(role.role_field)
|
||||
action_name = f.name.rsplit("_", 1)[0]
|
||||
rd_name = f'{obj._meta.model_name}-{action_name}-compat'
|
||||
rd_name = f'{type(obj).__name__} {action_name.title()} Compat'
|
||||
perm_list = get_role_codenames(role)
|
||||
defaults = {'content_type_id': role.content_type_id}
|
||||
try:
|
||||
@ -573,23 +573,26 @@ def get_role_from_object_role(object_role):
|
||||
reverses naming from get_role_definition, and the ANSIBLE_BASE_ROLE_PRECREATE setting.
|
||||
"""
|
||||
rd = object_role.role_definition
|
||||
if rd.name.endswith('-compat'):
|
||||
model_name, role_name, _ = rd.name.split('-')
|
||||
if rd.name.endswith(' Compat'):
|
||||
model_name, role_name, _ = rd.name.split()
|
||||
role_name = role_name.lower()
|
||||
role_name += '_role'
|
||||
elif rd.name.endswith('-admin') and rd.name.count('-') == 2:
|
||||
# cases like "organization-project-admin"
|
||||
model_name, target_model_name, role_name = rd.name.split('-')
|
||||
elif rd.name.endswith(' Admin') and rd.name.count(' ') == 2:
|
||||
# cases like "Organization Project Admin"
|
||||
model_name, target_model_name, role_name = rd.name.split()
|
||||
role_name = role_name.lower()
|
||||
model_cls = apps.get_model('main', target_model_name)
|
||||
target_model_name = get_type_for_model(model_cls)
|
||||
if target_model_name == 'notification_template':
|
||||
target_model_name = 'notification' # total exception
|
||||
role_name = f'{target_model_name}_admin_role'
|
||||
elif rd.name.endswith('-admin'):
|
||||
elif rd.name.endswith(' Admin'):
|
||||
# cases like "project-admin"
|
||||
model_name, _ = rd.name.rsplit('-', 1)
|
||||
role_name = 'admin_role'
|
||||
else:
|
||||
model_name, role_name = rd.name.split('-')
|
||||
print(rd.name)
|
||||
model_name, role_name = rd.name.split()
|
||||
role_name = role_name.lower()
|
||||
role_name += '_role'
|
||||
return getattr(object_role.content_object, role_name)
|
||||
|
||||
|
||||
10
awx/main/tests/functional/dab_rbac/conftest.py
Normal file
10
awx/main/tests/functional/dab_rbac/conftest.py
Normal file
@ -0,0 +1,10 @@
|
||||
import pytest
|
||||
from django.apps import apps
|
||||
|
||||
from awx.main.migrations._dab_rbac import setup_managed_role_definitions
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def managed_roles():
|
||||
"Run the migration script to pre-create managed role definitions"
|
||||
setup_managed_role_definitions(apps, None)
|
||||
45
awx/main/tests/functional/dab_rbac/test_dab_migration.py
Normal file
45
awx/main/tests/functional/dab_rbac/test_dab_migration.py
Normal file
@ -0,0 +1,45 @@
|
||||
import pytest
|
||||
from django.apps import apps
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from awx.main.migrations._dab_rbac import setup_managed_role_definitions
|
||||
|
||||
from ansible_base.rbac.models import RoleDefinition
|
||||
|
||||
INVENTORY_OBJ_PERMISSIONS = ['view_inventory', 'adhoc_inventory', 'use_inventory', 'change_inventory', 'delete_inventory', 'update_inventory']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_managed_definitions_precreate():
|
||||
with override_settings(
|
||||
ANSIBLE_BASE_ROLE_PRECREATE={
|
||||
'object_admin': '{cls._meta.model_name}-admin',
|
||||
'org_admin': 'organization-admin',
|
||||
'org_children': 'organization-{cls._meta.model_name}-admin',
|
||||
'special': '{cls._meta.model_name}-{action}',
|
||||
}
|
||||
):
|
||||
setup_managed_role_definitions(apps, None)
|
||||
rd = RoleDefinition.objects.get(name='inventory-admin')
|
||||
assert rd.managed is True
|
||||
# add permissions do not go in the object-level admin
|
||||
assert set(rd.permissions.values_list('codename', flat=True)) == set(INVENTORY_OBJ_PERMISSIONS)
|
||||
|
||||
# test org-level object admin permissions
|
||||
rd = RoleDefinition.objects.get(name='organization-inventory-admin')
|
||||
assert rd.managed is True
|
||||
assert set(rd.permissions.values_list('codename', flat=True)) == set(['add_inventory', 'view_organization'] + INVENTORY_OBJ_PERMISSIONS)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_managed_definitions_custom_obj_admin_name():
|
||||
with override_settings(
|
||||
ANSIBLE_BASE_ROLE_PRECREATE={
|
||||
'object_admin': 'foo-{cls._meta.model_name}-foo',
|
||||
}
|
||||
):
|
||||
setup_managed_role_definitions(apps, None)
|
||||
rd = RoleDefinition.objects.get(name='foo-inventory-foo')
|
||||
assert rd.managed is True
|
||||
# add permissions do not go in the object-level admin
|
||||
assert set(rd.permissions.values_list('codename', flat=True)) == set(INVENTORY_OBJ_PERMISSIONS)
|
||||
@ -10,19 +10,19 @@ from ansible_base.rbac.models import RoleDefinition
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_managed_roles_created():
|
||||
def test_managed_roles_created(managed_roles):
|
||||
"Managed RoleDefinitions are created in post_migration signal, we expect to see them here"
|
||||
for cls in (JobTemplate, Inventory):
|
||||
ct = ContentType.objects.get_for_model(cls)
|
||||
rds = list(RoleDefinition.objects.filter(content_type=ct))
|
||||
assert len(rds) > 1
|
||||
assert f'{cls._meta.model_name}-admin' in [rd.name for rd in rds]
|
||||
assert f'{cls.__name__} Admin' in [rd.name for rd in rds]
|
||||
for rd in rds:
|
||||
assert rd.managed is True
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_custom_read_role(admin_user, post):
|
||||
def test_custom_read_role(admin_user, post, managed_roles):
|
||||
rd_url = django_reverse('roledefinition-list')
|
||||
resp = post(
|
||||
url=rd_url, data={"name": "read role made for test", "content_type": "awx.inventory", "permissions": ['view_inventory']}, user=admin_user, expect=201
|
||||
@ -40,8 +40,8 @@ def test_custom_system_roles_prohibited(admin_user, post):
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_assign_managed_role(admin_user, alice, rando, inventory, post):
|
||||
rd = RoleDefinition.objects.get(name='inventory-admin')
|
||||
def test_assign_managed_role(admin_user, alice, rando, inventory, post, managed_roles):
|
||||
rd = RoleDefinition.objects.get(name='Inventory Admin')
|
||||
rd.give_permission(alice, inventory)
|
||||
# Now that alice has full permissions to the inventory, she will give rando permission
|
||||
url = django_reverse('roleuserassignment-list')
|
||||
@ -63,7 +63,7 @@ def test_assign_custom_delete_role(admin_user, rando, inventory, delete, patch):
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_assign_custom_add_role(admin_user, rando, organization, post):
|
||||
def test_assign_custom_add_role(admin_user, rando, organization, post, managed_roles):
|
||||
rd, _ = RoleDefinition.objects.get_or_create(
|
||||
name='inventory-add', permissions=['add_inventory', 'view_organization'], content_type=ContentType.objects.get_for_model(Organization)
|
||||
)
|
||||
|
||||
@ -14,7 +14,7 @@ from ansible_base.rbac.models import RoleUserAssignment
|
||||
'role_name',
|
||||
['execution_environment_admin_role', 'project_admin_role', 'admin_role', 'auditor_role', 'read_role', 'execute_role', 'notification_admin_role'],
|
||||
)
|
||||
def test_round_trip_roles(organization, rando, role_name):
|
||||
def test_round_trip_roles(organization, rando, role_name, managed_roles):
|
||||
"""
|
||||
Make an assignment with the old-style role,
|
||||
get the equivelent new role
|
||||
@ -28,7 +28,7 @@ def test_round_trip_roles(organization, rando, role_name):
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_level_permissions(organization, inventory):
|
||||
def test_organization_level_permissions(organization, inventory, managed_roles):
|
||||
u1 = User.objects.create(username='alice')
|
||||
u2 = User.objects.create(username='bob')
|
||||
|
||||
@ -58,14 +58,14 @@ def test_organization_level_permissions(organization, inventory):
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_execute_role(organization, rando):
|
||||
def test_organization_execute_role(organization, rando, managed_roles):
|
||||
organization.execute_role.members.add(rando)
|
||||
assert rando in organization.execute_role
|
||||
assert set(Organization.accessible_objects(rando, 'execute_role')) == set([organization])
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_workflow_approval_list(get, post, admin_user):
|
||||
def test_workflow_approval_list(get, post, admin_user, managed_roles):
|
||||
workflow_job_template = WorkflowJobTemplate.objects.create()
|
||||
approval_node = WorkflowJobTemplateNode.objects.create(workflow_job_template=workflow_job_template)
|
||||
url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'})
|
||||
@ -79,14 +79,14 @@ def test_workflow_approval_list(get, post, admin_user):
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_creator_permission(rando, admin_user, inventory):
|
||||
def test_creator_permission(rando, admin_user, inventory, managed_roles):
|
||||
give_creator_permissions(rando, inventory)
|
||||
assert rando in inventory.admin_role
|
||||
assert rando in inventory.admin_role.members.all()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_team_team_read_role(rando, team, admin_user, post):
|
||||
def test_team_team_read_role(rando, team, admin_user, post, managed_roles):
|
||||
orgs = [Organization.objects.create(name=f'foo-{i}') for i in range(2)]
|
||||
teams = [Team.objects.create(name=f'foo-{i}', organization=orgs[i]) for i in range(2)]
|
||||
teams[1].member_role.members.add(rando)
|
||||
|
||||
@ -1145,14 +1145,17 @@ ANSIBLE_BASE_CUSTOM_VIEW_PARENT = 'awx.api.generics.APIView'
|
||||
|
||||
# Settings for the ansible_base RBAC system
|
||||
|
||||
# Settings for the RBAC system, override as necessary in app
|
||||
# Only used internally, names of the managed RoleDefinitions to create
|
||||
ANSIBLE_BASE_ROLE_PRECREATE = {
|
||||
'object_admin': '{cls._meta.model_name}-admin',
|
||||
'org_admin': 'organization-admin',
|
||||
'org_children': 'organization-{cls._meta.model_name}-admin',
|
||||
'special': '{cls._meta.model_name}-{action}',
|
||||
'object_admin': '{cls.__name__} Admin',
|
||||
'org_admin': 'Organization Admin',
|
||||
'org_children': 'Organization {cls.__name__} Admin',
|
||||
'special': '{cls.__name__} {action}',
|
||||
}
|
||||
|
||||
# Name for auto-created roles that give users permissions to what they create
|
||||
ANSIBLE_BASE_ROLE_CREATOR_NAME = '{cls.__name__} Creator'
|
||||
|
||||
# Use the new Gateway RBAC system for evaluations? You should. We will remove the old system soon.
|
||||
ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED = True
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user