From 5720601a2e9d27bb06a3a726fc27f3310bb435da Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 29 Apr 2019 20:38:36 -0400 Subject: [PATCH] allow relaunching other user jobs with public vars --- awx/main/access.py | 19 +++++++++++++++---- awx/main/tests/functional/api/test_job.py | 18 +++++++++++++----- awx/main/tests/functional/test_rbac_job.py | 2 +- awx/main/tests/unit/test_access.py | 15 +++++++++++++++ 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 6d615e8677..c7273bdefa 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -82,6 +82,17 @@ def get_object_from_data(field, Model, data, obj=None): raise ParseError(_("Bad data found in related field %s." % field)) +def vars_are_encrypted(vars): + '''Returns True if any of the values in the dictionary vars contains + content which is encrypted by the AWX encryption algorithm + ''' + for value in vars.values(): + if isinstance(value, str): + if value.startswith('$encrypted$'): + return True + return False + + def register_access(model_class, access_class): access_registry[model_class] = access_class @@ -1677,10 +1688,10 @@ class JobAccess(BaseAccess): prompts_access = False elif not config.has_user_prompts(obj.job_template): prompts_access = True - elif obj.created_by_id != self.user.pk: + elif obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data): prompts_access = False if self.save_messages: - self.messages['detail'] = _('Job was launched with prompts provided by another user.') + self.messages['detail'] = _('Job was launched with secret prompts provided by another user.') else: prompts_access = ( JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}) and @@ -2116,9 +2127,9 @@ class WorkflowJobAccess(BaseAccess): # Check if access to prompts to prevent relaunch if config.prompts_dict(): - if obj.created_by_id != self.user.pk: + if obj.created_by_id != self.user.pk and vars_are_encrypted(config.extra_data): if self.save_messages: - self.messages['detail'] = _('Job was launched with prompts provided by another user.') + self.messages['detail'] = _('Job was launched with secret prompts provided by another user.') return False if not JobLaunchConfigAccess(self.user).can_add({'reference_obj': config}): if self.save_messages: diff --git a/awx/main/tests/functional/api/test_job.py b/awx/main/tests/functional/api/test_job.py index a2aef4b525..8a6871c1d6 100644 --- a/awx/main/tests/functional/api/test_job.py +++ b/awx/main/tests/functional/api/test_job.py @@ -58,7 +58,7 @@ def test_job_relaunch_permission_denied_response( @pytest.mark.django_db -def test_job_relaunch_permission_denied_response_other_user(get, post, inventory, project, alice, bob): +def test_job_relaunch_permission_denied_response_other_user(get, post, inventory, project, alice, bob, survey_spec_factory): ''' Asserts custom permission denied message corresponding to awx/main/tests/functional/test_rbac_job.py::TestJobRelaunchAccess::test_other_user_prompts @@ -66,18 +66,26 @@ def test_job_relaunch_permission_denied_response_other_user(get, post, inventory jt = JobTemplate.objects.create( name='testjt', inventory=inventory, project=project, ask_credential_on_launch=True, - ask_variables_on_launch=True) + ask_variables_on_launch=True, + survey_spec=survey_spec_factory([{'variable': 'secret_key', 'default': '6kQngg3h8lgiSTvIEb21', 'type': 'password'}]), + survey_enabled=True + ) jt.execute_role.members.add(alice, bob) with impersonate(bob): - job = jt.create_unified_job(extra_vars={'job_var': 'foo2'}) + job = jt.create_unified_job(extra_vars={'job_var': 'foo2', 'secret_key': 'sk4t3Rb01'}) # User capability is shown for this r = get(job.get_absolute_url(), alice, expect=200) assert r.data['summary_fields']['user_capabilities']['start'] # Job has prompted data, launch denied w/ message - r = post(reverse('api:job_relaunch', kwargs={'pk':job.pk}), {}, alice, expect=403) - assert 'Job was launched with prompts provided by another user' in r.data['detail'] + r = post( + url=reverse('api:job_relaunch', kwargs={'pk':job.pk}), + data={}, + user=alice, + expect=403 + ) + assert 'Job was launched with secret prompts provided by another user' in r.data['detail'] @pytest.mark.django_db diff --git a/awx/main/tests/functional/test_rbac_job.py b/awx/main/tests/functional/test_rbac_job.py index 4f8b62a574..13866565fa 100644 --- a/awx/main/tests/functional/test_rbac_job.py +++ b/awx/main/tests/functional/test_rbac_job.py @@ -248,7 +248,7 @@ class TestJobRelaunchAccess: jt.execute_role.members.add(alice, bob) with impersonate(bob): - job = jt.create_unified_job(extra_vars={'job_var': 'foo2'}) + job = jt.create_unified_job(extra_vars={'job_var': 'foo2', 'my_secret': '$encrypted$foo'}) assert 'job_var' in job.launch_config.extra_data assert bob.can_access(Job, 'start', job, validate_license=False) diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index 9496f355c8..adb2b2dcae 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -11,6 +11,7 @@ from awx.main.access import ( JobTemplateAccess, WorkflowJobTemplateAccess, SystemJobTemplateAccess, + vars_are_encrypted ) from awx.main.models import ( @@ -114,6 +115,20 @@ class TestRelatedFieldAccess: 'related', mocker.MagicMock, data, obj=resource, mandatory=True) +def test_encrypted_vars_detection(): + assert vars_are_encrypted({ + 'aaa': {'b': 'c'}, + 'alist': [], + 'test_var_eight': '$encrypted$UTF8$AESCBC$Z0FBQUF...==', + 'test_var_five': 'four', + }) + assert not vars_are_encrypted({ + 'aaa': {'b': 'c'}, + 'alist': [], + 'test_var_five': 'four', + }) + + @pytest.fixture def job_template_with_ids(job_template_factory): # Create non-persisted objects with IDs to send to job_template_factory