Merge pull request #1123 from wwitzel3/new-permissions

New RBAC Roles
This commit is contained in:
Wayne Witzel III 2018-02-15 16:56:03 -05:00 committed by GitHub
commit 2c71a27630
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 281 additions and 69 deletions

View File

@ -4494,7 +4494,7 @@ class UnifiedJobTemplateList(ListAPIView):
capabilities_prefetch = [
'admin', 'execute',
{'copy': ['jobtemplate.project.use', 'jobtemplate.inventory.use',
'workflowjobtemplate.organization.admin']}
'workflowjobtemplate.organization.workflow_admin']}
]

View File

@ -608,6 +608,7 @@ class InventoryAccess(BaseAccess):
I can see inventory when:
- I'm a superuser.
- I'm an org admin of the inventory's org.
- I'm an inventory admin of the inventory's org.
- I have read, write or admin permissions on it.
I can change inventory when:
- I'm a superuser.
@ -641,9 +642,9 @@ class InventoryAccess(BaseAccess):
def can_add(self, data):
# If no data is specified, just checking for generic add permission?
if not data:
return Organization.accessible_objects(self.user, 'admin_role').exists()
return Organization.accessible_objects(self.user, 'inventory_admin_role').exists()
return self.check_related('organization', Organization, data)
return self.check_related('organization', Organization, data, role_field='inventory_admin_role')
@check_superuser
def can_change(self, obj, data):
@ -659,7 +660,7 @@ class InventoryAccess(BaseAccess):
# Verify that the user has access to the new organization if moving an
# inventory to a new organization. Otherwise, just check for admin permission.
return (
self.check_related('organization', Organization, data, obj=obj,
self.check_related('organization', Organization, data, obj=obj, role_field='inventory_admin_role',
mandatory=org_admin_mandatory) and
self.user in obj.admin_role
)
@ -945,8 +946,12 @@ class CredentialAccess(BaseAccess):
- I'm a superuser.
- It's a user credential and it's my credential.
- It's a user credential and I'm an admin of an organization where that
user is a member of admin of the organization.
user is a member.
- It's a user credential and I'm a credential_admin of an organization
where that user is a member.
- It's a team credential and I'm an admin of the team's organization.
- It's a team credential and I'm a credential admin of the team's
organization.
- It's a team credential and I'm a member of the team.
I can change/delete when:
- I'm a superuser.
@ -980,7 +985,8 @@ class CredentialAccess(BaseAccess):
return check_user_access(self.user, Team, 'change', team_obj, None)
if data and data.get('organization', None):
organization_obj = get_object_from_data('organization', Organization, data)
return check_user_access(self.user, Organization, 'change', organization_obj, None)
return any([check_user_access(self.user, Organization, 'change', organization_obj, None),
self.user in organization_obj.credential_admin_role])
return False
@check_superuser
@ -991,7 +997,7 @@ class CredentialAccess(BaseAccess):
def can_change(self, obj, data):
if not obj:
return False
return self.user in obj.admin_role and self.check_related('organization', Organization, data, obj=obj)
return self.user in obj.admin_role and self.check_related('organization', Organization, data, obj=obj, role_field='credential_admin_role')
def can_delete(self, obj):
# Unassociated credentials may be marked deleted by anyone, though we
@ -1067,6 +1073,7 @@ class ProjectAccess(BaseAccess):
I can see projects when:
- I am a superuser.
- I am an admin in an organization associated with the project.
- I am a project admin in an organization associated with the project.
- I am a user in an organization associated with the project.
- I am on a team associated with the project.
- I have been explicitly granted permission to run/check jobs using the
@ -1087,12 +1094,12 @@ class ProjectAccess(BaseAccess):
@check_superuser
def can_add(self, data):
if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'admin_role').exists()
return self.check_related('organization', Organization, data, mandatory=True)
return Organization.accessible_objects(self.user, 'project_admin_role').exists()
return self.check_related('organization', Organization, data, role_field='project_admin_role', mandatory=True)
@check_superuser
def can_change(self, obj, data):
if not self.check_related('organization', Organization, data, obj=obj):
if not self.check_related('organization', Organization, data, obj=obj, role_field='project_admin_role'):
return False
return self.user in obj.admin_role
@ -1174,6 +1181,7 @@ class JobTemplateAccess(BaseAccess):
a user can create a job template if
- they are a superuser
- an org admin of any org that the project is a member
- if they are a project_admin for any org that project is a member of
- if they have user or team
based permissions tying the project to the inventory source for the
given action as well as the 'create' deploy permission.
@ -1432,7 +1440,7 @@ class JobAccess(BaseAccess):
elif not jt_access:
return False
org_access = obj.inventory and self.user in obj.inventory.organization.admin_role
org_access = obj.inventory and self.user in obj.inventory.organization.inventory_admin_role
project_access = obj.project is None or self.user in obj.project.admin_role
credential_access = all([self.user in cred.use_role for cred in obj.credentials.all()])
@ -1725,13 +1733,14 @@ class WorkflowJobTemplateAccess(BaseAccess):
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
'''
if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'admin_role').exists()
return Organization.accessible_objects(self.user, 'workflow_admin_role').exists()
# will check this if surveys are added to WFJT
if 'survey_enabled' in data and data['survey_enabled']:
self.check_license(feature='surveys')
return self.check_related('organization', Organization, data, mandatory=True)
return self.check_related('organization', Organization, data, role_field='workflow_admin_role',
mandatory=True)
def can_copy(self, obj):
if self.save_messages:
@ -1758,7 +1767,8 @@ class WorkflowJobTemplateAccess(BaseAccess):
if missing_inventories:
self.messages['inventories_unable_to_copy'] = missing_inventories
return self.check_related('organization', Organization, {'reference_obj': obj}, mandatory=True)
return self.check_related('organization', Organization, {'reference_obj': obj}, role_field='workflow_admin_role',
mandatory=True)
def can_start(self, obj, validate_license=True):
if validate_license:
@ -1783,7 +1793,8 @@ class WorkflowJobTemplateAccess(BaseAccess):
if self.user.is_superuser:
return True
return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role
return (self.check_related('organization', Organization, data, role_field='workflow_admin_field', obj=obj) and
self.user in obj.admin_role)
def can_delete(self, obj):
is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role
@ -1824,7 +1835,7 @@ class WorkflowJobAccess(BaseAccess):
def can_delete(self, obj):
return (obj.workflow_job_template and
obj.workflow_job_template.organization and
self.user in obj.workflow_job_template.organization.admin_role)
self.user in obj.workflow_job_template.organization.workflow_admin_role)
def get_method_capability(self, method, obj, parent_obj):
if method == 'start':
@ -2204,7 +2215,7 @@ class NotificationTemplateAccess(BaseAccess):
def filtered_queryset(self):
return self.model.objects.filter(
Q(organization__in=self.user.admin_of_organizations) |
Q(organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) |
Q(organization__in=self.user.auditor_of_organizations)
).distinct()
@ -2212,22 +2223,22 @@ class NotificationTemplateAccess(BaseAccess):
if self.user.is_superuser or self.user.is_system_auditor:
return True
if obj.organization is not None:
if self.user in obj.organization.admin_role or self.user in obj.organization.auditor_role:
if self.user in obj.organization.notification_admin_role or self.user in obj.organization.auditor_role:
return True
return False
@check_superuser
def can_add(self, data):
if not data:
return Organization.accessible_objects(self.user, 'admin_role').exists()
return self.check_related('organization', Organization, data, mandatory=True)
return Organization.accessible_objects(self.user, 'notification_admin_role').exists()
return self.check_related('organization', Organization, data, role_field='notification_admin_role', mandatory=True)
@check_superuser
def can_change(self, obj, data):
if obj.organization is None:
# only superusers are allowed to edit orphan notification templates
return False
return self.check_related('organization', Organization, data, obj=obj, mandatory=True)
return self.check_related('organization', Organization, data, obj=obj, role_field='notification_admin_role', mandatory=True)
def can_admin(self, obj, data):
return self.can_change(obj, data)
@ -2239,7 +2250,7 @@ class NotificationTemplateAccess(BaseAccess):
def can_start(self, obj, validate_license=True):
if obj.organization is None:
return False
return self.user in obj.organization.admin_role
return self.user in obj.organization.notification_admin_role
class NotificationAccess(BaseAccess):
@ -2251,7 +2262,7 @@ class NotificationAccess(BaseAccess):
def filtered_queryset(self):
return self.model.objects.filter(
Q(notification_template__organization__in=self.user.admin_of_organizations) |
Q(notification_template__organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) |
Q(notification_template__organization__in=self.user.auditor_of_organizations)
).distinct()

View File

@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.7 on 2018-02-01 16:32
from __future__ import unicode_literals
import awx.main.fields
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0020_v330_instancegroup_policies'),
]
operations = [
migrations.AddField(
model_name='organization',
name='execute_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'),
),
migrations.AddField(
model_name='organization',
name='credential_admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'),
),
migrations.AddField(
model_name='organization',
name='inventory_admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'),
),
migrations.AddField(
model_name='organization',
name='project_admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'),
),
migrations.AddField(
model_name='organization',
name='workflow_admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'),
),
migrations.AddField(
model_name='organization',
name='notification_admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'admin_role', related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='credential',
name='admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'singleton:system_administrator', b'organization.credential_admin_role'], related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='inventory',
name='admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=b'organization.inventory_admin_role', related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='project',
name='admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'organization.project_admin_role', b'singleton:system_administrator'], related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='workflowjobtemplate',
name='admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'singleton:system_administrator', b'organization.workflow_admin_role'], related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='workflowjobtemplate',
name='execute_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'admin_role', b'organization.execute_role'], related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='jobtemplate',
name='admin_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'project.organization.project_admin_role', b'inventory.organization.inventory_admin_role'], related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='jobtemplate',
name='execute_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'admin_role', b'project.organization.execute_role', b'inventory.organization.execute_role'], related_name='+', to='main.Role'),
),
migrations.AlterField(
model_name='organization',
name='member_role',
field=awx.main.fields.ImplicitRoleField(null=b'True', on_delete=django.db.models.deletion.CASCADE, parent_role=[b'admin_role', b'project_admin_role', b'inventory_admin_role', b'workflow_admin_role', b'notification_admin_role', b'execute_role'], related_name='+', to='main.Role'),
),
]

View File

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations
from awx.main.migrations import ActivityStreamDisabledMigration
from awx.main.migrations import _rbac as rbac
from awx.main.migrations import _migration_utils as migration_utils
class Migration(ActivityStreamDisabledMigration):
dependencies = [
('main', '0021_v330_declare_new_rbac_roles'),
]
operations = [
migrations.RunPython(migration_utils.set_current_apps_for_migrations),
migrations.RunPython(rbac.create_roles),
]

View File

@ -262,7 +262,7 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
admin_role = ImplicitRoleField(
parent_role=[
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
'organization.admin_role',
'organization.credential_admin_role',
],
)
use_role = ImplicitRoleField(

View File

@ -132,7 +132,7 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin):
blank=True,
)
admin_role = ImplicitRoleField(
parent_role='organization.admin_role',
parent_role='organization.inventory_admin_role',
)
update_role = ImplicitRoleField(
parent_role='admin_role',

View File

@ -270,10 +270,10 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
allows_field='credentials'
)
admin_role = ImplicitRoleField(
parent_role=['project.organization.admin_role', 'inventory.organization.admin_role']
parent_role=['project.organization.project_admin_role', 'inventory.organization.inventory_admin_role']
)
execute_role = ImplicitRoleField(
parent_role=['admin_role'],
parent_role=['admin_role', 'project.organization.execute_role', 'inventory.organization.execute_role'],
)
read_role = ImplicitRoleField(
parent_role=['project.organization.auditor_role', 'inventory.organization.auditor_role', 'execute_role', 'admin_role'],

View File

@ -45,11 +45,31 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin, CustomVi
admin_role = ImplicitRoleField(
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
)
execute_role = ImplicitRoleField(
parent_role='admin_role',
)
project_admin_role = ImplicitRoleField(
parent_role='admin_role',
)
inventory_admin_role = ImplicitRoleField(
parent_role='admin_role',
)
credential_admin_role = ImplicitRoleField(
parent_role='admin_role',
)
workflow_admin_role = ImplicitRoleField(
parent_role='admin_role',
)
notification_admin_role = ImplicitRoleField(
parent_role='admin_role',
)
auditor_role = ImplicitRoleField(
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,
)
member_role = ImplicitRoleField(
parent_role='admin_role',
parent_role=['admin_role', 'execute_role', 'project_admin_role',
'inventory_admin_role', 'workflow_admin_role',
'notification_admin_role']
)
read_role = ImplicitRoleField(
parent_role=['member_role', 'auditor_role'],

View File

@ -286,7 +286,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin, CustomVirtualEn
)
admin_role = ImplicitRoleField(parent_role=[
'organization.admin_role',
'organization.project_admin_role',
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
])

View File

@ -33,29 +33,42 @@ ROLE_SINGLETON_SYSTEM_ADMINISTRATOR='system_administrator'
ROLE_SINGLETON_SYSTEM_AUDITOR='system_auditor'
role_names = {
'system_administrator' : _('System Administrator'),
'system_auditor' : _('System Auditor'),
'adhoc_role' : _('Ad Hoc'),
'admin_role' : _('Admin'),
'auditor_role' : _('Auditor'),
'execute_role' : _('Execute'),
'member_role' : _('Member'),
'read_role' : _('Read'),
'update_role' : _('Update'),
'use_role' : _('Use'),
'system_administrator': _('System Administrator'),
'system_auditor': _('System Auditor'),
'adhoc_role': _('Ad Hoc'),
'admin_role': _('Admin'),
'project_admin_role': _('Project Admin'),
'inventory_admin_role': _('Inventory Admin'),
'credential_admin_role': _('Credential Admin'),
'workflow_admin_role': _('Workflow Admin'),
'notification_admin_role': _('Notification Admin'),
'auditor_role': _('Auditor'),
'execute_role': _('Execute'),
'member_role': _('Member'),
'read_role': _('Read'),
'update_role': _('Update'),
'use_role': _('Use'),
}
role_descriptions = {
'system_administrator' : _('Can manage all aspects of the system'),
'system_auditor' : _('Can view all settings on the system'),
'adhoc_role' : _('May run ad hoc commands on an inventory'),
'admin_role' : _('Can manage all aspects of the %s'),
'auditor_role' : _('Can view all settings for the %s'),
'execute_role' : _('May run the %s'),
'member_role' : _('User is a member of the %s'),
'read_role' : _('May view settings for the %s'),
'update_role' : _('May update project or inventory or group using the configured source update system'),
'use_role' : _('Can use the %s in a job template'),
'system_administrator': _('Can manage all aspects of the system'),
'system_auditor': _('Can view all settings on the system'),
'adhoc_role': _('May run ad hoc commands on an inventory'),
'admin_role': _('Can manage all aspects of the %s'),
'project_admin_role': _('Can manage all projects of the %s'),
'inventory_admin_role': _('Can manage all inventories of the %s'),
'credential_admin_role': _('Can manage all credentials of the %s'),
'workflow_admin_role': _('Can manage all workflows of the %s'),
'notification_admin_role': _('Can manage all notifications of the %s'),
'auditor_role': _('Can view all settings for the %s'),
'execute_role': {
'organization': _('May run any executable resources in the organization'),
'default': _('May run the %s'),
},
'member_role': _('User is a member of the %s'),
'read_role': _('May view settings for the %s'),
'update_role': _('May update project or inventory or group using the configured source update system'),
'use_role': _('Can use the %s in a job template'),
}
@ -170,12 +183,22 @@ class Role(models.Model):
global role_descriptions
description = role_descriptions[self.role_field]
content_type = self.content_type
if '%s' in description and content_type:
model_name = None
if content_type:
model = content_type.model_class()
model_name = re.sub(r'([a-z])([A-Z])', r'\1 \2', model.__name__).lower()
description = description % model_name
return description
value = description
if type(description) == dict:
value = description.get(model_name)
if value is None:
value = description.get('default')
if '%s' in value and content_type:
value = value % model_name
return value
@staticmethod
def rebuild_role_ancestor_list(additions, removals):

View File

@ -306,10 +306,11 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl
)
admin_role = ImplicitRoleField(parent_role=[
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
'organization.admin_role'
'organization.workflow_admin_role'
])
execute_role = ImplicitRoleField(parent_role=[
'admin_role'
'admin_role',
'organization.execute_role',
])
read_role = ImplicitRoleField(parent_role=[
'singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,

View File

@ -35,9 +35,31 @@ def test_credential_access_auditor(credential, organization_factory):
@pytest.mark.django_db
def test_org_credential_access_member(alice, org_credential, credential):
org_credential.admin_role.members.add(alice)
def test_credential_access_member(alice, credential):
credential.admin_role.members.add(alice)
access = CredentialAccess(alice)
assert access.can_change(credential, {
'description': 'New description.',
'organization': None})
@pytest.mark.django_db
@pytest.mark.parametrize("role_name", ["admin_role", "credential_admin_role"])
def test_org_credential_access_admin(role_name, alice, org_credential):
role = getattr(org_credential.organization, role_name)
role.members.add(alice)
access = CredentialAccess(alice)
# Alice should be able to PATCH if organization is not changed
assert access.can_change(org_credential, {
'description': 'New description.',
'organization': org_credential.organization.pk})
@pytest.mark.django_db
def test_org_credential_access_member(alice, org_credential):
org_credential.admin_role.members.add(alice)
access = CredentialAccess(alice)
@ -47,9 +69,6 @@ def test_org_credential_access_member(alice, org_credential, credential):
'organization': org_credential.organization.pk})
assert access.can_change(org_credential, {
'description': 'New description.'})
assert access.can_change(credential, {
'description': 'New description.',
'organization': None})
@pytest.mark.django_db

View File

@ -62,10 +62,13 @@ def test_org_member_inventory_script_permissions(org_member, organization):
@pytest.mark.django_db
def test_access_admin(organization, inventory, user):
@pytest.mark.parametrize("role", ["admin_role", "inventory_admin_role"])
def test_access_admin(role, organization, inventory, user):
a = user('admin', False)
inventory.organization = organization
organization.admin_role.members.add(a)
role = getattr(organization, role)
role.members.add(a)
access = InventoryAccess(a)
assert access.can_read(inventory)

View File

@ -80,10 +80,15 @@ def test_job_template_access_use_level(jt_linked, rando):
@pytest.mark.django_db
def test_job_template_access_org_admin(jt_linked, rando):
@pytest.mark.parametrize("role_names", [("admin_role",), ("inventory_admin_role", "project_admin_role")])
def test_job_template_access_admin(role_names, jt_linked, rando):
access = JobTemplateAccess(rando)
# Appoint this user as admin of the organization
jt_linked.inventory.organization.admin_role.members.add(rando)
#jt_linked.inventory.organization.admin_role.members.add(rando)
for role_name in role_names:
role = getattr(jt_linked.inventory.organization, role_name)
role.members.add(rando)
# Assign organization permission in the same way the create view does
organization = jt_linked.inventory.organization
jt_linked.get_deprecated_credential('ssh').admin_role.parents.add(organization.admin_role)

View File

@ -33,6 +33,13 @@ def test_notification_template_get_queryset_orgadmin(notification_template, user
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_notification_template_get_queryset_notificationadmin(notification_template, user):
access = NotificationTemplateAccess(user('admin', False))
notification_template.organization.notification_admin_role.members.add(user('admin', False))
assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_notification_template_get_queryset_org_auditor(notification_template, org_auditor):
access = NotificationTemplateAccess(org_auditor)
@ -59,12 +66,13 @@ def test_notification_template_access_superuser(notification_template_factory):
@pytest.mark.django_db
def test_notification_template_access_admin(organization_factory, notification_template_factory):
@pytest.mark.parametrize("role", ["present.admin_role:admin", "present.notification_admin_role:admin"])
def test_notification_template_access_admin(role, organization_factory, notification_template_factory):
other_objects = organization_factory('other')
present_objects = organization_factory('present',
users=['admin'],
notification_templates=['test-notification'],
roles=['present.admin_role:admin'])
roles=[role])
notification_template = present_objects.notification_templates.test_notification
other_org = other_objects.organization

View File

@ -49,6 +49,13 @@ class TestWorkflowJobTemplateAccess:
assert org_admin in wfjt.execute_role
assert org_admin in wfjt.read_role
def test_org_workflow_admin_role_inheritance(self, wfjt, org_member):
wfjt.organization.workflow_admin_role.members.add(org_member)
assert org_member in wfjt.admin_role
assert org_member in wfjt.execute_role
assert org_member in wfjt.read_role
@pytest.mark.django_db
class TestWorkflowJobTemplateNodeAccess:
@ -103,8 +110,12 @@ class TestWorkflowJobTemplateNodeAccess:
@pytest.mark.django_db
class TestWorkflowJobAccess:
def test_org_admin_can_delete_workflow_job(self, workflow_job, org_admin):
access = WorkflowJobAccess(org_admin)
@pytest.mark.parametrize("role_name", ["admin_role", "workflow_admin_role"])
def test_org_admin_can_delete_workflow_job(self, role_name, workflow_job, org_member):
role = getattr(workflow_job.workflow_job_template.organization, role_name)
role.members.add(org_member)
access = WorkflowJobAccess(org_member)
assert access.can_delete(workflow_job)
def test_wfjt_admin_can_delete_workflow_job(self, workflow_job, rando):
@ -132,9 +143,13 @@ class TestWFJTCopyAccess:
admin_access = WorkflowJobTemplateAccess(org_admin)
assert admin_access.can_copy(wfjt)
wfjt.organization.workflow_admin_role.members.add(org_member)
admin_access = WorkflowJobTemplateAccess(org_member)
assert admin_access.can_copy(wfjt)
def test_copy_permissions_user(self, wfjt, org_admin, org_member):
'''
Only org admins are able to add WFJTs, only org admins
Only org admins and org workflow admins are able to add WFJTs, only org admins
are able to copy them
'''
wfjt.admin_role.members.add(org_member)

View File

@ -244,7 +244,7 @@ class TestWorkflowAccessMethods:
def test_workflow_can_add(self, workflow, user_unit):
organization = Organization(name='test-org')
workflow.organization = organization
organization.admin_role = Role()
organization.workflow_admin_role = Role()
def mock_get_object(Class, **kwargs):
if Class == Organization: