From f82b10f018cfc2a1f168a74d1ef5f9b05acbd879 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 29 Sep 2016 13:35:47 -0400 Subject: [PATCH 1/2] Job relaunch permissions made consistent with JT prompts --- awx/main/access.py | 30 ++++++++-- .../tests/functional/test_rbac_job_start.py | 56 +++++++++++++++++-- 2 files changed, 75 insertions(+), 11 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 72636e1776..f3b8ef22e1 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1094,17 +1094,35 @@ class JobAccess(BaseAccess): if self.user.is_superuser: return True - # If a user can launch the job template then they can relaunch a job from that - # job template + inventory_access = obj.inventory and self.user in obj.inventory.use_role + credential_access = obj.credential and self.user in obj.credential.use_role + + # Check if JT execute access (and related prompts) is sufficient if obj.job_template is not None: - return self.user in obj.job_template.execute_role + prompts_access = True + job_fields = {} + for fd in obj.job_template._ask_for_vars_dict(): + job_fields[fd] = getattr(obj, fd) + accepted_fields, ignored_fields = obj.job_template._accept_or_ignore_job_kwargs(**job_fields) + for fd in ignored_fields: + if fd == 'extra_vars': + if ignored_fields[fd]: + prompts_access = False + elif job_fields[fd] != getattr(obj.job_template, fd): + # Job has field that is not promptable + prompts_access = False + if obj.credential != obj.job_template.credential and not credential_access: + prompts_access = False + if obj.inventory != obj.job_template.inventory and not inventory_access: + prompts_access = False + if prompts_access and self.user in obj.job_template.execute_role: + return True - inventory_access = self.user in obj.inventory.use_role - credential_access = self.user in obj.credential.use_role - org_access = self.user in obj.inventory.organization.admin_role + org_access = obj.inventory and self.user in obj.inventory.organization.admin_role project_access = obj.project is None or self.user in obj.project.admin_role + # job can be relaunched if user could make an equivalent JT return inventory_access and credential_access and (org_access or project_access) def can_cancel(self, obj): diff --git a/awx/main/tests/functional/test_rbac_job_start.py b/awx/main/tests/functional/test_rbac_job_start.py index 18060126e1..00358d1c38 100644 --- a/awx/main/tests/functional/test_rbac_job_start.py +++ b/awx/main/tests/functional/test_rbac_job_start.py @@ -2,11 +2,7 @@ import pytest from awx.main.models.inventory import Inventory from awx.main.models.credential import Credential -from awx.main.models.jobs import JobTemplate - -@pytest.fixture -def machine_credential(): - return Credential.objects.create(name='machine-cred', kind='ssh', username='test_user', password='pas4word') +from awx.main.models.jobs import JobTemplate, Job @pytest.mark.django_db @pytest.mark.job_permissions @@ -45,3 +41,53 @@ def test_inventory_use_access(inventory, user): inventory.use_role.members.add(common_user) assert common_user.can_access(Inventory, 'use', inventory) + +@pytest.mark.django_db +class TestJobRelaunchAccess: + @pytest.fixture + def jt_no_prompts(self, machine_credential, inventory): + return JobTemplate.objects.create(name='test-job_template', credential=machine_credential, inventory=inventory) + + @pytest.fixture + def jt_with_prompts(self, jt_no_prompts): + jt_no_prompts.update( + ask_tags_on_launch=True, ask_variables_on_launch=True, ask_skip_tags_on_launch=True, + ask_limit_on_launch=True, ask_job_type_on_launch=True, ask_inventory_on_launch=True, + ask_credential_on_launch=True) + return jt_no_prompts + + @pytest.fixture + def job_no_prompts(self, jt_no_prompts): + return jt_no_prompts.create_unified_job() + + @pytest.fixture + def job_with_prompts(self, jt_with_prompts, organization): + new_cred = Credential.objects.create(name='new-cred', kind='ssh', username='test_user', password='pas4word') + new_inv = Inventory.objects.create(name='new-inv', organization=organization) + return jt_with_prompts.create_unified_job(credential=new_cred, inventory=new_inv) + + def test_no_relaunch_without_prompted_fields_access(self, job_with_prompts, rando): + "Has JT execute_role but no use_role on inventory & credential - deny relaunch" + job_with_prompts.job_template.execute_role.members.add(rando) + assert not rando.can_access(Job, 'start', job_with_prompts) + + def test_can_relaunch_with_prompted_fields_access(self, job_with_prompts, rando): + "Has use_role on the prompted inventory & credential - allow relaunch" + job_with_prompts.job_template.execute_role.members.add(rando) + job_with_prompts.credential.use_role.members.add(rando) + job_with_prompts.inventory.use_role.members.add(rando) + assert rando.can_access(Job, 'start', job_with_prompts) + + def test_no_relaunch_after_limit_change(self, job_no_prompts, rando): + "State of the job contradicts the JT state - deny relaunch" + job_no_prompts.job_template.execute_role.members.add(rando) + job_no_prompts.limit = 'webservers' + job_no_prompts.save() + assert not rando.can_access(Job, 'start', job_no_prompts) + + def test_can_relaunch_if_limit_was_prompt(self, job_with_prompts, rando): + "Job state differs from JT, but only on prompted fields - allow relaunch" + job_with_prompts.job_template.execute_role.members.add(rando) + job_with_prompts.limit = 'webservers' + job_with_prompts.save() + assert not rando.can_access(Job, 'start', job_with_prompts) From 115dfac471169269689546d2f552faf88296082f Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 29 Sep 2016 16:42:41 -0400 Subject: [PATCH 2/2] do not introspect extra_vars, speed up tests, add base case test --- awx/main/access.py | 5 +--- .../tests/functional/test_rbac_job_start.py | 25 +++++++++---------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index f3b8ef22e1..d122d48da8 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1105,10 +1105,7 @@ class JobAccess(BaseAccess): job_fields[fd] = getattr(obj, fd) accepted_fields, ignored_fields = obj.job_template._accept_or_ignore_job_kwargs(**job_fields) for fd in ignored_fields: - if fd == 'extra_vars': - if ignored_fields[fd]: - prompts_access = False - elif job_fields[fd] != getattr(obj.job_template, fd): + if fd != 'extra_vars' and job_fields[fd] != getattr(obj.job_template, fd): # Job has field that is not promptable prompts_access = False if obj.credential != obj.job_template.credential and not credential_access: diff --git a/awx/main/tests/functional/test_rbac_job_start.py b/awx/main/tests/functional/test_rbac_job_start.py index 00358d1c38..c934973cf4 100644 --- a/awx/main/tests/functional/test_rbac_job_start.py +++ b/awx/main/tests/functional/test_rbac_job_start.py @@ -45,26 +45,25 @@ def test_inventory_use_access(inventory, user): @pytest.mark.django_db class TestJobRelaunchAccess: @pytest.fixture - def jt_no_prompts(self, machine_credential, inventory): - return JobTemplate.objects.create(name='test-job_template', credential=machine_credential, inventory=inventory) + def job_no_prompts(self, machine_credential, inventory): + jt = JobTemplate.objects.create(name='test-job_template', credential=machine_credential, inventory=inventory) + return jt.create_unified_job() @pytest.fixture - def jt_with_prompts(self, jt_no_prompts): - jt_no_prompts.update( + def job_with_prompts(self, machine_credential, inventory, organization): + jt = JobTemplate.objects.create( + name='test-job-template-prompts', credential=machine_credential, inventory=inventory, ask_tags_on_launch=True, ask_variables_on_launch=True, ask_skip_tags_on_launch=True, ask_limit_on_launch=True, ask_job_type_on_launch=True, ask_inventory_on_launch=True, ask_credential_on_launch=True) - return jt_no_prompts - - @pytest.fixture - def job_no_prompts(self, jt_no_prompts): - return jt_no_prompts.create_unified_job() - - @pytest.fixture - def job_with_prompts(self, jt_with_prompts, organization): new_cred = Credential.objects.create(name='new-cred', kind='ssh', username='test_user', password='pas4word') new_inv = Inventory.objects.create(name='new-inv', organization=organization) - return jt_with_prompts.create_unified_job(credential=new_cred, inventory=new_inv) + return jt.create_unified_job(credential=new_cred, inventory=new_inv) + + def test_normal_relaunch_via_job_template(self, job_no_prompts, rando): + "Has JT execute_role, job unchanged relative to JT" + job_no_prompts.job_template.execute_role.members.add(rando) + assert rando.can_access(Job, 'start', job_no_prompts) def test_no_relaunch_without_prompted_fields_access(self, job_with_prompts, rando): "Has JT execute_role but no use_role on inventory & credential - deny relaunch"