Fix the RBAC for attaching an EE to various objects

- Organization.default_environment
- Project.default_environment
- JobTemplate.execution_environment
- WorkflowJobTemplate.execution_environment

System jobs are not editable by anyone other than a system admin, so
we don't need to check.  It appears that unified job templates can't
be created or edited outside of the endpoints for the specific types.
This commit is contained in:
Jeff Bradberry
2021-03-08 17:03:38 -05:00
parent c9ec0d31f1
commit 097f465f39

View File

@@ -777,6 +777,11 @@ class OrganizationAccess(NotificationAttachMixin, BaseAccess):
@check_superuser @check_superuser
def can_change(self, obj, data): def can_change(self, obj, data):
if data and data.get('default_environment'):
ee = get_object_from_data('default_environment', ExecutionEnvironment, data)
if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
return self.user in obj.admin_role return self.user in obj.admin_role
def can_delete(self, obj): def can_delete(self, obj):
@@ -1385,14 +1390,29 @@ class ProjectAccess(NotificationAttachMixin, BaseAccess):
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'project_admin_role').exists() return Organization.accessible_objects(self.user, 'project_admin_role').exists()
return (self.check_related('organization', Organization, data, role_field='project_admin_role', mandatory=True) and
self.check_related('credential', Credential, data, role_field='use_role')) if data.get('default_environment'):
ee = get_object_from_data('default_environment', ExecutionEnvironment, data)
if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
return (
self.check_related('organization', Organization, data, role_field='project_admin_role', mandatory=True) and
self.check_related('credential', Credential, data, role_field='use_role')
)
@check_superuser @check_superuser
def can_change(self, obj, data): def can_change(self, obj, data):
return (self.check_related('organization', Organization, data, obj=obj, role_field='project_admin_role') and if data and data.get('default_environment'):
self.user in obj.admin_role and ee = get_object_from_data('default_environment', ExecutionEnvironment, data, obj=obj)
self.check_related('credential', Credential, data, obj=obj, role_field='use_role')) if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
return (
self.check_related('organization', Organization, data, obj=obj, role_field='project_admin_role') and
self.user in obj.admin_role and
self.check_related('credential', Credential, data, obj=obj, role_field='use_role')
)
@check_superuser @check_superuser
def can_start(self, obj, validate_license=True): def can_start(self, obj, validate_license=True):
@@ -1497,6 +1517,10 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess):
if self.user not in inventory.use_role: if self.user not in inventory.use_role:
return False return False
ee = get_value(ExecutionEnvironment, 'execution_environment')
if ee and not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
project = get_value(Project, 'project') project = get_value(Project, 'project')
# If the user has admin access to the project (as an org admin), should # If the user has admin access to the project (as an org admin), should
# be able to proceed without additional checks. # be able to proceed without additional checks.
@@ -1544,6 +1568,11 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess):
if self.changes_are_non_sensitive(obj, data): if self.changes_are_non_sensitive(obj, data):
return True return True
if data.get('execution_environment'):
ee = get_object_from_data('execution_environment', ExecutionEnvironment, data)
if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
for required_field, cls in (('inventory', Inventory), ('project', Project)): for required_field, cls in (('inventory', Inventory), ('project', Project)):
is_mandatory = True is_mandatory = True
if not getattr(obj, '{}_id'.format(required_field)): if not getattr(obj, '{}_id'.format(required_field)):
@@ -1974,6 +2003,11 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'workflow_admin_role').exists() return Organization.accessible_objects(self.user, 'workflow_admin_role').exists()
if data.get('execution_environment'):
ee = get_object_from_data('execution_environment', ExecutionEnvironment, data)
if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
return ( return (
self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True) and self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True) and
self.check_related('inventory', Inventory, data, role_field='use_role') self.check_related('inventory', Inventory, data, role_field='use_role')
@@ -2023,6 +2057,11 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
if self.user.is_superuser: if self.user.is_superuser:
return True return True
if data and data.get('execution_environment'):
ee = get_object_from_data('execution_environment', ExecutionEnvironment, data)
if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
return ( return (
self.check_related('organization', Organization, data, role_field='workflow_admin_role', obj=obj) and self.check_related('organization', Organization, data, role_field='workflow_admin_role', obj=obj) and
self.check_related('inventory', Inventory, data, role_field='use_role', obj=obj) and self.check_related('inventory', Inventory, data, role_field='use_role', obj=obj) and