diff --git a/awx/api/views.py b/awx/api/views.py index 3b939825fb..5a6aebebc1 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -4494,7 +4494,7 @@ class UnifiedJobTemplateList(ListAPIView): capabilities_prefetch = [ 'admin', 'execute', {'copy': ['jobtemplate.project.use', 'jobtemplate.inventory.use', - 'workflowjobtemplate.organization.admin']} + 'workflowjobtemplate.organization.workflow_admin']} ] diff --git a/awx/main/access.py b/awx/main/access.py index a5897936db..0d66c7209b 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -608,6 +608,7 @@ class InventoryAccess(BaseAccess): I can see inventory when: - I'm a superuser. - I'm an org admin of the inventory's org. + - I'm an inventory admin of the inventory's org. - I have read, write or admin permissions on it. I can change inventory when: - I'm a superuser. @@ -641,9 +642,9 @@ class InventoryAccess(BaseAccess): def can_add(self, data): # If no data is specified, just checking for generic add permission? if not data: - return Organization.accessible_objects(self.user, 'admin_role').exists() + return Organization.accessible_objects(self.user, 'inventory_admin_role').exists() - return self.check_related('organization', Organization, data) + return self.check_related('organization', Organization, data, role_field='inventory_admin_role') @check_superuser def can_change(self, obj, data): @@ -659,7 +660,7 @@ class InventoryAccess(BaseAccess): # Verify that the user has access to the new organization if moving an # inventory to a new organization. Otherwise, just check for admin permission. return ( - self.check_related('organization', Organization, data, obj=obj, + self.check_related('organization', Organization, data, obj=obj, role_field='inventory_admin_role', mandatory=org_admin_mandatory) and self.user in obj.admin_role ) @@ -945,8 +946,12 @@ class CredentialAccess(BaseAccess): - I'm a superuser. - It's a user credential and it's my credential. - It's a user credential and I'm an admin of an organization where that - user is a member of admin of the organization. + user is a member. + - It's a user credential and I'm a credential_admin of an organization + where that user is a member. - It's a team credential and I'm an admin of the team's organization. + - It's a team credential and I'm a credential admin of the team's + organization. - It's a team credential and I'm a member of the team. I can change/delete when: - I'm a superuser. @@ -980,7 +985,8 @@ class CredentialAccess(BaseAccess): return check_user_access(self.user, Team, 'change', team_obj, None) if data and data.get('organization', None): organization_obj = get_object_from_data('organization', Organization, data) - return check_user_access(self.user, Organization, 'change', organization_obj, None) + return any([check_user_access(self.user, Organization, 'change', organization_obj, None), + self.user in organization_obj.credential_admin_role]) return False @check_superuser @@ -991,7 +997,7 @@ class CredentialAccess(BaseAccess): def can_change(self, obj, data): if not obj: return False - return self.user in obj.admin_role and self.check_related('organization', Organization, data, obj=obj) + return self.user in obj.admin_role and self.check_related('organization', Organization, data, obj=obj, role_field='credential_admin_role') def can_delete(self, obj): # Unassociated credentials may be marked deleted by anyone, though we @@ -1067,6 +1073,7 @@ class ProjectAccess(BaseAccess): I can see projects when: - I am a superuser. - I am an admin in an organization associated with the project. + - I am a project admin in an organization associated with the project. - I am a user in an organization associated with the project. - I am on a team associated with the project. - I have been explicitly granted permission to run/check jobs using the @@ -1087,12 +1094,12 @@ class ProjectAccess(BaseAccess): @check_superuser def can_add(self, data): if not data: # So the browseable API will work - return Organization.accessible_objects(self.user, 'admin_role').exists() - return self.check_related('organization', Organization, data, mandatory=True) + return Organization.accessible_objects(self.user, 'project_admin_role').exists() + return self.check_related('organization', Organization, data, role_field='project_admin_role', mandatory=True) @check_superuser def can_change(self, obj, data): - if not self.check_related('organization', Organization, data, obj=obj): + if not self.check_related('organization', Organization, data, obj=obj, role_field='project_admin_role'): return False return self.user in obj.admin_role @@ -1174,6 +1181,7 @@ class JobTemplateAccess(BaseAccess): a user can create a job template if - they are a superuser - an org admin of any org that the project is a member + - if they are a project_admin for any org that project is a member of - if they have user or team based permissions tying the project to the inventory source for the given action as well as the 'create' deploy permission. @@ -1432,7 +1440,7 @@ class JobAccess(BaseAccess): elif not jt_access: return False - org_access = obj.inventory and self.user in obj.inventory.organization.admin_role + org_access = obj.inventory and self.user in obj.inventory.organization.inventory_admin_role project_access = obj.project is None or self.user in obj.project.admin_role credential_access = all([self.user in cred.use_role for cred in obj.credentials.all()]) @@ -1725,13 +1733,14 @@ class WorkflowJobTemplateAccess(BaseAccess): Users who are able to create deploy jobs can also run normal and check (dry run) jobs. ''' if not data: # So the browseable API will work - return Organization.accessible_objects(self.user, 'admin_role').exists() + return Organization.accessible_objects(self.user, 'workflow_admin_role').exists() # will check this if surveys are added to WFJT if 'survey_enabled' in data and data['survey_enabled']: self.check_license(feature='surveys') - return self.check_related('organization', Organization, data, mandatory=True) + return self.check_related('organization', Organization, data, role_field='workflow_admin_role', + mandatory=True) def can_copy(self, obj): if self.save_messages: @@ -1758,7 +1767,8 @@ class WorkflowJobTemplateAccess(BaseAccess): if missing_inventories: self.messages['inventories_unable_to_copy'] = missing_inventories - return self.check_related('organization', Organization, {'reference_obj': obj}, mandatory=True) + return self.check_related('organization', Organization, {'reference_obj': obj}, role_field='workflow_admin_role', + mandatory=True) def can_start(self, obj, validate_license=True): if validate_license: @@ -1783,7 +1793,8 @@ class WorkflowJobTemplateAccess(BaseAccess): if self.user.is_superuser: return True - return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role + return (self.check_related('organization', Organization, data, role_field='workflow_admin_field', obj=obj) and + self.user in obj.admin_role) def can_delete(self, obj): is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role @@ -1824,7 +1835,7 @@ class WorkflowJobAccess(BaseAccess): def can_delete(self, obj): return (obj.workflow_job_template and obj.workflow_job_template.organization and - self.user in obj.workflow_job_template.organization.admin_role) + self.user in obj.workflow_job_template.organization.workflow_admin_role) def get_method_capability(self, method, obj, parent_obj): if method == 'start': @@ -2204,7 +2215,7 @@ class NotificationTemplateAccess(BaseAccess): def filtered_queryset(self): return self.model.objects.filter( - Q(organization__in=self.user.admin_of_organizations) | + Q(organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) | Q(organization__in=self.user.auditor_of_organizations) ).distinct() @@ -2212,22 +2223,22 @@ class NotificationTemplateAccess(BaseAccess): if self.user.is_superuser or self.user.is_system_auditor: return True if obj.organization is not None: - if self.user in obj.organization.admin_role or self.user in obj.organization.auditor_role: + if self.user in obj.organization.notification_admin_role or self.user in obj.organization.auditor_role: return True return False @check_superuser def can_add(self, data): if not data: - return Organization.accessible_objects(self.user, 'admin_role').exists() - return self.check_related('organization', Organization, data, mandatory=True) + return Organization.accessible_objects(self.user, 'notification_admin_role').exists() + return self.check_related('organization', Organization, data, role_field='notification_admin_role', mandatory=True) @check_superuser def can_change(self, obj, data): if obj.organization is None: # only superusers are allowed to edit orphan notification templates return False - return self.check_related('organization', Organization, data, obj=obj, mandatory=True) + return self.check_related('organization', Organization, data, obj=obj, role_field='notification_admin_role', mandatory=True) def can_admin(self, obj, data): return self.can_change(obj, data) @@ -2239,7 +2250,7 @@ class NotificationTemplateAccess(BaseAccess): def can_start(self, obj, validate_license=True): if obj.organization is None: return False - return self.user in obj.organization.admin_role + return self.user in obj.organization.notification_admin_role class NotificationAccess(BaseAccess): @@ -2251,7 +2262,7 @@ class NotificationAccess(BaseAccess): def filtered_queryset(self): return self.model.objects.filter( - Q(notification_template__organization__in=self.user.admin_of_organizations) | + Q(notification_template__organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) | Q(notification_template__organization__in=self.user.auditor_of_organizations) ).distinct() diff --git a/awx/main/migrations/0021_v330_declare_new_rbac_roles.py b/awx/main/migrations/0021_v330_declare_new_rbac_roles.py new file mode 100644 index 0000000000..a279ef90e3 --- /dev/null +++ b/awx/main/migrations/0021_v330_declare_new_rbac_roles.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.7 on 2018-02-01 16:32 +from __future__ import unicode_literals + +import awx.main.fields +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0020_v330_instancegroup_policies'), + ] + + operations = [ + migrations.AddField( + model_name='organization', + name='execute_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'), + ), + migrations.AddField( + model_name='organization', + name='credential_admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'), + ), + migrations.AddField( + model_name='organization', + name='inventory_admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'), + ), + migrations.AddField( + model_name='organization', + name='project_admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'), + ), + migrations.AddField( + model_name='organization', + name='workflow_admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'), + ), + migrations.AddField( + model_name='organization', + name='notification_admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='credential', + name='admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'singleton:system_administrator', b'organization.credential_admin_role'], related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='inventory', + name='admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'organization.inventory_admin_role', related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='project', + name='admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'organization.project_admin_role', b'singleton:system_administrator'], related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='workflowjobtemplate', + name='admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'singleton:system_administrator', b'organization.workflow_admin_role'], related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='workflowjobtemplate', + name='execute_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'admin_role', b'organization.execute_role'], related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='jobtemplate', + name='admin_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'project.organization.project_admin_role', b'inventory.organization.inventory_admin_role'], related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='jobtemplate', + name='execute_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'admin_role', b'project.organization.execute_role', b'inventory.organization.execute_role'], related_name='+', to='main.Role'), + ), + migrations.AlterField( + model_name='organization', + name='member_role', + field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'admin_role', b'project_admin_role', b'inventory_admin_role', b'workflow_admin_role', b'notification_admin_role', b'execute_role'], related_name='+', to='main.Role'), + ), + ] diff --git a/awx/main/migrations/0022_v330_create_new_rbac_roles.py b/awx/main/migrations/0022_v330_create_new_rbac_roles.py new file mode 100644 index 0000000000..578be8645c --- /dev/null +++ b/awx/main/migrations/0022_v330_create_new_rbac_roles.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations +from awx.main.migrations import ActivityStreamDisabledMigration +from awx.main.migrations import _rbac as rbac +from awx.main.migrations import _migration_utils as migration_utils + + +class Migration(ActivityStreamDisabledMigration): + + dependencies = [ + ('main', '0021_v330_declare_new_rbac_roles'), + ] + + operations = [ + migrations.RunPython(migration_utils.set_current_apps_for_migrations), + migrations.RunPython(rbac.create_roles), + ] diff --git a/awx/main/models/credential/__init__.py b/awx/main/models/credential/__init__.py index e51758d2b3..ebea41cc80 100644 --- a/awx/main/models/credential/__init__.py +++ b/awx/main/models/credential/__init__.py @@ -262,7 +262,7 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): admin_role = ImplicitRoleField( parent_role=[ 'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, - 'organization.admin_role', + 'organization.credential_admin_role', ], ) use_role = ImplicitRoleField( diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index d03b1c0279..68adf6daf0 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -132,7 +132,7 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin): blank=True, ) admin_role = ImplicitRoleField( - parent_role='organization.admin_role', + parent_role='organization.inventory_admin_role', ) update_role = ImplicitRoleField( parent_role='admin_role', diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 0d27329cdf..5efd502a7d 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -270,10 +270,10 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour allows_field='credentials' ) admin_role = ImplicitRoleField( - parent_role=['project.organization.admin_role', 'inventory.organization.admin_role'] + parent_role=['project.organization.project_admin_role', 'inventory.organization.inventory_admin_role'] ) execute_role = ImplicitRoleField( - parent_role=['admin_role'], + parent_role=['admin_role', 'project.organization.execute_role', 'inventory.organization.execute_role'], ) read_role = ImplicitRoleField( parent_role=['project.organization.auditor_role', 'inventory.organization.auditor_role', 'execute_role', 'admin_role'], diff --git a/awx/main/models/organization.py b/awx/main/models/organization.py index 2e2a46e28e..377a7619bb 100644 --- a/awx/main/models/organization.py +++ b/awx/main/models/organization.py @@ -45,11 +45,31 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin, CustomVi admin_role = ImplicitRoleField( parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, ) + execute_role = ImplicitRoleField( + parent_role='admin_role', + ) + project_admin_role = ImplicitRoleField( + parent_role='admin_role', + ) + inventory_admin_role = ImplicitRoleField( + parent_role='admin_role', + ) + credential_admin_role = ImplicitRoleField( + parent_role='admin_role', + ) + workflow_admin_role = ImplicitRoleField( + parent_role='admin_role', + ) + notification_admin_role = ImplicitRoleField( + parent_role='admin_role', + ) auditor_role = ImplicitRoleField( parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR, ) member_role = ImplicitRoleField( - parent_role='admin_role', + parent_role=['admin_role', 'execute_role', 'project_admin_role', + 'inventory_admin_role', 'workflow_admin_role', + 'notification_admin_role'] ) read_role = ImplicitRoleField( parent_role=['member_role', 'auditor_role'], diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 5b4c8be177..7798c62c9e 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -286,7 +286,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin, CustomVirtualEn ) admin_role = ImplicitRoleField(parent_role=[ - 'organization.admin_role', + 'organization.project_admin_role', 'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, ]) diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index b7f70aafec..930b226ac6 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -33,29 +33,42 @@ ROLE_SINGLETON_SYSTEM_ADMINISTRATOR='system_administrator' ROLE_SINGLETON_SYSTEM_AUDITOR='system_auditor' role_names = { - 'system_administrator' : _('System Administrator'), - 'system_auditor' : _('System Auditor'), - 'adhoc_role' : _('Ad Hoc'), - 'admin_role' : _('Admin'), - 'auditor_role' : _('Auditor'), - 'execute_role' : _('Execute'), - 'member_role' : _('Member'), - 'read_role' : _('Read'), - 'update_role' : _('Update'), - 'use_role' : _('Use'), + 'system_administrator': _('System Administrator'), + 'system_auditor': _('System Auditor'), + 'adhoc_role': _('Ad Hoc'), + 'admin_role': _('Admin'), + 'project_admin_role': _('Project Admin'), + 'inventory_admin_role': _('Inventory Admin'), + 'credential_admin_role': _('Credential Admin'), + 'workflow_admin_role': _('Workflow Admin'), + 'notification_admin_role': _('Notification Admin'), + 'auditor_role': _('Auditor'), + 'execute_role': _('Execute'), + 'member_role': _('Member'), + 'read_role': _('Read'), + 'update_role': _('Update'), + 'use_role': _('Use'), } role_descriptions = { - 'system_administrator' : _('Can manage all aspects of the system'), - 'system_auditor' : _('Can view all settings on the system'), - 'adhoc_role' : _('May run ad hoc commands on an inventory'), - 'admin_role' : _('Can manage all aspects of the %s'), - 'auditor_role' : _('Can view all settings for the %s'), - 'execute_role' : _('May run the %s'), - 'member_role' : _('User is a member of the %s'), - 'read_role' : _('May view settings for the %s'), - 'update_role' : _('May update project or inventory or group using the configured source update system'), - 'use_role' : _('Can use the %s in a job template'), + 'system_administrator': _('Can manage all aspects of the system'), + 'system_auditor': _('Can view all settings on the system'), + 'adhoc_role': _('May run ad hoc commands on an inventory'), + 'admin_role': _('Can manage all aspects of the %s'), + 'project_admin_role': _('Can manage all projects of the %s'), + 'inventory_admin_role': _('Can manage all inventories of the %s'), + 'credential_admin_role': _('Can manage all credentials of the %s'), + 'workflow_admin_role': _('Can manage all workflows of the %s'), + 'notification_admin_role': _('Can manage all notifications of the %s'), + 'auditor_role': _('Can view all settings for the %s'), + 'execute_role': { + 'organization': _('May run any executable resources in the organization'), + 'default': _('May run the %s'), + }, + 'member_role': _('User is a member of the %s'), + 'read_role': _('May view settings for the %s'), + 'update_role': _('May update project or inventory or group using the configured source update system'), + 'use_role': _('Can use the %s in a job template'), } @@ -170,12 +183,22 @@ class Role(models.Model): global role_descriptions description = role_descriptions[self.role_field] content_type = self.content_type - if '%s' in description and content_type: + + model_name = None + if content_type: model = content_type.model_class() model_name = re.sub(r'([a-z])([A-Z])', r'\1 \2', model.__name__).lower() - description = description % model_name - return description + value = description + if type(description) == dict: + value = description.get(model_name) + if value is None: + value = description.get('default') + + if '%s' in value and content_type: + value = value % model_name + + return value @staticmethod def rebuild_role_ancestor_list(additions, removals): diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index e3bf2640dd..5c582683c6 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -306,10 +306,11 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl ) admin_role = ImplicitRoleField(parent_role=[ 'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, - 'organization.admin_role' + 'organization.workflow_admin_role' ]) execute_role = ImplicitRoleField(parent_role=[ - 'admin_role' + 'admin_role', + 'organization.execute_role', ]) read_role = ImplicitRoleField(parent_role=[ 'singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR, diff --git a/awx/main/tests/functional/test_rbac_credential.py b/awx/main/tests/functional/test_rbac_credential.py index b02b30755a..783fdd9e82 100644 --- a/awx/main/tests/functional/test_rbac_credential.py +++ b/awx/main/tests/functional/test_rbac_credential.py @@ -35,9 +35,31 @@ def test_credential_access_auditor(credential, organization_factory): @pytest.mark.django_db -def test_org_credential_access_member(alice, org_credential, credential): - org_credential.admin_role.members.add(alice) +def test_credential_access_member(alice, credential): credential.admin_role.members.add(alice) + access = CredentialAccess(alice) + assert access.can_change(credential, { + 'description': 'New description.', + 'organization': None}) + + +@pytest.mark.django_db +@pytest.mark.parametrize("role_name", ["admin_role", "credential_admin_role"]) +def test_org_credential_access_admin(role_name, alice, org_credential): + role = getattr(org_credential.organization, role_name) + role.members.add(alice) + + access = CredentialAccess(alice) + + # Alice should be able to PATCH if organization is not changed + assert access.can_change(org_credential, { + 'description': 'New description.', + 'organization': org_credential.organization.pk}) + + +@pytest.mark.django_db +def test_org_credential_access_member(alice, org_credential): + org_credential.admin_role.members.add(alice) access = CredentialAccess(alice) @@ -47,9 +69,6 @@ def test_org_credential_access_member(alice, org_credential, credential): 'organization': org_credential.organization.pk}) assert access.can_change(org_credential, { 'description': 'New description.'}) - assert access.can_change(credential, { - 'description': 'New description.', - 'organization': None}) @pytest.mark.django_db diff --git a/awx/main/tests/functional/test_rbac_inventory.py b/awx/main/tests/functional/test_rbac_inventory.py index 830e5a7b52..fd195b10b3 100644 --- a/awx/main/tests/functional/test_rbac_inventory.py +++ b/awx/main/tests/functional/test_rbac_inventory.py @@ -62,10 +62,13 @@ def test_org_member_inventory_script_permissions(org_member, organization): @pytest.mark.django_db -def test_access_admin(organization, inventory, user): +@pytest.mark.parametrize("role", ["admin_role", "inventory_admin_role"]) +def test_access_admin(role, organization, inventory, user): a = user('admin', False) inventory.organization = organization - organization.admin_role.members.add(a) + + role = getattr(organization, role) + role.members.add(a) access = InventoryAccess(a) assert access.can_read(inventory) diff --git a/awx/main/tests/functional/test_rbac_job_templates.py b/awx/main/tests/functional/test_rbac_job_templates.py index 91778a3c5d..34a6b06e97 100644 --- a/awx/main/tests/functional/test_rbac_job_templates.py +++ b/awx/main/tests/functional/test_rbac_job_templates.py @@ -80,10 +80,15 @@ def test_job_template_access_use_level(jt_linked, rando): @pytest.mark.django_db -def test_job_template_access_org_admin(jt_linked, rando): +@pytest.mark.parametrize("role_names", [("admin_role",), ("inventory_admin_role", "project_admin_role")]) +def test_job_template_access_admin(role_names, jt_linked, rando): access = JobTemplateAccess(rando) # Appoint this user as admin of the organization - jt_linked.inventory.organization.admin_role.members.add(rando) + #jt_linked.inventory.organization.admin_role.members.add(rando) + for role_name in role_names: + role = getattr(jt_linked.inventory.organization, role_name) + role.members.add(rando) + # Assign organization permission in the same way the create view does organization = jt_linked.inventory.organization jt_linked.get_deprecated_credential('ssh').admin_role.parents.add(organization.admin_role) diff --git a/awx/main/tests/functional/test_rbac_notifications.py b/awx/main/tests/functional/test_rbac_notifications.py index 80255da0d1..18ff3959aa 100644 --- a/awx/main/tests/functional/test_rbac_notifications.py +++ b/awx/main/tests/functional/test_rbac_notifications.py @@ -33,6 +33,13 @@ def test_notification_template_get_queryset_orgadmin(notification_template, user assert access.get_queryset().count() == 1 +@pytest.mark.django_db +def test_notification_template_get_queryset_notificationadmin(notification_template, user): + access = NotificationTemplateAccess(user('admin', False)) + notification_template.organization.notification_admin_role.members.add(user('admin', False)) + assert access.get_queryset().count() == 1 + + @pytest.mark.django_db def test_notification_template_get_queryset_org_auditor(notification_template, org_auditor): access = NotificationTemplateAccess(org_auditor) @@ -59,12 +66,13 @@ def test_notification_template_access_superuser(notification_template_factory): @pytest.mark.django_db -def test_notification_template_access_admin(organization_factory, notification_template_factory): +@pytest.mark.parametrize("role", ["present.admin_role:admin", "present.notification_admin_role:admin"]) +def test_notification_template_access_admin(role, organization_factory, notification_template_factory): other_objects = organization_factory('other') present_objects = organization_factory('present', users=['admin'], notification_templates=['test-notification'], - roles=['present.admin_role:admin']) + roles=[role]) notification_template = present_objects.notification_templates.test_notification other_org = other_objects.organization diff --git a/awx/main/tests/functional/test_rbac_workflow.py b/awx/main/tests/functional/test_rbac_workflow.py index 578db417d0..5cd63027d2 100644 --- a/awx/main/tests/functional/test_rbac_workflow.py +++ b/awx/main/tests/functional/test_rbac_workflow.py @@ -49,6 +49,13 @@ class TestWorkflowJobTemplateAccess: assert org_admin in wfjt.execute_role assert org_admin in wfjt.read_role + def test_org_workflow_admin_role_inheritance(self, wfjt, org_member): + wfjt.organization.workflow_admin_role.members.add(org_member) + + assert org_member in wfjt.admin_role + assert org_member in wfjt.execute_role + assert org_member in wfjt.read_role + @pytest.mark.django_db class TestWorkflowJobTemplateNodeAccess: @@ -103,8 +110,12 @@ class TestWorkflowJobTemplateNodeAccess: @pytest.mark.django_db class TestWorkflowJobAccess: - def test_org_admin_can_delete_workflow_job(self, workflow_job, org_admin): - access = WorkflowJobAccess(org_admin) + @pytest.mark.parametrize("role_name", ["admin_role", "workflow_admin_role"]) + def test_org_admin_can_delete_workflow_job(self, role_name, workflow_job, org_member): + role = getattr(workflow_job.workflow_job_template.organization, role_name) + role.members.add(org_member) + + access = WorkflowJobAccess(org_member) assert access.can_delete(workflow_job) def test_wfjt_admin_can_delete_workflow_job(self, workflow_job, rando): @@ -132,9 +143,13 @@ class TestWFJTCopyAccess: admin_access = WorkflowJobTemplateAccess(org_admin) assert admin_access.can_copy(wfjt) + wfjt.organization.workflow_admin_role.members.add(org_member) + admin_access = WorkflowJobTemplateAccess(org_member) + assert admin_access.can_copy(wfjt) + def test_copy_permissions_user(self, wfjt, org_admin, org_member): ''' - Only org admins are able to add WFJTs, only org admins + Only org admins and org workflow admins are able to add WFJTs, only org admins are able to copy them ''' wfjt.admin_role.members.add(org_member) diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index 0692ba0490..6b20aaaed9 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -244,7 +244,7 @@ class TestWorkflowAccessMethods: def test_workflow_can_add(self, workflow, user_unit): organization = Organization(name='test-org') workflow.organization = organization - organization.admin_role = Role() + organization.workflow_admin_role = Role() def mock_get_object(Class, **kwargs): if Class == Organization: