diff --git a/awx/main/migrations/0192_custom_roles.py b/awx/main/migrations/0192_custom_roles.py index 3491ad67e1..c91823aa34 100644 --- a/awx/main/migrations/0192_custom_roles.py +++ b/awx/main/migrations/0192_custom_roles.py @@ -2,9 +2,7 @@ from django.db import migrations -from awx.main.migrations._dab_rbac import migrate_to_new_rbac, create_permissions_as_operation - -from ansible_base.rbac.migrations._managed_definitions import setup_managed_role_definitions +from awx.main.migrations._dab_rbac import migrate_to_new_rbac, create_permissions_as_operation, setup_managed_role_definitions class Migration(migrations.Migration): diff --git a/awx/main/migrations/_dab_rbac.py b/awx/main/migrations/_dab_rbac.py index 1d8da9222f..6e3c04882f 100644 --- a/awx/main/migrations/_dab_rbac.py +++ b/awx/main/migrations/_dab_rbac.py @@ -3,12 +3,15 @@ import logging from django.apps import apps as global_apps from django.db.models import ForeignKey +from django.conf import settings from ansible_base.rbac.migrations._utils import give_permissions from ansible_base.rbac.management import create_dab_permissions from awx.main.fields import ImplicitRoleField from awx.main.constants import role_name_to_perm_mapping +from ansible_base.rbac.permission_registry import permission_registry + logger = logging.getLogger('awx.main.migrations._dab_rbac') @@ -194,7 +197,7 @@ def migrate_to_new_rbac(apps, schema_editor): role_definition = managed_definitions[permissions] else: action = role.role_field.rsplit('_', 1)[0] # remove the _field ending of the name - role_definition_name = f'{role.content_type.model}-{action}' + role_definition_name = f'{role.content_type.model_class().__name__} {action.title()}' description = role_descriptions[role.role_field] if type(description) == dict: @@ -241,3 +244,100 @@ def migrate_to_new_rbac(apps, schema_editor): ct += 1 if ct: logger.info(f'Migrated {ct} users to new system auditor flag') + + +def get_or_create_managed(name, description, ct, permissions, RoleDefinition): + role_definition, created = RoleDefinition.objects.get_or_create(name=name, defaults={'managed': True, 'description': description, 'content_type': ct}) + role_definition.permissions.set(list(permissions)) + + if not role_definition.managed: + role_definition.managed = True + role_definition.save(update_fields=['managed']) + + if created: + logger.info(f'Created RoleDefinition {role_definition.name} pk={role_definition} with {len(permissions)} permissions') + + return role_definition + + +def setup_managed_role_definitions(apps, schema_editor): + """ + Idepotent method to create or sync the managed role definitions + """ + to_create = settings.ANSIBLE_BASE_ROLE_PRECREATE + + ContentType = apps.get_model('contenttypes', 'ContentType') + Permission = apps.get_model('dab_rbac', 'DABPermission') + RoleDefinition = apps.get_model('dab_rbac', 'RoleDefinition') + Organization = apps.get_model(settings.ANSIBLE_BASE_ORGANIZATION_MODEL) + org_ct = ContentType.objects.get_for_model(Organization) + managed_role_definitions = [] + + org_perms = set() + for cls in permission_registry._registry: + ct = ContentType.objects.get_for_model(cls) + object_perms = set(Permission.objects.filter(content_type=ct)) + # Special case for InstanceGroup which has an organiation field, but is not an organization child object + if cls._meta.model_name != 'instancegroup': + org_perms.update(object_perms) + + if 'object_admin' in to_create and cls != Organization: + indiv_perms = object_perms.copy() + add_perms = [perm for perm in indiv_perms if perm.codename.startswith('add_')] + if add_perms: + for perm in add_perms: + indiv_perms.remove(perm) + + managed_role_definitions.append( + get_or_create_managed( + to_create['object_admin'].format(cls=cls), f'Has all permissions to a single {cls._meta.verbose_name}', ct, indiv_perms, RoleDefinition + ) + ) + + if 'org_children' in to_create and cls != Organization: + org_child_perms = object_perms.copy() + org_child_perms.add(Permission.objects.get(codename='view_organization')) + + managed_role_definitions.append( + get_or_create_managed( + to_create['org_children'].format(cls=cls), + f'Has all permissions to {cls._meta.verbose_name_plural} within an organization', + org_ct, + org_child_perms, + RoleDefinition, + ) + ) + + if 'special' in to_create: + special_perms = [] + for perm in object_perms: + if perm.codename.split('_')[0] not in ('add', 'change', 'update', 'delete', 'view'): + special_perms.append(perm) + for perm in special_perms: + action = perm.codename.split('_')[0] + view_perm = Permission.objects.get(content_type=ct, codename__startswith='view_') + managed_role_definitions.append( + get_or_create_managed( + to_create['special'].format(cls=cls, action=action.title()), + f'Has {action} permissions to a single {cls._meta.verbose_name}', + ct, + [perm, view_perm], + RoleDefinition, + ) + ) + + if 'org_admin' in to_create: + managed_role_definitions.append( + get_or_create_managed( + to_create['org_admin'].format(cls=Organization), + 'Has all permissions to a single organization and all objects inside of it', + org_ct, + org_perms, + RoleDefinition, + ) + ) + + unexpected_role_definitions = RoleDefinition.objects.filter(managed=True).exclude(pk__in=[rd.pk for rd in managed_role_definitions]) + for role_definition in unexpected_role_definitions: + logger.info(f'Deleting old managed role definition {role_definition.name}, pk={role_definition.pk}') + role_definition.delete() diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index a137c381fd..c3cdeb5f6b 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -553,7 +553,7 @@ def get_role_definition(role): return f = obj._meta.get_field(role.role_field) action_name = f.name.rsplit("_", 1)[0] - rd_name = f'{obj._meta.model_name}-{action_name}-compat' + rd_name = f'{type(obj).__name__} {action_name.title()} Compat' perm_list = get_role_codenames(role) defaults = {'content_type_id': role.content_type_id} try: @@ -573,23 +573,26 @@ def get_role_from_object_role(object_role): reverses naming from get_role_definition, and the ANSIBLE_BASE_ROLE_PRECREATE setting. """ rd = object_role.role_definition - if rd.name.endswith('-compat'): - model_name, role_name, _ = rd.name.split('-') + if rd.name.endswith(' Compat'): + model_name, role_name, _ = rd.name.split() + role_name = role_name.lower() role_name += '_role' - elif rd.name.endswith('-admin') and rd.name.count('-') == 2: - # cases like "organization-project-admin" - model_name, target_model_name, role_name = rd.name.split('-') + elif rd.name.endswith(' Admin') and rd.name.count(' ') == 2: + # cases like "Organization Project Admin" + model_name, target_model_name, role_name = rd.name.split() + role_name = role_name.lower() model_cls = apps.get_model('main', target_model_name) target_model_name = get_type_for_model(model_cls) if target_model_name == 'notification_template': target_model_name = 'notification' # total exception role_name = f'{target_model_name}_admin_role' - elif rd.name.endswith('-admin'): + elif rd.name.endswith(' Admin'): # cases like "project-admin" - model_name, _ = rd.name.rsplit('-', 1) role_name = 'admin_role' else: - model_name, role_name = rd.name.split('-') + print(rd.name) + model_name, role_name = rd.name.split() + role_name = role_name.lower() role_name += '_role' return getattr(object_role.content_object, role_name) diff --git a/awx/main/tests/functional/dab_rbac/conftest.py b/awx/main/tests/functional/dab_rbac/conftest.py new file mode 100644 index 0000000000..2e37b7f751 --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/conftest.py @@ -0,0 +1,10 @@ +import pytest +from django.apps import apps + +from awx.main.migrations._dab_rbac import setup_managed_role_definitions + + +@pytest.fixture +def managed_roles(): + "Run the migration script to pre-create managed role definitions" + setup_managed_role_definitions(apps, None) diff --git a/awx/main/tests/functional/dab_rbac/test_dab_migration.py b/awx/main/tests/functional/dab_rbac/test_dab_migration.py new file mode 100644 index 0000000000..34639774db --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/test_dab_migration.py @@ -0,0 +1,45 @@ +import pytest +from django.apps import apps +from django.test.utils import override_settings + +from awx.main.migrations._dab_rbac import setup_managed_role_definitions + +from ansible_base.rbac.models import RoleDefinition + +INVENTORY_OBJ_PERMISSIONS = ['view_inventory', 'adhoc_inventory', 'use_inventory', 'change_inventory', 'delete_inventory', 'update_inventory'] + + +@pytest.mark.django_db +def test_managed_definitions_precreate(): + with override_settings( + ANSIBLE_BASE_ROLE_PRECREATE={ + 'object_admin': '{cls._meta.model_name}-admin', + 'org_admin': 'organization-admin', + 'org_children': 'organization-{cls._meta.model_name}-admin', + 'special': '{cls._meta.model_name}-{action}', + } + ): + setup_managed_role_definitions(apps, None) + rd = RoleDefinition.objects.get(name='inventory-admin') + assert rd.managed is True + # add permissions do not go in the object-level admin + assert set(rd.permissions.values_list('codename', flat=True)) == set(INVENTORY_OBJ_PERMISSIONS) + + # test org-level object admin permissions + rd = RoleDefinition.objects.get(name='organization-inventory-admin') + assert rd.managed is True + assert set(rd.permissions.values_list('codename', flat=True)) == set(['add_inventory', 'view_organization'] + INVENTORY_OBJ_PERMISSIONS) + + +@pytest.mark.django_db +def test_managed_definitions_custom_obj_admin_name(): + with override_settings( + ANSIBLE_BASE_ROLE_PRECREATE={ + 'object_admin': 'foo-{cls._meta.model_name}-foo', + } + ): + setup_managed_role_definitions(apps, None) + rd = RoleDefinition.objects.get(name='foo-inventory-foo') + assert rd.managed is True + # add permissions do not go in the object-level admin + assert set(rd.permissions.values_list('codename', flat=True)) == set(INVENTORY_OBJ_PERMISSIONS) diff --git a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py index 7db61ae04e..293f37c1f9 100644 --- a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py +++ b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py @@ -10,19 +10,19 @@ from ansible_base.rbac.models import RoleDefinition @pytest.mark.django_db -def test_managed_roles_created(): +def test_managed_roles_created(managed_roles): "Managed RoleDefinitions are created in post_migration signal, we expect to see them here" for cls in (JobTemplate, Inventory): ct = ContentType.objects.get_for_model(cls) rds = list(RoleDefinition.objects.filter(content_type=ct)) assert len(rds) > 1 - assert f'{cls._meta.model_name}-admin' in [rd.name for rd in rds] + assert f'{cls.__name__} Admin' in [rd.name for rd in rds] for rd in rds: assert rd.managed is True @pytest.mark.django_db -def test_custom_read_role(admin_user, post): +def test_custom_read_role(admin_user, post, managed_roles): rd_url = django_reverse('roledefinition-list') resp = post( url=rd_url, data={"name": "read role made for test", "content_type": "awx.inventory", "permissions": ['view_inventory']}, user=admin_user, expect=201 @@ -40,8 +40,8 @@ def test_custom_system_roles_prohibited(admin_user, post): @pytest.mark.django_db -def test_assign_managed_role(admin_user, alice, rando, inventory, post): - rd = RoleDefinition.objects.get(name='inventory-admin') +def test_assign_managed_role(admin_user, alice, rando, inventory, post, managed_roles): + rd = RoleDefinition.objects.get(name='Inventory Admin') rd.give_permission(alice, inventory) # Now that alice has full permissions to the inventory, she will give rando permission url = django_reverse('roleuserassignment-list') @@ -63,7 +63,7 @@ def test_assign_custom_delete_role(admin_user, rando, inventory, delete, patch): @pytest.mark.django_db -def test_assign_custom_add_role(admin_user, rando, organization, post): +def test_assign_custom_add_role(admin_user, rando, organization, post, managed_roles): rd, _ = RoleDefinition.objects.get_or_create( name='inventory-add', permissions=['add_inventory', 'view_organization'], content_type=ContentType.objects.get_for_model(Organization) ) diff --git a/awx/main/tests/functional/dab_rbac/test_translation_layer.py b/awx/main/tests/functional/dab_rbac/test_translation_layer.py index 0d2c2c7245..2829599252 100644 --- a/awx/main/tests/functional/dab_rbac/test_translation_layer.py +++ b/awx/main/tests/functional/dab_rbac/test_translation_layer.py @@ -14,7 +14,7 @@ from ansible_base.rbac.models import RoleUserAssignment 'role_name', ['execution_environment_admin_role', 'project_admin_role', 'admin_role', 'auditor_role', 'read_role', 'execute_role', 'notification_admin_role'], ) -def test_round_trip_roles(organization, rando, role_name): +def test_round_trip_roles(organization, rando, role_name, managed_roles): """ Make an assignment with the old-style role, get the equivelent new role @@ -28,7 +28,7 @@ def test_round_trip_roles(organization, rando, role_name): @pytest.mark.django_db -def test_organization_level_permissions(organization, inventory): +def test_organization_level_permissions(organization, inventory, managed_roles): u1 = User.objects.create(username='alice') u2 = User.objects.create(username='bob') @@ -58,14 +58,14 @@ def test_organization_level_permissions(organization, inventory): @pytest.mark.django_db -def test_organization_execute_role(organization, rando): +def test_organization_execute_role(organization, rando, managed_roles): organization.execute_role.members.add(rando) assert rando in organization.execute_role assert set(Organization.accessible_objects(rando, 'execute_role')) == set([organization]) @pytest.mark.django_db -def test_workflow_approval_list(get, post, admin_user): +def test_workflow_approval_list(get, post, admin_user, managed_roles): workflow_job_template = WorkflowJobTemplate.objects.create() approval_node = WorkflowJobTemplateNode.objects.create(workflow_job_template=workflow_job_template) url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'}) @@ -79,14 +79,14 @@ def test_workflow_approval_list(get, post, admin_user): @pytest.mark.django_db -def test_creator_permission(rando, admin_user, inventory): +def test_creator_permission(rando, admin_user, inventory, managed_roles): give_creator_permissions(rando, inventory) assert rando in inventory.admin_role assert rando in inventory.admin_role.members.all() @pytest.mark.django_db -def test_team_team_read_role(rando, team, admin_user, post): +def test_team_team_read_role(rando, team, admin_user, post, managed_roles): orgs = [Organization.objects.create(name=f'foo-{i}') for i in range(2)] teams = [Team.objects.create(name=f'foo-{i}', organization=orgs[i]) for i in range(2)] teams[1].member_role.members.add(rando) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index f94b4af428..751e419730 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1145,14 +1145,17 @@ ANSIBLE_BASE_CUSTOM_VIEW_PARENT = 'awx.api.generics.APIView' # Settings for the ansible_base RBAC system -# Settings for the RBAC system, override as necessary in app +# Only used internally, names of the managed RoleDefinitions to create ANSIBLE_BASE_ROLE_PRECREATE = { - 'object_admin': '{cls._meta.model_name}-admin', - 'org_admin': 'organization-admin', - 'org_children': 'organization-{cls._meta.model_name}-admin', - 'special': '{cls._meta.model_name}-{action}', + 'object_admin': '{cls.__name__} Admin', + 'org_admin': 'Organization Admin', + 'org_children': 'Organization {cls.__name__} Admin', + 'special': '{cls.__name__} {action}', } +# Name for auto-created roles that give users permissions to what they create +ANSIBLE_BASE_ROLE_CREATOR_NAME = '{cls.__name__} Creator' + # Use the new Gateway RBAC system for evaluations? You should. We will remove the old system soon. ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED = True