From 34e8087aeef0de19642e7dd9cd076adcdf5fbe9c Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 12 Sep 2022 22:06:10 -0400 Subject: [PATCH] DRY edits to access classes for new prompts Remove if-not-data conditional from WFJTnode.can_change these are cannonical for can_add, but this looks like a bug Change JTaccess.can_unattach to call same method in super() previously called can_attach, which is problematic Better consolidate launch config m2m related checks Test and fix pre-existing WFJT node RBAC bug recognize not-provided instance group list on launch, avoiding bug where it fell back to default fix bug where timeout field was saved on WFJT nodes after creating approval node remove labels from schedule serializer summary_fields remove unnecessary prefetch of credentials from WFJT node queryset --- awx/api/serializers.py | 2 +- awx/main/access.py | 332 +++++------------- awx/main/models/unified_jobs.py | 2 +- awx/main/models/workflow.py | 2 +- .../functional/api/test_workflow_node.py | 12 + awx/main/tests/functional/test_rbac_job.py | 8 +- .../tests/functional/test_rbac_workflow.py | 37 +- 7 files changed, 146 insertions(+), 249 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 4dd6d8182f..5592540efc 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4794,7 +4794,7 @@ class SchedulePreviewSerializer(BaseSerializer): return value -class ScheduleSerializer(LabelsListMixin, LaunchConfigurationBaseSerializer, SchedulePreviewSerializer): +class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSerializer): show_capabilities = ['edit', 'delete'] timezone = serializers.SerializerMethodField( diff --git a/awx/main/access.py b/awx/main/access.py index 81a1d4372f..e8deea8f36 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -12,7 +12,7 @@ from django.conf import settings from django.db.models import Q, Prefetch from django.contrib.auth.models import User from django.utils.translation import gettext_lazy as _ -from django.core.exceptions import ObjectDoesNotExist +from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist # Django REST Framework from rest_framework.exceptions import ParseError, PermissionDenied @@ -281,13 +281,23 @@ class BaseAccess(object): """ return True + def assure_relationship_exists(self, obj, relationship): + if '.' in relationship: + return # not attempting validation for complex relationships now + try: + obj._meta.get_field(relationship) + except FieldDoesNotExist: + raise NotImplementedError(f'The relationship {relationship} does not exist for model {type(obj)}') + def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): + self.assure_relationship_exists(obj, relationship) if skip_sub_obj_read_check: return self.can_change(obj, None) else: return bool(self.can_change(obj, None) and self.user.can_access(type(sub_obj), 'read', sub_obj)) def can_unattach(self, obj, sub_obj, relationship, data=None): + self.assure_relationship_exists(obj, relationship) return self.can_change(obj, data) def check_related(self, field, Model, data, role_field='admin_role', obj=None, mandatory=False): @@ -328,6 +338,8 @@ class BaseAccess(object): role = getattr(resource, role_field, None) if role is None: # Handle special case where resource does not have direct roles + if role_field == 'read_role': + return self.user.can_access(type(resource), 'read', resource) access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] return self.user.can_access(type(resource), access_method_type, resource, None) return self.user in role @@ -499,6 +511,21 @@ class BaseAccess(object): return False +class UnifiedCredentialsMixin(BaseAccess): + """ + The credentials many-to-many is a standard relationship for JT, jobs, and others + Permission to attach is always use permission, and permission to unattach is admin to the parent object + """ + + @check_superuser + def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): + if relationship == 'credentials': + if not isinstance(sub_obj, Credential): + raise RuntimeError(f'Can only attach credentials to credentials relationship, got {type(sub_obj)}') + return self.can_change(obj, None) and (self.user in sub_obj.use_role) + return super().can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) + + class NotificationAttachMixin(BaseAccess): """For models that can have notifications attached @@ -1031,7 +1058,7 @@ class GroupAccess(BaseAccess): return bool(obj and self.user in obj.inventory.admin_role) -class InventorySourceAccess(NotificationAttachMixin, BaseAccess): +class InventorySourceAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAccess): """ I can see inventory sources whenever I can see their inventory. I can change inventory sources whenever I can change their inventory. @@ -1075,18 +1102,6 @@ class InventorySourceAccess(NotificationAttachMixin, BaseAccess): return self.user in obj.inventory.update_role return False - @check_superuser - def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return obj and obj.inventory and self.user in obj.inventory.admin_role and self.user in sub_obj.use_role - return super(InventorySourceAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - - @check_superuser - def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return obj and obj.inventory and self.user in obj.inventory.admin_role - return super(InventorySourceAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) - class InventoryUpdateAccess(BaseAccess): """ @@ -1485,7 +1500,7 @@ class ProjectUpdateAccess(BaseAccess): return obj and self.user in obj.project.admin_role -class JobTemplateAccess(NotificationAttachMixin, BaseAccess): +class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAccess): """ I can see job templates when: - I have read role for the job template. @@ -1549,8 +1564,7 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess): if self.user not in inventory.use_role: return False - ee = get_value(ExecutionEnvironment, 'execution_environment') - if ee and not self.user.can_access(ExecutionEnvironment, 'read', ee): + if not self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role'): return False project = get_value(Project, 'project') @@ -1600,10 +1614,8 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess): if self.changes_are_non_sensitive(obj, data): return True - if data.get('execution_environment'): - ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) - if not self.user.can_access(ExecutionEnvironment, 'read', ee): - return False + if not self.check_related('execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role'): + return False for required_field, cls in (('inventory', Inventory), ('project', Project)): is_mandatory = True @@ -1667,17 +1679,13 @@ class JobTemplateAccess(NotificationAttachMixin, BaseAccess): if not obj.organization: return False return self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.organization.admin_role - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return self.user in obj.admin_role and self.user in sub_obj.use_role return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) @check_superuser def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): if relationship == "instance_groups": return self.can_attach(obj, sub_obj, relationship, *args, **kwargs) - if relationship == 'credentials' and isinstance(sub_obj, Credential): - return self.user in obj.admin_role - return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) + return super(JobTemplateAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs) class JobAccess(BaseAccess): @@ -1824,7 +1832,7 @@ class SystemJobAccess(BaseAccess): return False # no relaunching of system jobs -class JobLaunchConfigAccess(BaseAccess): +class JobLaunchConfigAccess(UnifiedCredentialsMixin, BaseAccess): """ Launch configs must have permissions checked for - relaunching @@ -1832,185 +1840,69 @@ class JobLaunchConfigAccess(BaseAccess): In order to create a new object with a copy of this launch config, I need: - use access to related inventory (if present) + - read access to Execution Environment (if present), unless the specified ee is already in the template - use role to many-related credentials (if any present) - - use role to Execution Environment (if present), unless the specified ee is already in the template - - use role to many-related labels (if any present), unless the specified label is already in the template - - use role to many-related instance groups (if any present), unless the specified instance group is already in the template + - read access to many-related labels (if any present), unless the specified label is already in the template + - read access to many-related instance groups (if any present), unless the specified instance group is already in the template """ model = JobLaunchConfig select_related = 'job' prefetch_related = ('credentials', 'inventory') - def _unusable_creds_exist(self, qs): - return qs.exclude(pk__in=Credential._accessible_pk_qs(Credential, self.user, 'use_role')).exists() + M2M_CHECKS = {'credentials': Credential, 'labels': Label, 'instance_groups': InstanceGroup} - def has_credentials_access(self, obj): - # user has access if no related credentials exist that the user lacks use role for - return not self._unusable_creds_exist(obj.credentials) + def _related_filtered_queryset(self, cls): + if cls is Label: + return LabelAccess(self.user).filtered_queryset() + elif cls is InstanceGroup: + return InstanceGroupAccess(self.user).filtered_queryset() + else: + return cls._accessible_pk_qs(cls, self.user, 'use_role') + + def has_obj_m2m_access(self, obj): + for relationship, cls in self.M2M_CHECKS.items(): + if getattr(obj, relationship).exclude(pk__in=self._related_filtered_queryset(cls)).exists(): + return False + return True @check_superuser def can_add(self, data, template=None): # This is a special case, we don't check related many-to-many elsewhere # launch RBAC checks use this - permission_error = False - if 'credentials' in data and data['credentials'] or 'reference_obj' in data: - if 'reference_obj' in data: - prompted_cred_qs = data['reference_obj'].credentials.all() - else: - # If given model objects, only use the primary key from them - cred_pks = [cred.pk for cred in data['credentials']] - if template: - for cred in template.credentials.all(): - if cred.pk in cred_pks: - cred_pks.remove(cred.pk) - prompted_cred_qs = Credential.objects.filter(pk__in=cred_pks) - if self._unusable_creds_exist(prompted_cred_qs): - credential_names = [cred.name for cred in prompted_cred_qs] - logger.debug("User {} not allowed to access credentials in {}".format(self.user.username, credential_names)) - permission_error = True - if 'execution_environment' in data and data['execution_environment'] or 'reference_obj' in data: - if 'reference_obj' in data: - ee = data['reference_obj'].execution_environment - else: - ee = data['execution_environment'] - if ee and not self.user.can_access(ExecutionEnvironment, 'read', ee): - if not template or ee != template.execution_environment: - logger.debug("User {} not allowed access to ee {}".format(self.user.username, ee.name)) - permission_error = True - else: - logger.debug( - "User {} does not have permissions to execution_environment {} but its part of the template".format(self.user.username, ee.name) - ) - if 'labels' in data and data['labels'] or 'reference_obj' in data: - if 'reference_obj' in data: - labels = data['reference_obj'].labels.all() - else: - labels = data['labels'] - for a_label in labels: - if not self.user.can_access(Label, 'read', a_label): - # This if allows a template admin who can see labels to specify a list and the executor to select a subset of the list - if not template or a_label not in template.labels.all(): - logger.debug("User {} not allowed access to label {}".format(self.user.username, a_label.name)) - permission_error = True - else: - logger.debug("User {} does not have permissions to label {} but its part of the template".format(self.user.username, a_label.name)) - if 'instance_groups' in data and data['instance_groups'] or 'reference_obj' in data: - if 'reference_obj' in data: - instance_groups = data['reference_obj'].labels.all() - else: - instance_groups = data['instance_groups'] - for an_ig in instance_groups: - if not an_ig in self.user.get_queryset(InstanceGroup): - # This if allows a template admin who can see IGs to specify a list and the executor to select a subset of the list - if not template or an_ig not in template.instance_groups.all(): - logger.debug("user {} not allowed access to instance group {}".format(self.user.username, an_ig.name)) - permission_error = True - else: - logger.debug( - "User {} does not have permissions to instance_group {} but its part of the template".format(self.user.username, an_ig.name) - ) - if permission_error: - return False - return self.check_related('inventory', Inventory, data, role_field='use_role') + if 'reference_obj' in data: + if not self.has_obj_m2m_access(data['reference_obj']): + return False + else: + for relationship, cls in self.M2M_CHECKS.items(): + if relationship in data and data[relationship]: + # If given model objects, only use the primary key from them + sub_obj_pks = [sub_obj.pk for sub_obj in data[relationship]] + if template: + for sub_obj in getattr(template, relationship).all(): + if sub_obj.pk in sub_obj_pks: + sub_obj_pks.remove(sub_obj.pk) + if cls.objects.filter(pk__in=sub_obj_pks).exclude(pk__in=self._related_filtered_queryset(cls)).exists(): + return False + return self.check_related('inventory', Inventory, data, role_field='use_role') and self.check_related( + 'execution_environment', ExecutionEnvironment, data, role_field='read_role' + ) @check_superuser def can_use(self, obj): - inventory_check = self.check_related('inventory', Inventory, {}, obj=obj, role_field='use_role', mandatory=True) - return inventory_check and self.has_credentials_access(obj) + return ( + self.has_obj_m2m_access(obj) + and self.check_related('inventory', Inventory, {}, obj=obj, role_field='use_role', mandatory=True) + and self.check_related('execution_environment', ExecutionEnvironment, {}, obj=obj, role_field='read_role') + ) def can_change(self, obj, data): - return self.check_related('inventory', Inventory, data, obj=obj, role_field='use_role') - - def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - try: - obj_name = obj.name - except AttributeError: - obj_name = obj.identifier - - if isinstance(sub_obj, Credential) and relationship == 'credentials': - if not self.user in sub_obj.use_role: - logger.debug( - "User {} not allowed access to credential {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id) - ) - return False - return True - - if isinstance(sub_obj, Label) and relationship == 'labels': - if not self.user.can_access(Label, 'read', sub_obj): - logger.debug("User {} not allowed access to label {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id)) - return False - return True - - if isinstance(sub_obj, InstanceGroup) and relationship == 'instance_groups': - if not sub_obj in self.user.get_queryset(InstanceGroup): - logger.debug( - "User {} not allowed access to instance_group {} for {} {} ({})".format(self.user.username, sub_obj.name, obj.__class__, obj_name, obj.id) - ) - return False - return True - - raise NotImplementedError('Only credentials, labels and instance groups can be attached to launch configurations.') - - def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - try: - obj_name = obj.name - except AttributeError: - obj_name = obj.identifier - - if isinstance(sub_obj, Credential) and relationship == 'credentials': - if not skip_sub_obj_read_check: - logger.debug( - "Skipping check if user {} can access credential {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return True - if not self.user in sub_obj.read_role: - logger.debug( - "User {} can not read credential {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return False - return True - if isinstance(sub_obj, Label) and relationship == 'labels': - if skip_sub_obj_read_check: - logger.debug( - "Skipping check if user {} can access label {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return True - if self.user.can_access(Label, 'read', sub_obj): - return True - logger.debug( - "User {} can not read label {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return False - if isinstance(sub_obj, InstanceGroup) and relationship == 'instance_groups': - if skip_sub_obj_read_check: - logger.debug( - "Skipping check if user {} can access instance_group {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return True - if sub_obj in self.user.get_queryset(InstanceGroup): - return True - logger.debug( - "User {} can not read instance_group {} ({}) for removal from {} {} ({})".format( - self.user.username, sub_obj.name, sub_obj.id, obj.__class__, obj_name, obj.id - ) - ) - return False - - raise NotImplementedError('Only credentials, labels and instance groups can be attached to launch configurations.') + return self.check_related('inventory', Inventory, data, obj=obj, role_field='use_role') and self.check_related( + 'execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role' + ) -class WorkflowJobTemplateNodeAccess(BaseAccess): +class WorkflowJobTemplateNodeAccess(UnifiedCredentialsMixin, BaseAccess): """ I can see/use a WorkflowJobTemplateNode if I have read permission to associated Workflow Job Template @@ -2033,7 +1925,7 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): """ model = WorkflowJobTemplateNode - prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'credentials', 'workflow_job_template') + prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'workflow_job_template') def filtered_queryset(self): return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.accessible_objects(self.user, 'read_role')) @@ -2045,7 +1937,8 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): return ( self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True) and self.check_related('unified_job_template', UnifiedJobTemplate, data, role_field='execute_role') - and JobLaunchConfigAccess(self.user).can_add(data) + and self.check_related('inventory', Inventory, data, role_field='use_role') + and self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role') ) def wfjt_admin(self, obj): @@ -2054,17 +1947,14 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): else: return self.user in obj.workflow_job_template.admin_role - def ujt_execute(self, obj): + def ujt_execute(self, obj, data=None): if not obj.unified_job_template: return True - return self.check_related('unified_job_template', UnifiedJobTemplate, {}, obj=obj, role_field='execute_role', mandatory=True) + return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, role_field='execute_role', mandatory=True) def can_change(self, obj, data): - if not data: - return True - # should not be able to edit the prompts if lacking access to UJT or WFJT - return self.ujt_execute(obj) and self.wfjt_admin(obj) and JobLaunchConfigAccess(self.user).can_change(obj, data) + return self.ujt_execute(obj, data=data) and self.wfjt_admin(obj) and JobLaunchConfigAccess(self.user).can_change(obj, data) def can_delete(self, obj): return self.wfjt_admin(obj) @@ -2077,29 +1967,14 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): return True def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - if not self.wfjt_admin(obj): - return False - if relationship in ['credentials', 'labels', 'instance_groups']: - # Need permission to related template to attach a credential - if not self.ujt_execute(obj): - return False - return JobLaunchConfigAccess(self.user).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - elif relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): - return self.check_same_WFJT(obj, sub_obj) - else: - raise NotImplementedError('Relationship {} not understood for WFJT nodes.'.format(relationship)) + if relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): + return self.wfjt_admin(obj) and self.check_same_WFJT(obj, sub_obj) + return super().can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - if not self.wfjt_admin(obj): - return False - if relationship in ['credentials', 'labels', 'instance_groups']: - if not self.ujt_execute(obj): - return False - return JobLaunchConfigAccess(self.user).can_unattach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - elif relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): - return self.check_same_WFJT(obj, sub_obj) - else: - raise NotImplementedError('Relationship {} not understood for WFJT nodes.'.format(relationship)) + def can_unattach(self, obj, sub_obj, relationship, data=None): + if relationship in ('success_nodes', 'failure_nodes', 'always_nodes'): + return self.wfjt_admin(obj) + return super().can_unattach(obj, sub_obj, relationship, data=None) class WorkflowJobNodeAccess(BaseAccess): @@ -2174,13 +2049,10 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'workflow_admin_role').exists() - if data.get('execution_environment'): - ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) - if not self.user.can_access(ExecutionEnvironment, 'read', ee): - return False - - return self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True) and self.check_related( - 'inventory', Inventory, data, role_field='use_role' + return bool( + self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True) + and self.check_related('inventory', Inventory, data, role_field='use_role') + and self.check_related('execution_environment', ExecutionEnvironment, data, role_field='read_role') ) def can_copy(self, obj): @@ -2226,14 +2098,10 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess): if self.user.is_superuser: return True - if data and data.get('execution_environment'): - ee = get_object_from_data('execution_environment', ExecutionEnvironment, data) - if not self.user.can_access(ExecutionEnvironment, 'read', ee): - return False - return ( self.check_related('organization', Organization, data, role_field='workflow_admin_role', obj=obj) and self.check_related('inventory', Inventory, data, role_field='use_role', obj=obj) + and self.check_related('execution_environment', ExecutionEnvironment, data, obj=obj, role_field='read_role') and self.user in obj.admin_role ) @@ -2640,7 +2508,7 @@ class UnifiedJobAccess(BaseAccess): return super(UnifiedJobAccess, self).get_queryset().filter(workflowapproval__isnull=True) -class ScheduleAccess(BaseAccess): +class ScheduleAccess(UnifiedCredentialsMixin, BaseAccess): """ I can see a schedule if I can see it's related unified job, I can create them or update them if I have write access """ @@ -2681,12 +2549,6 @@ class ScheduleAccess(BaseAccess): def can_delete(self, obj): return self.can_change(obj, {}) - def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - return JobLaunchConfigAccess(self.user).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - - def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): - return JobLaunchConfigAccess(self.user).can_unattach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) - class NotificationTemplateAccess(BaseAccess): """ diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index fbfb0c32f4..19ce50d986 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -382,7 +382,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn unified_job.survey_passwords = new_job_passwords kwargs['survey_passwords'] = new_job_passwords # saved in config object for relaunch - if 'instance_groups' in kwargs: + if kwargs.get('instance_groups'): unified_job.preferred_instance_groups_cache = [ig.id for ig in kwargs['instance_groups']] else: unified_job.preferred_instance_groups_cache = unified_job._get_preferred_instance_group_cache() diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 9bc0e3408d..2355b04039 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -218,7 +218,7 @@ class WorkflowJobTemplateNode(WorkflowNodeBase): approval_template = WorkflowApprovalTemplate(**kwargs) approval_template.save() self.unified_job_template = approval_template - self.save() + self.save(update_fields=['unified_job_template']) return approval_template diff --git a/awx/main/tests/functional/api/test_workflow_node.py b/awx/main/tests/functional/api/test_workflow_node.py index 0b89dfb546..71874085d7 100644 --- a/awx/main/tests/functional/api/test_workflow_node.py +++ b/awx/main/tests/functional/api/test_workflow_node.py @@ -77,6 +77,18 @@ class TestApprovalNodes: assert approval_node.unified_job_template.description == 'Approval Node' assert approval_node.unified_job_template.timeout == 0 + def test_approval_node_creation_with_timeout(self, post, approval_node, admin_user): + assert approval_node.timeout is None + + url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'}) + post(url, {'name': 'Test', 'description': 'Approval Node', 'timeout': 10}, user=admin_user, expect=201) + + approval_node = WorkflowJobTemplateNode.objects.get(pk=approval_node.pk) + approval_node.refresh_from_db() + assert approval_node.timeout is None + assert isinstance(approval_node.unified_job_template, WorkflowApprovalTemplate) + assert approval_node.unified_job_template.timeout == 10 + def test_approval_node_creation_failure(self, post, approval_node, admin_user): # This test leaves off a required param to assert that user will get a 400. url = reverse('api:workflow_job_template_node_create_approval', kwargs={'pk': approval_node.pk, 'version': 'v2'}) diff --git a/awx/main/tests/functional/test_rbac_job.py b/awx/main/tests/functional/test_rbac_job.py index 9580c9eacb..4f17aab45d 100644 --- a/awx/main/tests/functional/test_rbac_job.py +++ b/awx/main/tests/functional/test_rbac_job.py @@ -315,13 +315,13 @@ class TestLaunchConfigAccess: access = JobLaunchConfigAccess(rando) cred1, cred2 = self._make_two_credentials(credentialtype_ssh) - assert access.has_credentials_access(config) # has access if 0 creds + assert access.has_obj_m2m_access(config) # has access if 0 creds config.credentials.add(cred1, cred2) - assert not access.has_credentials_access(config) # lacks access to both + assert not access.has_obj_m2m_access(config) # lacks access to both cred1.use_role.members.add(rando) - assert not access.has_credentials_access(config) # lacks access to 1 + assert not access.has_obj_m2m_access(config) # lacks access to 1 cred2.use_role.members.add(rando) - assert access.has_credentials_access(config) # has access to both + assert access.has_obj_m2m_access(config) # has access to both def test_new_execution_environment_access(self, rando): ee = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar') diff --git a/awx/main/tests/functional/test_rbac_workflow.py b/awx/main/tests/functional/test_rbac_workflow.py index 15577b65a4..4c29907519 100644 --- a/awx/main/tests/functional/test_rbac_workflow.py +++ b/awx/main/tests/functional/test_rbac_workflow.py @@ -6,6 +6,7 @@ from awx.main.access import ( WorkflowJobAccess, # WorkflowJobNodeAccess ) +from awx.main.models import JobTemplate, WorkflowJobTemplateNode from rest_framework.exceptions import PermissionDenied @@ -87,6 +88,16 @@ class TestWorkflowJobTemplateNodeAccess: job_template.read_role.members.add(rando) assert not access.can_add({'workflow_job_template': wfjt, 'unified_job_template': job_template}) + def test_change_JT_no_start_perm(self, wfjt, rando): + wfjt.admin_role.members.add(rando) + access = WorkflowJobTemplateNodeAccess(rando) + jt1 = JobTemplate.objects.create() + jt1.execute_role.members.add(rando) + assert access.can_add({'workflow_job_template': wfjt, 'unified_job_template': jt1}) + node = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=jt1) + jt2 = JobTemplate.objects.create() + assert not access.can_change(node, {'unified_job_template': jt2.id}) + def test_add_node_with_minimum_permissions(self, wfjt, job_template, inventory, rando): wfjt.admin_role.members.add(rando) access = WorkflowJobTemplateNodeAccess(rando) @@ -104,14 +115,12 @@ class TestWorkflowJobTemplateNodeAccess: @pytest.mark.parametrize( "add_wfjt_admin, add_jt_admin, permission_type, expected_result, method_type", [ - (False, False, None, False, 'can_attach'), (True, False, 'credentials', False, 'can_attach'), (True, True, 'credentials', True, 'can_attach'), (True, False, 'labels', False, 'can_attach'), (True, True, 'labels', True, 'can_attach'), (True, False, 'instance_groups', False, 'can_attach'), (True, True, 'instance_groups', True, 'can_attach'), - (False, False, None, False, 'can_unattach'), (True, False, 'credentials', False, 'can_unattach'), (True, True, 'credentials', True, 'can_unattach'), (True, False, 'labels', False, 'can_unattach'), @@ -128,11 +137,25 @@ class TestWorkflowJobTemplateNodeAccess: if add_jt_admin: job_template.execute_role.members.add(rando) - # We have to mock the JobLaunchConfigAccess because the attachment methods will look at the object type and the relation - # Since we pass None as the second param this will trigger an NotImplementedError from that object - with mocker.patch('awx.main.access.JobLaunchConfigAccess.{}'.format(method_type), return_value=True): - access = WorkflowJobTemplateNodeAccess(rando) - assert getattr(access, method_type)(wfjt_node, None, permission_type, None) == expected_result + from awx.main.models import Credential, Label, InstanceGroup, Organization, CredentialType + + if permission_type == 'credentials': + sub_obj = Credential.objects.create(credential_type=CredentialType.objects.create()) + sub_obj.use_role.members.add(rando) + elif permission_type == 'labels': + sub_obj = Label.objects.create(organization=Organization.objects.create()) + sub_obj.organization.member_role.members.add(rando) + elif permission_type == 'instance_groups': + sub_obj = InstanceGroup.objects.create() + org = Organization.objects.create() + org.admin_role.members.add(rando) # only admins can see IGs + org.instance_groups.add(sub_obj) + + access = WorkflowJobTemplateNodeAccess(rando) + if method_type == 'can_unattach': + assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type) == expected_result + else: + assert getattr(access, method_type)(wfjt_node, sub_obj, permission_type, {}) == expected_result # The actual attachment of labels, credentials and instance groups are tested from JobLaunchConfigAccess