diff --git a/awx/main/access.py b/awx/main/access.py index b5a81e443a..c3200b77bf 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1392,26 +1392,45 @@ class JobAccess(BaseAccess): inventory_access = obj.inventory and self.user in obj.inventory.use_role credential_access = obj.credential and self.user in obj.credential.use_role + job_extra_credentials = set(obj.extra_credentials.all()) + if job_extra_credentials: + credential_access = False # Check if JT execute access (and related prompts) is sufficient if obj.job_template is not None: prompts_access = True job_fields = {} + jt_extra_credentials = set(obj.job_template.extra_credentials.all()) for fd in obj.job_template._ask_for_vars_dict(): + if fd == 'extra_credentials': + job_fields[fd] = job_extra_credentials job_fields[fd] = getattr(obj, fd) accepted_fields, ignored_fields = obj.job_template._accept_or_ignore_job_kwargs(**job_fields) + # Check if job fields are not allowed by current _on_launch settings for fd in ignored_fields: - if fd == 'extra_credentials': - if set(job_fields[fd].all()) != set(getattr(obj.job_template, fd).all()): + if fd == 'extra_vars': + continue # we cannot yet validate validity of prompted extra_vars + elif fd == 'extra_credentials': + if job_extra_credentials != jt_extra_credentials: # Job has extra_credentials that are not promptable prompts_access = False - elif fd != 'extra_vars' and job_fields[fd] != getattr(obj.job_template, fd): + break + elif job_fields[fd] != getattr(obj.job_template, fd): # Job has field that is not promptable prompts_access = False - if obj.credential != obj.job_template.credential and not credential_access: - prompts_access = False - if obj.inventory != obj.job_template.inventory and not inventory_access: - prompts_access = False + break + # For those fields that are allowed by prompting, but differ + # from JT, assure that user has explicit access to them + if prompts_access: + if obj.credential != obj.job_template.credential and not credential_access: + prompts_access = False + if obj.inventory != obj.job_template.inventory and not inventory_access: + prompts_access = False + if prompts_access and job_extra_credentials != jt_extra_credentials: + for cred in job_extra_credentials: + if self.user not in cred.use_role: + prompts_access = False + break if prompts_access and self.user in obj.job_template.execute_role: return True diff --git a/awx/main/tests/functional/test_rbac_job.py b/awx/main/tests/functional/test_rbac_job.py index 7ad79adaac..d721f2b98e 100644 --- a/awx/main/tests/functional/test_rbac_job.py +++ b/awx/main/tests/functional/test_rbac_job.py @@ -149,7 +149,7 @@ class TestJobRelaunchAccess: assert not inventory_user.can_access(Job, 'start', job_with_links, validate_license=False) def test_job_relaunch_extra_credential_access( - self, post, inventory, project, credential, net_credential): + self, inventory, project, credential, net_credential): jt = JobTemplate.objects.create(name='testjt', inventory=inventory, project=project) jt.extra_credentials.add(credential) job = jt.create_unified_job() @@ -164,6 +164,45 @@ class TestJobRelaunchAccess: job.extra_credentials.add(net_credential) assert not jt_user.can_access(Job, 'start', job, validate_license=False) + def test_prompted_extra_credential_relaunch_denied( + self, inventory, project, net_credential, rando): + jt = JobTemplate.objects.create( + name='testjt', inventory=inventory, project=project, + ask_credential_on_launch=True) + job = jt.create_unified_job() + jt.execute_role.members.add(rando) + + # Job has prompted extra_credential, rando lacks permission to use it + job.extra_credentials.add(net_credential) + assert not rando.can_access(Job, 'start', job, validate_license=False) + + def test_prompted_extra_credential_relaunch_allowed( + self, inventory, project, net_credential, rando): + jt = JobTemplate.objects.create( + name='testjt', inventory=inventory, project=project, + ask_credential_on_launch=True) + job = jt.create_unified_job() + jt.execute_role.members.add(rando) + + # Job has prompted extra_credential, but rando can use it + net_credential.use_role.members.add(rando) + job.extra_credentials.add(net_credential) + assert rando.can_access(Job, 'start', job, validate_license=False) + + def test_extra_credential_relaunch_recreation_permission( + self, inventory, project, net_credential, credential, rando): + jt = JobTemplate.objects.create( + name='testjt', inventory=inventory, project=project, + credential=credential, ask_credential_on_launch=True) + job = jt.create_unified_job() + project.admin_role.members.add(rando) + inventory.admin_role.members.add(rando) + credential.admin_role.members.add(rando) + + # Relaunch blocked by the extra credential + job.extra_credentials.add(net_credential) + assert not rando.can_access(Job, 'start', job, validate_license=False) + @pytest.mark.django_db class TestJobAndUpdateCancels: