From b0b7f7a2958a2a7e71ca987dc112144250001cbf Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 7 Jun 2018 11:24:38 -0400 Subject: [PATCH] prohibit relaunching workflow jobs from other users --- awx/main/access.py | 20 ++++++++++++++----- .../tests/functional/test_rbac_workflow.py | 16 ++++++++++++++- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 9ff9973269..f7c37b9970 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1927,12 +1927,22 @@ class WorkflowJobAccess(BaseAccess): if not wfjt: return False - # execute permission to WFJT is mandatory for any relaunch - if self.user not in wfjt.execute_role: - return False + # If job was launched by another user, it could have survey passwords + if obj.created_by_id != self.user.pk: + # Obtain prompts used to start original job + JobLaunchConfig = obj._meta.get_field('launch_config').related_model + try: + config = JobLaunchConfig.objects.get(job=obj) + except JobLaunchConfig.DoesNotExist: + config = None - # user's WFJT access doesn't guarentee permission to launch, introspect nodes - return self.can_recreate(obj) + if config is None or config.prompts_dict(): + if self.save_messages: + self.messages['detail'] = _('Job was launched with prompts provided by another user.') + return False + + # execute permission to WFJT is mandatory for any relaunch + return (self.user in wfjt.execute_role) def can_recreate(self, obj): node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template') diff --git a/awx/main/tests/functional/test_rbac_workflow.py b/awx/main/tests/functional/test_rbac_workflow.py index 5cd63027d2..116b9ec834 100644 --- a/awx/main/tests/functional/test_rbac_workflow.py +++ b/awx/main/tests/functional/test_rbac_workflow.py @@ -7,7 +7,7 @@ from awx.main.access import ( # WorkflowJobNodeAccess ) -from awx.main.models import InventorySource +from awx.main.models import InventorySource, JobLaunchConfig @pytest.fixture @@ -135,6 +135,20 @@ class TestWorkflowJobAccess: access = WorkflowJobAccess(rando) assert access.can_cancel(workflow_job) + def test_execute_role_relaunch(self, wfjt, workflow_job, rando): + wfjt.execute_role.members.add(rando) + JobLaunchConfig.objects.create(job=workflow_job) + assert WorkflowJobAccess(rando).can_start(workflow_job) + + def test_cannot_relaunch_friends_job(self, wfjt, rando, alice): + workflow_job = wfjt.workflow_jobs.create(name='foo', created_by=alice) + JobLaunchConfig.objects.create( + job=workflow_job, + extra_data={'foo': 'fooforyou'} + ) + wfjt.execute_role.members.add(alice) + assert not WorkflowJobAccess(rando).can_start(workflow_job) + @pytest.mark.django_db class TestWFJTCopyAccess: