fix bug, handle RBAC, add test

This commit is contained in:
AlanCoding 2018-09-28 16:03:29 -04:00 committed by Jake McDermott
parent 44fa3b18a9
commit 0c52d17951
No known key found for this signature in database
GPG Key ID: 9A6F084352C3A0B7
6 changed files with 45 additions and 11 deletions

View File

@ -3727,7 +3727,7 @@ class LaunchConfigurationBaseSerializer(BaseSerializer):
if obj is None:
return ret
if 'extra_data' in ret and obj.survey_passwords:
ret['extra_data'] = obj.display_extra_data()
ret['extra_data'] = obj.display_extra_vars()
return ret
def get_summary_fields(self, obj):
@ -4450,6 +4450,11 @@ class WorkflowJobLaunchSerializer(BaseSerializer):
**attrs)
self._ignored_fields = rejected
if template.inventory and template.inventory.pending_deletion is True:
errors['inventory'] = _("The inventory associated with this Workflow is being deleted.")
elif 'inventory' in accepted and accepted['inventory'].pending_deletion:
errors['inventory'] = _("The provided inventory is being deleted.")
if errors:
raise serializers.ValidationError(errors)

View File

@ -3117,6 +3117,9 @@ class WorkflowJobTemplateLaunch(WorkflowsEnforcementMixin, RetrieveAPIView):
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
if not request.user.can_access(JobLaunchConfig, 'add', serializer.validated_data, template=obj):
raise PermissionDenied()
new_job = obj.create_unified_job(**serializer.validated_data)
new_job.signal_start()

View File

@ -1949,19 +1949,29 @@ class WorkflowJobAccess(BaseAccess):
if not template:
return False
# If job was launched by another user, it could have survey passwords
if obj.created_by_id != self.user.pk:
# Obtain prompts used to start original job
JobLaunchConfig = obj._meta.get_field('launch_config').related_model
try:
config = JobLaunchConfig.objects.get(job=obj)
except JobLaunchConfig.DoesNotExist:
config = None
# Obtain prompts used to start original job
JobLaunchConfig = obj._meta.get_field('launch_config').related_model
try:
config = JobLaunchConfig.objects.get(job=obj)
except JobLaunchConfig.DoesNotExist:
if self.save_messages:
self.messages['detail'] = _('Workflow Job was launched with unknown prompts.')
return False
if config is None or config.prompts_dict():
# Check if access to prompts to prevent relaunch
if config.prompts_dict():
if obj.created_by_id != self.user.pk:
if self.save_messages:
self.messages['detail'] = _('Job was launched with prompts provided by another user.')
return False
if not JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}):
if self.save_messages:
self.messages['detail'] = _('Job was launched with prompts you lack access to.')
return False
if config.has_unprompted(template):
if self.save_messages:
self.messages['detail'] = _('Job was launched with prompts no longer accepted.')
return False
# execute permission to WFJT is mandatory for any relaunch
return (self.user in template.execute_role)

View File

@ -1019,6 +1019,8 @@ class LaunchTimeConfig(LaunchTimeConfigBase):
for field_name in JobTemplate.get_ask_mapping().keys():
if field_name == 'extra_vars':
continue
try:
LaunchTimeConfig._meta.get_field(field_name)
except FieldDoesNotExist:

View File

@ -149,6 +149,20 @@ class TestWorkflowJobAccess:
wfjt.execute_role.members.add(alice)
assert not WorkflowJobAccess(rando).can_start(workflow_job)
def test_relaunch_inventory_access(self, workflow_job, inventory, rando):
wfjt = workflow_job.workflow_job_template
wfjt.execute_role.members.add(rando)
assert rando in wfjt.execute_role
workflow_job.created_by = rando
workflow_job.inventory = inventory
workflow_job.save()
wfjt.ask_inventory_on_launch = True
wfjt.save()
JobLaunchConfig.objects.create(job=workflow_job, inventory=inventory)
assert not WorkflowJobAccess(rando).can_start(workflow_job)
inventory.use_role.members.add(rando)
assert WorkflowJobAccess(rando).can_start(workflow_job)
@pytest.mark.django_db
class TestWFJTCopyAccess:

View File

@ -236,4 +236,4 @@ class TestWorkflowJobNodeJobKWARGS:
def test_get_ask_mapping_integrity():
assert WorkflowJobTemplate.get_ask_mapping().keys() == ['extra_vars']
assert WorkflowJobTemplate.get_ask_mapping().keys() == ['extra_vars', 'inventory']