prohibit relaunching workflow jobs from other users

This commit is contained in:
AlanCoding 2018-06-07 11:24:38 -04:00
parent b1f36572c6
commit b0b7f7a295
No known key found for this signature in database
GPG Key ID: FD2C3C012A72926B
2 changed files with 30 additions and 6 deletions

View File

@ -1927,12 +1927,22 @@ class WorkflowJobAccess(BaseAccess):
if not wfjt:
return False
# execute permission to WFJT is mandatory for any relaunch
if self.user not in wfjt.execute_role:
return False
# If job was launched by another user, it could have survey passwords
if obj.created_by_id != self.user.pk:
# Obtain prompts used to start original job
JobLaunchConfig = obj._meta.get_field('launch_config').related_model
try:
config = JobLaunchConfig.objects.get(job=obj)
except JobLaunchConfig.DoesNotExist:
config = None
# user's WFJT access doesn't guarentee permission to launch, introspect nodes
return self.can_recreate(obj)
if config is None or config.prompts_dict():
if self.save_messages:
self.messages['detail'] = _('Job was launched with prompts provided by another user.')
return False
# execute permission to WFJT is mandatory for any relaunch
return (self.user in wfjt.execute_role)
def can_recreate(self, obj):
node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template')

View File

@ -7,7 +7,7 @@ from awx.main.access import (
# WorkflowJobNodeAccess
)
from awx.main.models import InventorySource
from awx.main.models import InventorySource, JobLaunchConfig
@pytest.fixture
@ -135,6 +135,20 @@ class TestWorkflowJobAccess:
access = WorkflowJobAccess(rando)
assert access.can_cancel(workflow_job)
def test_execute_role_relaunch(self, wfjt, workflow_job, rando):
wfjt.execute_role.members.add(rando)
JobLaunchConfig.objects.create(job=workflow_job)
assert WorkflowJobAccess(rando).can_start(workflow_job)
def test_cannot_relaunch_friends_job(self, wfjt, rando, alice):
workflow_job = wfjt.workflow_jobs.create(name='foo', created_by=alice)
JobLaunchConfig.objects.create(
job=workflow_job,
extra_data={'foo': 'fooforyou'}
)
wfjt.execute_role.members.add(alice)
assert not WorkflowJobAccess(rando).can_start(workflow_job)
@pytest.mark.django_db
class TestWFJTCopyAccess: