DRY edits to access classes for new prompts

Remove if-not-data conditional from WFJTnode.can_change
  these are cannonical for can_add, but this looks like a bug

Change JTaccess.can_unattach to call same method in super()
  previously called can_attach, which is problematic

Better consolidate launch config m2m related checks

Test and fix pre-existing WFJT node RBAC bug

recognize not-provided instance group list on launch, avoiding bug where it fell back to default

fix bug where timeout field was saved on WFJT nodes after creating approval node

remove labels from schedule serializer summary_fields

remove unnecessary prefetch of credentials from WFJT node queryset
This commit is contained in:
Alan Rominger
2022-09-12 22:06:10 -04:00
parent ead56bfa1b
commit 34e8087aee
7 changed files with 146 additions and 249 deletions

View File

@@ -4794,7 +4794,7 @@ class SchedulePreviewSerializer(BaseSerializer):
return value return value
class ScheduleSerializer(LabelsListMixin, LaunchConfigurationBaseSerializer, SchedulePreviewSerializer): class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSerializer):
show_capabilities = ['edit', 'delete'] show_capabilities = ['edit', 'delete']
timezone = serializers.SerializerMethodField( timezone = serializers.SerializerMethodField(

View File

@@ -12,7 +12,7 @@ from django.conf import settings
from django.db.models import Q, Prefetch from django.db.models import Q, Prefetch
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist
# Django REST Framework # Django REST Framework
from rest_framework.exceptions import ParseError, PermissionDenied from rest_framework.exceptions import ParseError, PermissionDenied
@@ -281,13 +281,23 @@ class BaseAccess(object):
""" """
return True return True
def assure_relationship_exists(self, obj, relationship):
if '.' in relationship:
return # not attempting validation for complex relationships now
try:
obj._meta.get_field(relationship)
except FieldDoesNotExist:
raise NotImplementedError(f'The relationship {relationship} does not exist for model {type(obj)}')
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
self.assure_relationship_exists(obj, relationship)
if skip_sub_obj_read_check: if skip_sub_obj_read_check:
return self.can_change(obj, None) return self.can_change(obj, None)
else: else:
return bool(self.can_change(obj, None) and self.user.can_access(type(sub_obj), 'read', sub_obj)) return bool(self.can_change(obj, None) and self.user.can_access(type(sub_obj), 'read', sub_obj))
def can_unattach(self, obj, sub_obj, relationship, data=None): def can_unattach(self, obj, sub_obj, relationship, data=None):
self.assure_relationship_exists(obj, relationship)
return self.can_change(obj, data) return self.can_change(obj, data)
def check_related(self, field, Model, data, role_field='admin_role', obj=None, mandatory=False): def check_related(self, field, Model, data, role_field='admin_role', obj=None, mandatory=False):
@@ -328,6 +338,8 @@ class BaseAccess(object):
role = getattr(resource, role_field, None) role = getattr(resource, role_field, None)
if role is None: if role is None:
# Handle special case where resource does not have direct roles # Handle special case where resource does not have direct roles
if role_field == 'read_role':
return self.user.can_access(type(resource), 'read', resource)
access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field]
return self.user.can_access(type(resource), access_method_type, resource, None) return self.user.can_access(type(resource), access_method_type, resource, None)
return self.user in role return self.user in role
@@ -499,6 +511,21 @@ class BaseAccess(object):
return False return False
class UnifiedCredentialsMixin(BaseAccess):
"""
The credentials many-to-many is a standard relationship for JT, jobs, and others
Permission to attach is always use permission, and permission to unattach is admin to the parent object
"""
@check_superuser
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
if relationship == 'credentials':
if not isinstance(sub_obj, Credential):
raise RuntimeError(f'Can only attach credentials to credentials relationship, got {type(sub_obj)}')
return self.can_change(obj, None) and (self.user in sub_obj.use_role)
return super().can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
class NotificationAttachMixin(BaseAccess): class NotificationAttachMixin(BaseAccess):
"""For models that can have notifications attached """For models that can have notifications attached
@@ -1031,7 +1058,7 @@ class GroupAccess(BaseAccess):
return bool(obj and self.user in obj.inventory.admin_role) return bool(obj and self.user in obj.inventory.admin_role)
class InventorySourceAccess(NotificationAttachMixin, BaseAccess): class InventorySourceAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAccess):
""" """
I can see inventory sources whenever I can see their inventory. I can see inventory sources whenever I can see their inventory.
I can change inventory sources whenever I can change their inventory. I can change inventory sources whenever I can change their inventory.
@@ -1075,18 +1102,6 @@ class InventorySourceAccess(NotificationAttachMixin, BaseAccess):
return self.user in obj.inventory.update_role return self.user in obj.inventory.update_role
return False return False
@check_superuser
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
if relationship == 'credentials' and isinstance(sub_obj, Credential):
return obj and obj.inventory and self.user in obj.inventory.admin_role and self.user in sub_obj.use_role
return super(InventorySourceAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
@check_superuser
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
if relationship == 'credentials' and isinstance(sub_obj, Credential):
return obj and obj.inventory and self.user in obj.inventory.admin_role
return super(InventorySourceAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs)
class InventoryUpdateAccess(BaseAccess): class InventoryUpdateAccess(BaseAccess):
""" """
@@ -1485,7 +1500,7 @@ class ProjectUpdateAccess(BaseAccess):
return obj and self.user in obj.project.admin_role return obj and self.user in obj.project.admin_role
class JobTemplateAccess(NotificationAttachMixin, BaseAccess): class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAccess):
""" """
I can see job templates when: I can see job templates when:
- I have read role for the job template. - I have read role for the job template.
@@ -1549,8 +1564,7 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess):
if self.user not in inventory.use_role: if self.user not in inventory.use_role:
return False return False
ee = get_value(ExecutionEnvironment, 'execution_environment') if not self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role'):
if ee and not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False return False
project = get_value(Project, 'project') project = get_value(Project, 'project')
@@ -1600,10 +1614,8 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess):
if self.changes_are_non_sensitive(obj, data): if self.changes_are_non_sensitive(obj, data):
return True return True
if data.get('execution_environment'): if not self.check_related('execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role'):
ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) return False
if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
for required_field, cls in (('inventory', Inventory), ('project', Project)): for required_field, cls in (('inventory', Inventory), ('project', Project)):
is_mandatory = True is_mandatory = True
@@ -1667,17 +1679,13 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess):
if not obj.organization: if not obj.organization:
return False return False
return self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.organization.admin_role return self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.organization.admin_role
if relationship == 'credentials' and isinstance(sub_obj, Credential):
return self.user in obj.admin_role and self.user in sub_obj.use_role
return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
@check_superuser @check_superuser
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
if relationship == "instance_groups": if relationship == "instance_groups":
return self.can_attach(obj, sub_obj, relationship, *args, **kwargs) return self.can_attach(obj, sub_obj, relationship, *args, **kwargs)
if relationship == 'credentials' and isinstance(sub_obj, Credential): return super(JobTemplateAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs)
return self.user in obj.admin_role
return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs)
class JobAccess(BaseAccess): class JobAccess(BaseAccess):
@@ -1824,7 +1832,7 @@ class SystemJobAccess(BaseAccess):
return False # no relaunching of system jobs return False # no relaunching of system jobs
class JobLaunchConfigAccess(BaseAccess): class JobLaunchConfigAccess(UnifiedCredentialsMixin, BaseAccess):
""" """
Launch configs must have permissions checked for Launch configs must have permissions checked for
- relaunching - relaunching
@@ -1832,185 +1840,69 @@ class JobLaunchConfigAccess(BaseAccess):
In order to create a new object with a copy of this launch config, I need: In order to create a new object with a copy of this launch config, I need:
- use access to related inventory (if present) - use access to related inventory (if present)
- read access to Execution Environment (if present), unless the specified ee is already in the template
- use role to many-related credentials (if any present) - use role to many-related credentials (if any present)
- use role to Execution Environment (if present), unless the specified ee is already in the template - read access to many-related labels (if any present), unless the specified label is already in the template
- use role to many-related labels (if any present), unless the specified label is already in the template - read access to many-related instance groups (if any present), unless the specified instance group is already in the template
- use role to many-related instance groups (if any present), unless the specified instance group is already in the template
""" """
model = JobLaunchConfig model = JobLaunchConfig
select_related = 'job' select_related = 'job'
prefetch_related = ('credentials', 'inventory') prefetch_related = ('credentials', 'inventory')
def _unusable_creds_exist(self, qs): M2M_CHECKS = {'credentials': Credential, 'labels': Label, 'instance_groups': InstanceGroup}
return qs.exclude(pk__in=Credential._accessible_pk_qs(Credential, self.user, 'use_role')).exists()
def has_credentials_access(self, obj): def _related_filtered_queryset(self, cls):
# user has access if no related credentials exist that the user lacks use role for if cls is Label:
return not self._unusable_creds_exist(obj.credentials) return LabelAccess(self.user).filtered_queryset()
elif cls is InstanceGroup:
return InstanceGroupAccess(self.user).filtered_queryset()
else:
return cls._accessible_pk_qs(cls, self.user, 'use_role')
def has_obj_m2m_access(self, obj):
for relationship, cls in self.M2M_CHECKS.items():
if getattr(obj, relationship).exclude(pk__in=self._related_filtered_queryset(cls)).exists():
return False
return True
@check_superuser @check_superuser
def can_add(self, data, template=None): def can_add(self, data, template=None):
# This is a special case, we don't check related many-to-many elsewhere # This is a special case, we don't check related many-to-many elsewhere
# launch RBAC checks use this # launch RBAC checks use this
permission_error = False if 'reference_obj' in data:
if 'credentials' in data and data['credentials'] or 'reference_obj' in data: if not self.has_obj_m2m_access(data['reference_obj']):
if 'reference_obj' in data: return False
prompted_cred_qs = data['reference_obj'].credentials.all() else:
else: for relationship, cls in self.M2M_CHECKS.items():
# If given model objects, only use the primary key from them if relationship in data and data[relationship]:
cred_pks = [cred.pk for cred in data['credentials']] # If given model objects, only use the primary key from them
if template: sub_obj_pks = [sub_obj.pk for sub_obj in data[relationship]]
for cred in template.credentials.all(): if template:
if cred.pk in cred_pks: for sub_obj in getattr(template, relationship).all():
cred_pks.remove(cred.pk) if sub_obj.pk in sub_obj_pks:
prompted_cred_qs = Credential.objects.filter(pk__in=cred_pks) sub_obj_pks.remove(sub_obj.pk)
if self._unusable_creds_exist(prompted_cred_qs): if cls.objects.filter(pk__in=sub_obj_pks).exclude(pk__in=self._related_filtered_queryset(cls)).exists():
credential_names = [cred.name for cred in prompted_cred_qs] return False
logger.debug("User {} not allowed to access credentials in {}".format(self.user.username, credential_names)) return self.check_related('inventory', Inventory, data, role_field='use_role') and self.check_related(
permission_error = True 'execution_environment', ExecutionEnvironment, data, role_field='read_role'
if 'execution_environment' in data and data['execution_environment'] or 'reference_obj' in data: )
if 'reference_obj' in data:
ee = data['reference_obj'].execution_environment
else:
ee = data['execution_environment']
if ee and not self.user.can_access(ExecutionEnvironment, 'read', ee):
if not template or ee != template.execution_environment:
logger.debug("User {} not allowed access to ee {}".format(self.user.username, ee.name))
permission_error = True
else:
logger.debug(
"User {} does not have permissions to execution_environment {} but its part of the template".format(self.user.username, ee.name)
)
if 'labels' in data and data['labels'] or 'reference_obj' in data:
if 'reference_obj' in data:
labels = data['reference_obj'].labels.all()
else:
labels = data['labels']
for a_label in labels:
if not self.user.can_access(Label, 'read', a_label):
# This if allows a template admin who can see labels to specify a list and the executor to select a subset of the list
if not template or a_label not in template.labels.all():
logger.debug("User {} not allowed access to label {}".format(self.user.username, a_label.name))
permission_error = True
else:
logger.debug("User {} does not have permissions to label {} but its part of the template".format(self.user.username, a_label.name))
if 'instance_groups' in data and data['instance_groups'] or 'reference_obj' in data:
if 'reference_obj' in data:
instance_groups = data['reference_obj'].labels.all()
else:
instance_groups = data['instance_groups']
for an_ig in instance_groups:
if not an_ig in self.user.get_queryset(InstanceGroup):
# This if allows a template admin who can see IGs to specify a list and the executor to select a subset of the list
if not template or an_ig not in template.instance_groups.all():
logger.debug("user {} not allowed access to instance group {}".format(self.user.username, an_ig.name))
permission_error = True
else:
logger.debug(
"User {} does not have permissions to instance_group {} but its part of the template".format(self.user.username, an_ig.name)
)
if permission_error:
return False
return self.check_related('inventory', Inventory, data, role_field='use_role')
@check_superuser @check_superuser
def can_use(self, obj): def can_use(self, obj):
inventory_check = self.check_related('inventory', Inventory, {}, obj=obj, role_field='use_role', mandatory=True) return (
return inventory_check and self.has_credentials_access(obj) self.has_obj_m2m_access(obj)
and self.check_related('inventory', Inventory, {}, obj=obj, role_field='use_role', mandatory=True)
and self.check_related('execution_environment', ExecutionEnvironment, {}, obj=obj, role_field='read_role')
)
def can_change(self, obj, data): def can_change(self, obj, data):
return self.check_related('inventory', Inventory, data, obj=obj, role_field='use_role') return self.check_related('inventory', Inventory, data, obj=obj, role_field='use_role') and self.check_related(
'execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role'
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): )
try:
obj_name = obj.name
except AttributeError:
obj_name = obj.identifier
if isinstance(sub_obj, Credential) and relationship == 'credentials':
if not self.user in sub_obj.use_role:
logger.debug(
"User {} not allowed access to credential {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id)
)
return False
return True
if isinstance(sub_obj, Label) and relationship == 'labels':
if not self.user.can_access(Label, 'read', sub_obj):
logger.debug("User {} not allowed access to label {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id))
return False
return True
if isinstance(sub_obj, InstanceGroup) and relationship == 'instance_groups':
if not sub_obj in self.user.get_queryset(InstanceGroup):
logger.debug(
"User {} not allowed access to instance_group {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id)
)
return False
return True
raise NotImplementedError('Only credentials, labels and instance groups can be attached to launch configurations.')
def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
try:
obj_name = obj.name
except AttributeError:
obj_name = obj.identifier
if isinstance(sub_obj, Credential) and relationship == 'credentials':
if not skip_sub_obj_read_check:
logger.debug(
"Skipping check if user {} can access credential {} ({}) for removal from {} {} ({})".format(
self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id
)
)
return True
if not self.user in sub_obj.read_role:
logger.debug(
"User {} can not read credential {} ({}) for removal from {} {} ({})".format(
self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id
)
)
return False
return True
if isinstance(sub_obj, Label) and relationship == 'labels':
if skip_sub_obj_read_check:
logger.debug(
"Skipping check if user {} can access label {} ({}) for removal from {} {} ({})".format(
self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id
)
)
return True
if self.user.can_access(Label, 'read', sub_obj):
return True
logger.debug(
"User {} can not read label {} ({}) for removal from {} {} ({})".format(
self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id
)
)
return False
if isinstance(sub_obj, InstanceGroup) and relationship == 'instance_groups':
if skip_sub_obj_read_check:
logger.debug(
"Skipping check if user {} can access instance_group {} ({}) for removal from {} {} ({})".format(
self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id
)
)
return True
if sub_obj in self.user.get_queryset(InstanceGroup):
return True
logger.debug(
"User {} can not read instance_group {} ({}) for removal from {} {} ({})".format(
self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id
)
)
return False
raise NotImplementedError('Only credentials, labels and instance groups can be attached to launch configurations.')
class WorkflowJobTemplateNodeAccess(BaseAccess): class WorkflowJobTemplateNodeAccess(UnifiedCredentialsMixin, BaseAccess):
""" """
I can see/use a WorkflowJobTemplateNode if I have read permission I can see/use a WorkflowJobTemplateNode if I have read permission
to associated Workflow Job Template to associated Workflow Job Template
@@ -2033,7 +1925,7 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
""" """
model = WorkflowJobTemplateNode model = WorkflowJobTemplateNode
prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'credentials', 'workflow_job_template') prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'workflow_job_template')
def filtered_queryset(self): def filtered_queryset(self):
return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.accessible_objects(self.user, 'read_role')) return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.accessible_objects(self.user, 'read_role'))
@@ -2045,7 +1937,8 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
return ( return (
self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True) self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True)
and self.check_related('unified_job_template', UnifiedJobTemplate, data, role_field='execute_role') and self.check_related('unified_job_template', UnifiedJobTemplate, data, role_field='execute_role')
and JobLaunchConfigAccess(self.user).can_add(data) and self.check_related('inventory', Inventory, data, role_field='use_role')
and self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role')
) )
def wfjt_admin(self, obj): def wfjt_admin(self, obj):
@@ -2054,17 +1947,14 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
else: else:
return self.user in obj.workflow_job_template.admin_role return self.user in obj.workflow_job_template.admin_role
def ujt_execute(self, obj): def ujt_execute(self, obj, data=None):
if not obj.unified_job_template: if not obj.unified_job_template:
return True return True
return self.check_related('unified_job_template', UnifiedJobTemplate, {}, obj=obj, role_field='execute_role', mandatory=True) return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, role_field='execute_role', mandatory=True)
def can_change(self, obj, data): def can_change(self, obj, data):
if not data:
return True
# should not be able to edit the prompts if lacking access to UJT or WFJT # should not be able to edit the prompts if lacking access to UJT or WFJT
return self.ujt_execute(obj) and self.wfjt_admin(obj) and JobLaunchConfigAccess(self.user).can_change(obj, data) return self.ujt_execute(obj, data=data) and self.wfjt_admin(obj) and JobLaunchConfigAccess(self.user).can_change(obj, data)
def can_delete(self, obj): def can_delete(self, obj):
return self.wfjt_admin(obj) return self.wfjt_admin(obj)
@@ -2077,29 +1967,14 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
return True return True
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
if not self.wfjt_admin(obj): if relationship in ('success_nodes', 'failure_nodes', 'always_nodes'):
return False return self.wfjt_admin(obj) and self.check_same_WFJT(obj, sub_obj)
if relationship in ['credentials', 'labels', 'instance_groups']: return super().can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
# Need permission to related template to attach a credential
if not self.ujt_execute(obj):
return False
return JobLaunchConfigAccess(self.user).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
elif relationship in ('success_nodes', 'failure_nodes', 'always_nodes'):
return self.check_same_WFJT(obj, sub_obj)
else:
raise NotImplementedError('Relationship {} not understood for WFJT nodes.'.format(relationship))
def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): def can_unattach(self, obj, sub_obj, relationship, data=None):
if not self.wfjt_admin(obj): if relationship in ('success_nodes', 'failure_nodes', 'always_nodes'):
return False return self.wfjt_admin(obj)
if relationship in ['credentials', 'labels', 'instance_groups']: return super().can_unattach(obj, sub_obj, relationship, data=None)
if not self.ujt_execute(obj):
return False
return JobLaunchConfigAccess(self.user).can_unattach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
elif relationship in ('success_nodes', 'failure_nodes', 'always_nodes'):
return self.check_same_WFJT(obj, sub_obj)
else:
raise NotImplementedError('Relationship {} not understood for WFJT nodes.'.format(relationship))
class WorkflowJobNodeAccess(BaseAccess): class WorkflowJobNodeAccess(BaseAccess):
@@ -2174,13 +2049,10 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'workflow_admin_role').exists() return Organization.accessible_objects(self.user, 'workflow_admin_role').exists()
if data.get('execution_environment'): return bool(
ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True)
if not self.user.can_access(ExecutionEnvironment, 'read', ee): and self.check_related('inventory', Inventory, data, role_field='use_role')
return False and self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role')
return self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True) and self.check_related(
'inventory', Inventory, data, role_field='use_role'
) )
def can_copy(self, obj): def can_copy(self, obj):
@@ -2226,14 +2098,10 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess):
if self.user.is_superuser: if self.user.is_superuser:
return True return True
if data and data.get('execution_environment'):
ee = get_object_from_data('execution_environment', ExecutionEnvironment, data)
if not self.user.can_access(ExecutionEnvironment, 'read', ee):
return False
return ( return (
self.check_related('organization', Organization, data, role_field='workflow_admin_role', obj=obj) self.check_related('organization', Organization, data, role_field='workflow_admin_role', obj=obj)
and self.check_related('inventory', Inventory, data, role_field='use_role', obj=obj) and self.check_related('inventory', Inventory, data, role_field='use_role', obj=obj)
and self.check_related('execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role')
and self.user in obj.admin_role and self.user in obj.admin_role
) )
@@ -2640,7 +2508,7 @@ class UnifiedJobAccess(BaseAccess):
return super(UnifiedJobAccess, self).get_queryset().filter(workflowapproval__isnull=True) return super(UnifiedJobAccess, self).get_queryset().filter(workflowapproval__isnull=True)
class ScheduleAccess(BaseAccess): class ScheduleAccess(UnifiedCredentialsMixin, BaseAccess):
""" """
I can see a schedule if I can see it's related unified job, I can create them or update them if I have write access I can see a schedule if I can see it's related unified job, I can create them or update them if I have write access
""" """
@@ -2681,12 +2549,6 @@ class ScheduleAccess(BaseAccess):
def can_delete(self, obj): def can_delete(self, obj):
return self.can_change(obj, {}) return self.can_change(obj, {})
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
return JobLaunchConfigAccess(self.user).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
return JobLaunchConfigAccess(self.user).can_unattach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
class NotificationTemplateAccess(BaseAccess): class NotificationTemplateAccess(BaseAccess):
""" """

View File

@@ -382,7 +382,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
unified_job.survey_passwords = new_job_passwords unified_job.survey_passwords = new_job_passwords
kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch
if 'instance_groups' in kwargs: if kwargs.get('instance_groups'):
unified_job.preferred_instance_groups_cache = [ig.id for ig in kwargs['instance_groups']] unified_job.preferred_instance_groups_cache = [ig.id for ig in kwargs['instance_groups']]
else: else:
unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache() unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache()

View File

@@ -218,7 +218,7 @@ class WorkflowJobTemplateNode(WorkflowNodeBase):
approval_template = WorkflowApprovalTemplate(**kwargs) approval_template = WorkflowApprovalTemplate(**kwargs)
approval_template.save() approval_template.save()
self.unified_job_template = approval_template self.unified_job_template = approval_template
self.save() self.save(update_fields=['unified_job_template'])
return approval_template return approval_template

View File

@@ -77,6 +77,18 @@ class TestApprovalNodes:
assert approval_node.unified_job_template.description == 'Approval Node' assert approval_node.unified_job_template.description == 'Approval Node'
assert approval_node.unified_job_template.timeout == 0 assert approval_node.unified_job_template.timeout == 0
def test_approval_node_creation_with_timeout(self, post, approval_node, admin_user):
assert approval_node.timeout is None
url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'})
post(url, {'name': 'Test', 'description': 'Approval Node', 'timeout': 10}, user=admin_user, expect=201)
approval_node = WorkflowJobTemplateNode.objects.get(pk=approval_node.pk)
approval_node.refresh_from_db()
assert approval_node.timeout is None
assert isinstance(approval_node.unified_job_template, WorkflowApprovalTemplate)
assert approval_node.unified_job_template.timeout == 10
def test_approval_node_creation_failure(self, post, approval_node, admin_user): def test_approval_node_creation_failure(self, post, approval_node, admin_user):
# This test leaves off a required param to assert that user will get a 400. # This test leaves off a required param to assert that user will get a 400.
url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'}) url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'})

View File

@@ -315,13 +315,13 @@ class TestLaunchConfigAccess:
access = JobLaunchConfigAccess(rando) access = JobLaunchConfigAccess(rando)
cred1, cred2 = self._make_two_credentials(credentialtype_ssh) cred1, cred2 = self._make_two_credentials(credentialtype_ssh)
assert access.has_credentials_access(config) # has access if 0 creds assert access.has_obj_m2m_access(config) # has access if 0 creds
config.credentials.add(cred1, cred2) config.credentials.add(cred1, cred2)
assert not access.has_credentials_access(config) # lacks access to both assert not access.has_obj_m2m_access(config) # lacks access to both
cred1.use_role.members.add(rando) cred1.use_role.members.add(rando)
assert not access.has_credentials_access(config) # lacks access to 1 assert not access.has_obj_m2m_access(config) # lacks access to 1
cred2.use_role.members.add(rando) cred2.use_role.members.add(rando)
assert access.has_credentials_access(config) # has access to both assert access.has_obj_m2m_access(config) # has access to both
def test_new_execution_environment_access(self, rando): def test_new_execution_environment_access(self, rando):
ee = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar') ee = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar')

View File

@@ -6,6 +6,7 @@ from awx.main.access import (
WorkflowJobAccess, WorkflowJobAccess,
# WorkflowJobNodeAccess # WorkflowJobNodeAccess
) )
from awx.main.models import JobTemplate, WorkflowJobTemplateNode
from rest_framework.exceptions import PermissionDenied from rest_framework.exceptions import PermissionDenied
@@ -87,6 +88,16 @@ class TestWorkflowJobTemplateNodeAccess:
job_template.read_role.members.add(rando) job_template.read_role.members.add(rando)
assert not access.can_add({'workflow_job_template': wfjt, 'unified_job_template': job_template}) assert not access.can_add({'workflow_job_template': wfjt, 'unified_job_template': job_template})
def test_change_JT_no_start_perm(self, wfjt, rando):
wfjt.admin_role.members.add(rando)
access = WorkflowJobTemplateNodeAccess(rando)
jt1 = JobTemplate.objects.create()
jt1.execute_role.members.add(rando)
assert access.can_add({'workflow_job_template': wfjt, 'unified_job_template': jt1})
node = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=jt1)
jt2 = JobTemplate.objects.create()
assert not access.can_change(node, {'unified_job_template': jt2.id})
def test_add_node_with_minimum_permissions(self, wfjt, job_template, inventory, rando): def test_add_node_with_minimum_permissions(self, wfjt, job_template, inventory, rando):
wfjt.admin_role.members.add(rando) wfjt.admin_role.members.add(rando)
access = WorkflowJobTemplateNodeAccess(rando) access = WorkflowJobTemplateNodeAccess(rando)
@@ -104,14 +115,12 @@ class TestWorkflowJobTemplateNodeAccess:
@pytest.mark.parametrize( @pytest.mark.parametrize(
"add_wfjt_admin, add_jt_admin, permission_type, expected_result, method_type", "add_wfjt_admin, add_jt_admin, permission_type, expected_result, method_type",
[ [
(False, False, None, False, 'can_attach'),
(True, False, 'credentials', False, 'can_attach'), (True, False, 'credentials', False, 'can_attach'),
(True, True, 'credentials', True, 'can_attach'), (True, True, 'credentials', True, 'can_attach'),
(True, False, 'labels', False, 'can_attach'), (True, False, 'labels', False, 'can_attach'),
(True, True, 'labels', True, 'can_attach'), (True, True, 'labels', True, 'can_attach'),
(True, False, 'instance_groups', False, 'can_attach'), (True, False, 'instance_groups', False, 'can_attach'),
(True, True, 'instance_groups', True, 'can_attach'), (True, True, 'instance_groups', True, 'can_attach'),
(False, False, None, False, 'can_unattach'),
(True, False, 'credentials', False, 'can_unattach'), (True, False, 'credentials', False, 'can_unattach'),
(True, True, 'credentials', True, 'can_unattach'), (True, True, 'credentials', True, 'can_unattach'),
(True, False, 'labels', False, 'can_unattach'), (True, False, 'labels', False, 'can_unattach'),
@@ -128,11 +137,25 @@ class TestWorkflowJobTemplateNodeAccess:
if add_jt_admin: if add_jt_admin:
job_template.execute_role.members.add(rando) job_template.execute_role.members.add(rando)
# We have to mock the JobLaunchConfigAccess because the attachment methods will look at the object type and the relation from awx.main.models import Credential, Label, InstanceGroup, Organization, CredentialType
# Since we pass None as the second param this will trigger an NotImplementedError from that object
with mocker.patch('awx.main.access.JobLaunchConfigAccess.{}'.format(method_type), return_value=True): if permission_type == 'credentials':
access = WorkflowJobTemplateNodeAccess(rando) sub_obj = Credential.objects.create(credential_type=CredentialType.objects.create())
assert getattr(access, method_type)(wfjt_node, None, permission_type, None) == expected_result sub_obj.use_role.members.add(rando)
elif permission_type == 'labels':
sub_obj = Label.objects.create(organization=Organization.objects.create())
sub_obj.organization.member_role.members.add(rando)
elif permission_type == 'instance_groups':
sub_obj = InstanceGroup.objects.create()
org = Organization.objects.create()
org.admin_role.members.add(rando) # only admins can see IGs
org.instance_groups.add(sub_obj)
access = WorkflowJobTemplateNodeAccess(rando)
if method_type == 'can_unattach':
assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type) == expected_result
else:
assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type, {}) == expected_result
# The actual attachment of labels, credentials and instance groups are tested from JobLaunchConfigAccess # The actual attachment of labels, credentials and instance groups are tested from JobLaunchConfigAccess