diff --git a/awx/main/access.py b/awx/main/access.py index 72636e1776..d122d48da8 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1094,17 +1094,32 @@ class JobAccess(BaseAccess): if self.user.is_superuser: return True - # If a user can launch the job template then they can relaunch a job from that - # job template + inventory_access = obj.inventory and self.user in obj.inventory.use_role + credential_access = obj.credential and self.user in obj.credential.use_role + + # Check if JT execute access (and related prompts) is sufficient if obj.job_template is not None: - return self.user in obj.job_template.execute_role + prompts_access = True + job_fields = {} + for fd in obj.job_template._ask_for_vars_dict(): + job_fields[fd] = getattr(obj, fd) + accepted_fields, ignored_fields = obj.job_template._accept_or_ignore_job_kwargs(**job_fields) + for fd in ignored_fields: + if fd != 'extra_vars' and job_fields[fd] != getattr(obj.job_template, fd): + # Job has field that is not promptable + prompts_access = False + if obj.credential != obj.job_template.credential and not credential_access: + prompts_access = False + if obj.inventory != obj.job_template.inventory and not inventory_access: + prompts_access = False + if prompts_access and self.user in obj.job_template.execute_role: + return True - inventory_access = self.user in obj.inventory.use_role - credential_access = self.user in obj.credential.use_role - org_access = self.user in obj.inventory.organization.admin_role + org_access = obj.inventory and self.user in obj.inventory.organization.admin_role project_access = obj.project is None or self.user in obj.project.admin_role + # job can be relaunched if user could make an equivalent JT return inventory_access and credential_access and (org_access or project_access) def can_cancel(self, obj): diff --git a/awx/main/tests/functional/test_rbac_job_start.py b/awx/main/tests/functional/test_rbac_job_start.py index 18060126e1..c934973cf4 100644 --- a/awx/main/tests/functional/test_rbac_job_start.py +++ b/awx/main/tests/functional/test_rbac_job_start.py @@ -2,11 +2,7 @@ import pytest from awx.main.models.inventory import Inventory from awx.main.models.credential import Credential -from awx.main.models.jobs import JobTemplate - -@pytest.fixture -def machine_credential(): - return Credential.objects.create(name='machine-cred', kind='ssh', username='test_user', password='pas4word') +from awx.main.models.jobs import JobTemplate, Job @pytest.mark.django_db @pytest.mark.job_permissions @@ -45,3 +41,52 @@ def test_inventory_use_access(inventory, user): inventory.use_role.members.add(common_user) assert common_user.can_access(Inventory, 'use', inventory) + +@pytest.mark.django_db +class TestJobRelaunchAccess: + @pytest.fixture + def job_no_prompts(self, machine_credential, inventory): + jt = JobTemplate.objects.create(name='test-job_template', credential=machine_credential, inventory=inventory) + return jt.create_unified_job() + + @pytest.fixture + def job_with_prompts(self, machine_credential, inventory, organization): + jt = JobTemplate.objects.create( + name='test-job-template-prompts', credential=machine_credential, inventory=inventory, + ask_tags_on_launch=True, ask_variables_on_launch=True, ask_skip_tags_on_launch=True, + ask_limit_on_launch=True, ask_job_type_on_launch=True, ask_inventory_on_launch=True, + ask_credential_on_launch=True) + new_cred = Credential.objects.create(name='new-cred', kind='ssh', username='test_user', password='pas4word') + new_inv = Inventory.objects.create(name='new-inv', organization=organization) + return jt.create_unified_job(credential=new_cred, inventory=new_inv) + + def test_normal_relaunch_via_job_template(self, job_no_prompts, rando): + "Has JT execute_role, job unchanged relative to JT" + job_no_prompts.job_template.execute_role.members.add(rando) + assert rando.can_access(Job, 'start', job_no_prompts) + + def test_no_relaunch_without_prompted_fields_access(self, job_with_prompts, rando): + "Has JT execute_role but no use_role on inventory & credential - deny relaunch" + job_with_prompts.job_template.execute_role.members.add(rando) + assert not rando.can_access(Job, 'start', job_with_prompts) + + def test_can_relaunch_with_prompted_fields_access(self, job_with_prompts, rando): + "Has use_role on the prompted inventory & credential - allow relaunch" + job_with_prompts.job_template.execute_role.members.add(rando) + job_with_prompts.credential.use_role.members.add(rando) + job_with_prompts.inventory.use_role.members.add(rando) + assert rando.can_access(Job, 'start', job_with_prompts) + + def test_no_relaunch_after_limit_change(self, job_no_prompts, rando): + "State of the job contradicts the JT state - deny relaunch" + job_no_prompts.job_template.execute_role.members.add(rando) + job_no_prompts.limit = 'webservers' + job_no_prompts.save() + assert not rando.can_access(Job, 'start', job_no_prompts) + + def test_can_relaunch_if_limit_was_prompt(self, job_with_prompts, rando): + "Job state differs from JT, but only on prompted fields - allow relaunch" + job_with_prompts.job_template.execute_role.members.add(rando) + job_with_prompts.limit = 'webservers' + job_with_prompts.save() + assert not rando.can_access(Job, 'start', job_with_prompts)