diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 4dd6d8182f..5592540efc 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4794,7 +4794,7 @@ class SchedulePreviewSerializer(BaseSerializer): return value -class ScheduleSerializer(LabelsListMixin, LaunchConfigurationBaseSerializer, SchedulePreviewSerializer): +class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSerializer): show_capabilities = ['edit', 'delete'] timezone = serializers.SerializerMethodField( diff --git a/awx/main/access.py b/awx/main/access.py index 81a1d4372f..e8deea8f36 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -12,7 +12,7 @@ from django.conf import settings from django.db.models import Q, Prefetch from django.contrib.auth.models import User from django.utils.translation import gettext_lazy as _ -from django.core.exceptions import ObjectDoesNotExist +from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist # Django REST Framework from rest_framework.exceptions import ParseError, PermissionDenied @@ -281,13 +281,23 @@ class BaseAccess(object): """ return True + def assure_relationship_exists(self, obj, relationship): + if '.' in relationship: + return # not attempting validation for complex relationships now + try: + obj._meta.get_field(relationship) + except FieldDoesNotExist: + raise NotImplementedError(f'The relationship {relationship} does not exist for model {type(obj)}') + def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): + self.assure_relationship_exists(obj, relationship) if skip_sub_obj_read_check: return self.can_change(obj, None) else: return bool(self.can_change(obj, None) and self.user.can_access(type(sub_obj), 'read', sub_obj)) def can_unattach(self, obj, sub_obj, relationship, data=None): + self.assure_relationship_exists(obj, relationship) return self.can_change(obj, data) def check_related(self, field, Model, data, role_field='admin_role', obj=None, mandatory=False): @@ -328,6 +338,8 @@ class BaseAccess(object): role = getattr(resource, role_field, None) if role is None: # Handle special case where resource does not have direct roles + if role_field == 'read_role': + return self.user.can_access(type(resource), 'read', resource) access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] return self.user.can_access(type(resource), access_method_type, resource, None) return self.user in role @@ -499,6 +511,21 @@ class BaseAccess(object): return False +class UnifiedCredentialsMixin(BaseAccess): + """ + The credentials many-to-many is a standard relationship for JT, jobs, and others + Permission to attach is always use permission, and permission to unattach is admin to the parent object + """ + + @check_superuser + def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): + if relationship == 'credentials': + if not isinstance(sub_obj, Credential): + raise RuntimeError(f'Can only attach credentials to credentials relationship, got {type(sub_obj)}') + return self.can_change(obj, None) and (self.user in sub_obj.use_role) + return super().can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) + + class NotificationAttachMixin(BaseAccess): """For models that can have notifications attached @@ -1031,7 +1058,7 @@ class GroupAccess(BaseAccess): return bool(obj and self.user in obj.inventory.admin_role) -class InventorySourceAccess(NotificationAttachMixin, BaseAccess): +class InventorySourceAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAccess): """ I can see inventory sources whenever I can see their inventory. I can change inventory sources whenever I can change their inventory. @@ -1075,18 +1102,6 @@ class InventorySourceAccess(NotificationAttachMixin, BaseAccess): return self.user in obj.inventory.update_role return False - @check_superuser - def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return obj and obj.inventory and self.user in obj.inventory.admin_role and self.user in sub_obj.use_role - return super(InventorySourceAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - - @check_superuser - def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return obj and obj.inventory and self.user in obj.inventory.admin_role - return super(InventorySourceAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) - class InventoryUpdateAccess(BaseAccess): """ @@ -1485,7 +1500,7 @@ class ProjectUpdateAccess(BaseAccess): return obj and self.user in obj.project.admin_role -class JobTemplateAccess(NotificationAttachMixin, BaseAccess): +class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAccess): """ I can see job templates when: - I have read role for the job template. @@ -1549,8 +1564,7 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess): if self.user not in inventory.use_role: return False - ee = get_value(ExecutionEnvironment, 'execution_environment') - if ee and not self.user.can_access(ExecutionEnvironment, 'read', ee): + if not self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role'): return False project = get_value(Project, 'project') @@ -1600,10 +1614,8 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess): if self.changes_are_non_sensitive(obj, data): return True - if data.get('execution_environment'): - ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) - if not self.user.can_access(ExecutionEnvironment, 'read', ee): - return False + if not self.check_related('execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role'): + return False for required_field, cls in (('inventory', Inventory), ('project', Project)): is_mandatory = True @@ -1667,17 +1679,13 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess): if not obj.organization: return False return self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.organization.admin_role - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return self.user in obj.admin_role and self.user in sub_obj.use_role return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) @check_superuser def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): if relationship == "instance_groups": return self.can_attach(obj, sub_obj, relationship, *args, **kwargs) - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return self.user in obj.admin_role - return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) + return super(JobTemplateAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs) class JobAccess(BaseAccess): @@ -1824,7 +1832,7 @@ class SystemJobAccess(BaseAccess): return False # no relaunching of system jobs -class JobLaunchConfigAccess(BaseAccess): +class JobLaunchConfigAccess(UnifiedCredentialsMixin, BaseAccess): """ Launch configs must have permissions checked for - relaunching @@ -1832,185 +1840,69 @@ class JobLaunchConfigAccess(BaseAccess): In order to create a new object with a copy of this launch config, I need: - use access to related inventory (if present) + - read access to Execution Environment (if present), unless the specified ee is already in the template - use role to many-related credentials (if any present) - - use role to Execution Environment (if present), unless the specified ee is already in the template - - use role to many-related labels (if any present), unless the specified label is already in the template - - use role to many-related instance groups (if any present), unless the specified instance group is already in the template + - read access to many-related labels (if any present), unless the specified label is already in the template + - read access to many-related instance groups (if any present), unless the specified instance group is already in the template """ model = JobLaunchConfig select_related = 'job' prefetch_related = ('credentials', 'inventory') - def _unusable_creds_exist(self, qs): - return qs.exclude(pk__in=Credential._accessible_pk_qs(Credential, self.user, 'use_role')).exists() + M2M_CHECKS = {'credentials': Credential, 'labels': Label, 'instance_groups': InstanceGroup} - def has_credentials_access(self, obj): - # user has access if no related credentials exist that the user lacks use role for - return not self._unusable_creds_exist(obj.credentials) + def _related_filtered_queryset(self, cls): + if cls is Label: + return LabelAccess(self.user).filtered_queryset() + elif cls is InstanceGroup: + return InstanceGroupAccess(self.user).filtered_queryset() + else: + return cls._accessible_pk_qs(cls, self.user, 'use_role') + + def has_obj_m2m_access(self, obj): + for relationship, cls in self.M2M_CHECKS.items(): + if getattr(obj, relationship).exclude(pk__in=self._related_filtered_queryset(cls)).exists(): + return False + return True @check_superuser def can_add(self, data, template=None): # This is a special case, we don't check related many-to-many elsewhere # launch RBAC checks use this - permission_error = False - if 'credentials' in data and data['credentials'] or 'reference_obj' in data: - if 'reference_obj' in data: - prompted_cred_qs = data['reference_obj'].credentials.all() - else: - # If given model objects, only use the primary key from them - cred_pks = [cred.pk for cred in data['credentials']] - if template: - for cred in template.credentials.all(): - if cred.pk in cred_pks: - cred_pks.remove(cred.pk) - prompted_cred_qs = Credential.objects.filter(pk__in=cred_pks) - if self._unusable_creds_exist(prompted_cred_qs): - credential_names = [cred.name for cred in prompted_cred_qs] - logger.debug("User {} not allowed to access credentials in {}".format(self.user.username, credential_names)) - permission_error = True - if 'execution_environment' in data and data['execution_environment'] or 'reference_obj' in data: - if 'reference_obj' in data: - ee = data['reference_obj'].execution_environment - else: - ee = data['execution_environment'] - if ee and not self.user.can_access(ExecutionEnvironment, 'read', ee): - if not template or ee != template.execution_environment: - logger.debug("User {} not allowed access to ee {}".format(self.user.username, ee.name)) - permission_error = True - else: - logger.debug( - "User {} does not have permissions to execution_environment {} but its part of the template".format(self.user.username, ee.name) - ) - if 'labels' in data and data['labels'] or 'reference_obj' in data: - if 'reference_obj' in data: - labels = data['reference_obj'].labels.all() - else: - labels = data['labels'] - for a_label in labels: - if not self.user.can_access(Label, 'read', a_label): - # This if allows a template admin who can see labels to specify a list and the executor to select a subset of the list - if not template or a_label not in template.labels.all(): - logger.debug("User {} not allowed access to label {}".format(self.user.username, a_label.name)) - permission_error = True - else: - logger.debug("User {} does not have permissions to label {} but its part of the template".format(self.user.username, a_label.name)) - if 'instance_groups' in data and data['instance_groups'] or 'reference_obj' in data: - if 'reference_obj' in data: - instance_groups = data['reference_obj'].labels.all() - else: - instance_groups = data['instance_groups'] - for an_ig in instance_groups: - if not an_ig in self.user.get_queryset(InstanceGroup): - # This if allows a template admin who can see IGs to specify a list and the executor to select a subset of the list - if not template or an_ig not in template.instance_groups.all(): - logger.debug("user {} not allowed access to instance group {}".format(self.user.username, an_ig.name)) - permission_error = True - else: - logger.debug( - "User {} does not have permissions to instance_group {} but its part of the template".format(self.user.username, an_ig.name) - ) - if permission_error: - return False - return self.check_related('inventory', Inventory, data, role_field='use_role') + if 'reference_obj' in data: + if not self.has_obj_m2m_access(data['reference_obj']): + return False + else: + for relationship, cls in self.M2M_CHECKS.items(): + if relationship in data and data[relationship]: + # If given model objects, only use the primary key from them + sub_obj_pks = [sub_obj.pk for sub_obj in data[relationship]] + if template: + for sub_obj in getattr(template, relationship).all(): + if sub_obj.pk in sub_obj_pks: + sub_obj_pks.remove(sub_obj.pk) + if cls.objects.filter(pk__in=sub_obj_pks).exclude(pk__in=self._related_filtered_queryset(cls)).exists(): + return False + return self.check_related('inventory', Inventory, data, role_field='use_role') and self.check_related( + 'execution_environment', ExecutionEnvironment, data, role_field='read_role' + ) @check_superuser def can_use(self, obj): - inventory_check = self.check_related('inventory', Inventory, {}, obj=obj, role_field='use_role', mandatory=True) - return inventory_check and self.has_credentials_access(obj) + return ( + self.has_obj_m2m_access(obj) + and self.check_related('inventory', Inventory, {}, obj=obj, role_field='use_role', mandatory=True) + and self.check_related('execution_environment', ExecutionEnvironment, {}, obj=obj, role_field='read_role') + ) def can_change(self, obj, data): - return self.check_related('inventory', Inventory, data, obj=obj, role_field='use_role') - - def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - try: - obj_name = obj.name - except AttributeError: - obj_name = obj.identifier - - if isinstance(sub_obj, Credential) and relationship == 'credentials': - if not self.user in sub_obj.use_role: - logger.debug( - "User {} not allowed access to credential {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id) - ) - return False - return True - - if isinstance(sub_obj, Label) and relationship == 'labels': - if not self.user.can_access(Label, 'read', sub_obj): - logger.debug("User {} not allowed access to label {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id)) - return False - return True - - if isinstance(sub_obj, InstanceGroup) and relationship == 'instance_groups': - if not sub_obj in self.user.get_queryset(InstanceGroup): - logger.debug( - "User {} not allowed access to instance_group {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id) - ) - return False - return True - - raise NotImplementedError('Only credentials, labels and instance groups can be attached to launch configurations.') - - def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - try: - obj_name = obj.name - except AttributeError: - obj_name = obj.identifier - - if isinstance(sub_obj, Credential) and relationship == 'credentials': - if not skip_sub_obj_read_check: - logger.debug( - "Skipping check if user {} can access credential {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return True - if not self.user in sub_obj.read_role: - logger.debug( - "User {} can not read credential {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return False - return True - if isinstance(sub_obj, Label) and relationship == 'labels': - if skip_sub_obj_read_check: - logger.debug( - "Skipping check if user {} can access label {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return True - if self.user.can_access(Label, 'read', sub_obj): - return True - logger.debug( - "User {} can not read label {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return False - if isinstance(sub_obj, InstanceGroup) and relationship == 'instance_groups': - if skip_sub_obj_read_check: - logger.debug( - "Skipping check if user {} can access instance_group {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return True - if sub_obj in self.user.get_queryset(InstanceGroup): - return True - logger.debug( - "User {} can not read instance_group {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return False - - raise NotImplementedError('Only credentials, labels and instance groups can be attached to launch configurations.') + return self.check_related('inventory', Inventory, data, obj=obj, role_field='use_role') and self.check_related( + 'execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role' + ) -class WorkflowJobTemplateNodeAccess(BaseAccess): +class WorkflowJobTemplateNodeAccess(UnifiedCredentialsMixin, BaseAccess): """ I can see/use a WorkflowJobTemplateNode if I have read permission to associated Workflow Job Template @@ -2033,7 +1925,7 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): """ model = WorkflowJobTemplateNode - prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'credentials', 'workflow_job_template') + prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'workflow_job_template') def filtered_queryset(self): return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.accessible_objects(self.user, 'read_role')) @@ -2045,7 +1937,8 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): return ( self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True) and self.check_related('unified_job_template', UnifiedJobTemplate, data, role_field='execute_role') - and JobLaunchConfigAccess(self.user).can_add(data) + and self.check_related('inventory', Inventory, data, role_field='use_role') + and self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role') ) def wfjt_admin(self, obj): @@ -2054,17 +1947,14 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): else: return self.user in obj.workflow_job_template.admin_role - def ujt_execute(self, obj): + def ujt_execute(self, obj, data=None): if not obj.unified_job_template: return True - return self.check_related('unified_job_template', UnifiedJobTemplate, {}, obj=obj, role_field='execute_role', mandatory=True) + return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, role_field='execute_role', mandatory=True) def can_change(self, obj, data): - if not data: - return True - # should not be able to edit the prompts if lacking access to UJT or WFJT - return self.ujt_execute(obj) and self.wfjt_admin(obj) and JobLaunchConfigAccess(self.user).can_change(obj, data) + return self.ujt_execute(obj, data=data) and self.wfjt_admin(obj) and JobLaunchConfigAccess(self.user).can_change(obj, data) def can_delete(self, obj): return self.wfjt_admin(obj) @@ -2077,29 +1967,14 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): return True def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - if not self.wfjt_admin(obj): - return False - if relationship in ['credentials', 'labels', 'instance_groups']: - # Need permission to related template to attach a credential - if not self.ujt_execute(obj): - return False - return JobLaunchConfigAccess(self.user).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - elif relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): - return self.check_same_WFJT(obj, sub_obj) - else: - raise NotImplementedError('Relationship {} not understood for WFJT nodes.'.format(relationship)) + if relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): + return self.wfjt_admin(obj) and self.check_same_WFJT(obj, sub_obj) + return super().can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - if not self.wfjt_admin(obj): - return False - if relationship in ['credentials', 'labels', 'instance_groups']: - if not self.ujt_execute(obj): - return False - return JobLaunchConfigAccess(self.user).can_unattach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - elif relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): - return self.check_same_WFJT(obj, sub_obj) - else: - raise NotImplementedError('Relationship {} not understood for WFJT nodes.'.format(relationship)) + def can_unattach(self, obj, sub_obj, relationship, data=None): + if relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): + return self.wfjt_admin(obj) + return super().can_unattach(obj, sub_obj, relationship, data=None) class WorkflowJobNodeAccess(BaseAccess): @@ -2174,13 +2049,10 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'workflow_admin_role').exists() - if data.get('execution_environment'): - ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) - if not self.user.can_access(ExecutionEnvironment, 'read', ee): - return False - - return self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True) and self.check_related( - 'inventory', Inventory, data, role_field='use_role' + return bool( + self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True) + and self.check_related('inventory', Inventory, data, role_field='use_role') + and self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role') ) def can_copy(self, obj): @@ -2226,14 +2098,10 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess): if self.user.is_superuser: return True - if data and data.get('execution_environment'): - ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) - if not self.user.can_access(ExecutionEnvironment, 'read', ee): - return False - return ( self.check_related('organization', Organization, data, role_field='workflow_admin_role', obj=obj) and self.check_related('inventory', Inventory, data, role_field='use_role', obj=obj) + and self.check_related('execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role') and self.user in obj.admin_role ) @@ -2640,7 +2508,7 @@ class UnifiedJobAccess(BaseAccess): return super(UnifiedJobAccess, self).get_queryset().filter(workflowapproval__isnull=True) -class ScheduleAccess(BaseAccess): +class ScheduleAccess(UnifiedCredentialsMixin, BaseAccess): """ I can see a schedule if I can see it's related unified job, I can create them or update them if I have write access """ @@ -2681,12 +2549,6 @@ class ScheduleAccess(BaseAccess): def can_delete(self, obj): return self.can_change(obj, {}) - def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - return JobLaunchConfigAccess(self.user).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - - def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - return JobLaunchConfigAccess(self.user).can_unattach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - class NotificationTemplateAccess(BaseAccess): """ diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index fbfb0c32f4..19ce50d986 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -382,7 +382,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn unified_job.survey_passwords = new_job_passwords kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch - if 'instance_groups' in kwargs: + if kwargs.get('instance_groups'): unified_job.preferred_instance_groups_cache = [ig.id for ig in kwargs['instance_groups']] else: unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache() diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 9bc0e3408d..2355b04039 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -218,7 +218,7 @@ class WorkflowJobTemplateNode(WorkflowNodeBase): approval_template = WorkflowApprovalTemplate(**kwargs) approval_template.save() self.unified_job_template = approval_template - self.save() + self.save(update_fields=['unified_job_template']) return approval_template diff --git a/awx/main/tests/functional/api/test_workflow_node.py b/awx/main/tests/functional/api/test_workflow_node.py index 0b89dfb546..71874085d7 100644 --- a/awx/main/tests/functional/api/test_workflow_node.py +++ b/awx/main/tests/functional/api/test_workflow_node.py @@ -77,6 +77,18 @@ class TestApprovalNodes: assert approval_node.unified_job_template.description == 'Approval Node' assert approval_node.unified_job_template.timeout == 0 + def test_approval_node_creation_with_timeout(self, post, approval_node, admin_user): + assert approval_node.timeout is None + + url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'}) + post(url, {'name': 'Test', 'description': 'Approval Node', 'timeout': 10}, user=admin_user, expect=201) + + approval_node = WorkflowJobTemplateNode.objects.get(pk=approval_node.pk) + approval_node.refresh_from_db() + assert approval_node.timeout is None + assert isinstance(approval_node.unified_job_template, WorkflowApprovalTemplate) + assert approval_node.unified_job_template.timeout == 10 + def test_approval_node_creation_failure(self, post, approval_node, admin_user): # This test leaves off a required param to assert that user will get a 400. url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'}) diff --git a/awx/main/tests/functional/test_rbac_job.py b/awx/main/tests/functional/test_rbac_job.py index 9580c9eacb..4f17aab45d 100644 --- a/awx/main/tests/functional/test_rbac_job.py +++ b/awx/main/tests/functional/test_rbac_job.py @@ -315,13 +315,13 @@ class TestLaunchConfigAccess: access = JobLaunchConfigAccess(rando) cred1, cred2 = self._make_two_credentials(credentialtype_ssh) - assert access.has_credentials_access(config) # has access if 0 creds + assert access.has_obj_m2m_access(config) # has access if 0 creds config.credentials.add(cred1, cred2) - assert not access.has_credentials_access(config) # lacks access to both + assert not access.has_obj_m2m_access(config) # lacks access to both cred1.use_role.members.add(rando) - assert not access.has_credentials_access(config) # lacks access to 1 + assert not access.has_obj_m2m_access(config) # lacks access to 1 cred2.use_role.members.add(rando) - assert access.has_credentials_access(config) # has access to both + assert access.has_obj_m2m_access(config) # has access to both def test_new_execution_environment_access(self, rando): ee = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar') diff --git a/awx/main/tests/functional/test_rbac_workflow.py b/awx/main/tests/functional/test_rbac_workflow.py index 15577b65a4..4c29907519 100644 --- a/awx/main/tests/functional/test_rbac_workflow.py +++ b/awx/main/tests/functional/test_rbac_workflow.py @@ -6,6 +6,7 @@ from awx.main.access import ( WorkflowJobAccess, # WorkflowJobNodeAccess ) +from awx.main.models import JobTemplate, WorkflowJobTemplateNode from rest_framework.exceptions import PermissionDenied @@ -87,6 +88,16 @@ class TestWorkflowJobTemplateNodeAccess: job_template.read_role.members.add(rando) assert not access.can_add({'workflow_job_template': wfjt, 'unified_job_template': job_template}) + def test_change_JT_no_start_perm(self, wfjt, rando): + wfjt.admin_role.members.add(rando) + access = WorkflowJobTemplateNodeAccess(rando) + jt1 = JobTemplate.objects.create() + jt1.execute_role.members.add(rando) + assert access.can_add({'workflow_job_template': wfjt, 'unified_job_template': jt1}) + node = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=jt1) + jt2 = JobTemplate.objects.create() + assert not access.can_change(node, {'unified_job_template': jt2.id}) + def test_add_node_with_minimum_permissions(self, wfjt, job_template, inventory, rando): wfjt.admin_role.members.add(rando) access = WorkflowJobTemplateNodeAccess(rando) @@ -104,14 +115,12 @@ class TestWorkflowJobTemplateNodeAccess: @pytest.mark.parametrize( "add_wfjt_admin, add_jt_admin, permission_type, expected_result, method_type", [ - (False, False, None, False, 'can_attach'), (True, False, 'credentials', False, 'can_attach'), (True, True, 'credentials', True, 'can_attach'), (True, False, 'labels', False, 'can_attach'), (True, True, 'labels', True, 'can_attach'), (True, False, 'instance_groups', False, 'can_attach'), (True, True, 'instance_groups', True, 'can_attach'), - (False, False, None, False, 'can_unattach'), (True, False, 'credentials', False, 'can_unattach'), (True, True, 'credentials', True, 'can_unattach'), (True, False, 'labels', False, 'can_unattach'), @@ -128,11 +137,25 @@ class TestWorkflowJobTemplateNodeAccess: if add_jt_admin: job_template.execute_role.members.add(rando) - # We have to mock the JobLaunchConfigAccess because the attachment methods will look at the object type and the relation - # Since we pass None as the second param this will trigger an NotImplementedError from that object - with mocker.patch('awx.main.access.JobLaunchConfigAccess.{}'.format(method_type), return_value=True): - access = WorkflowJobTemplateNodeAccess(rando) - assert getattr(access, method_type)(wfjt_node, None, permission_type, None) == expected_result + from awx.main.models import Credential, Label, InstanceGroup, Organization, CredentialType + + if permission_type == 'credentials': + sub_obj = Credential.objects.create(credential_type=CredentialType.objects.create()) + sub_obj.use_role.members.add(rando) + elif permission_type == 'labels': + sub_obj = Label.objects.create(organization=Organization.objects.create()) + sub_obj.organization.member_role.members.add(rando) + elif permission_type == 'instance_groups': + sub_obj = InstanceGroup.objects.create() + org = Organization.objects.create() + org.admin_role.members.add(rando) # only admins can see IGs + org.instance_groups.add(sub_obj) + + access = WorkflowJobTemplateNodeAccess(rando) + if method_type == 'can_unattach': + assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type) == expected_result + else: + assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type, {}) == expected_result # The actual attachment of labels, credentials and instance groups are tested from JobLaunchConfigAccess