diff --git a/awx/api/views.py b/awx/api/views.py index bb65c904a9..06c6d68ec7 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2896,9 +2896,10 @@ class JobTemplateSurveySpec(GenericAPIView): if len(new_spec["spec"]) < 1: return Response(dict(error=_("'spec' doesn't contain any items.")), status=status.HTTP_400_BAD_REQUEST) - idx = 0 variable_set = set() - for survey_item in new_spec["spec"]: + old_spec = obj.survey_spec + old_spec_dict = JobTemplate.pivot_spec(old_spec) + for idx, survey_item in enumerate(new_spec["spec"]): if not isinstance(survey_item, dict): return Response(dict(error=_("Survey question %s is not a json object.") % str(idx)), status=status.HTTP_400_BAD_REQUEST) if "type" not in survey_item: @@ -2922,22 +2923,28 @@ class JobTemplateSurveySpec(GenericAPIView): ).format( question_default=survey_item["default"], variable_name=survey_item["variable"]) ), status=status.HTTP_400_BAD_REQUEST) - elif survey_item["default"].startswith('$encrypted$'): - if not obj.survey_spec: - return Response(dict(error=_( - "$encrypted$ is reserved keyword for password questions and may not " - "be used as a default for '{variable_name}' in survey question {question_position}." - ).format( - variable_name=survey_item["variable"], question_position=str(idx)) - ), status=status.HTTP_400_BAD_REQUEST) - else: - old_spec = obj.survey_spec - for old_item in old_spec['spec']: - if old_item['variable'] == survey_item['variable']: - survey_item['default'] = old_item['default'] - else: - survey_item['default'] = encrypt_value(survey_item['default']) - idx += 1 + + if ("default" in survey_item and isinstance(survey_item['default'], six.string_types) and + survey_item['default'].startswith('$encrypted$')): + # Submission expects the existence of encrypted DB value to replace given default + if survey_item["type"] != "password": + return Response(dict(error=_( + "$encrypted$ is a reserved keyword for password question defaults, " + "survey question {question_position} is type {question_type}." + ).format( + question_position=str(idx), question_type=survey_item["type"]) + ), status=status.HTTP_400_BAD_REQUEST) + old_element = old_spec_dict.get(survey_item['variable'], {}) + if (survey_item['variable'] not in old_spec_dict or 'default' not in old_element or + not old_element['default'].startswith('$encrypted$') or + old_element['default'] == '$encrypted$'): + return Response(dict(error=_( + "$encrypted$ is a reserved keyword, may not be used for new default in position {question_position}." + ).format(question_position=str(idx))), status=status.HTTP_400_BAD_REQUEST) + survey_item['default'] = old_element['default'] + elif survey_item["type"] == "password" and 'default' in survey_item: + # Submission provides new encrypted default + survey_item['default'] = encrypt_value(survey_item['default']) obj.survey_spec = new_spec obj.save(update_fields=['survey_spec']) diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py index ba01ad8ce6..062a9638ee 100644 --- a/awx/main/models/mixins.py +++ b/awx/main/models/mixins.py @@ -245,6 +245,17 @@ class SurveyJobTemplateMixin(models.Model): choice_list)) return errors + @staticmethod + def pivot_spec(spec): + ''' + Utility method that will return a dictionary keyed off variable names + ''' + pivoted = {} + for element_data in spec.get('spec', []): + if 'variable' in element_data: + pivoted[element_data['variable']] = element_data + return pivoted + def survey_variable_validation(self, data): errors = [] if not self.survey_enabled: diff --git a/awx/main/tests/unit/api/test_views.py b/awx/main/tests/unit/api/test_views.py index 68ba754d48..0c93730d98 100644 --- a/awx/main/tests/unit/api/test_views.py +++ b/awx/main/tests/unit/api/test_views.py @@ -1,6 +1,7 @@ import mock import pytest import requests +from copy import deepcopy from collections import namedtuple @@ -10,14 +11,19 @@ from awx.api.views import ( InventoryInventorySourcesUpdate, InventoryHostsList, HostInsights, + JobTemplateSurveySpec ) from awx.main.models import ( Host, + JobTemplate, + User ) from awx.main.managers import HostManager +from rest_framework.test import APIRequestFactory + @pytest.fixture def mock_response_new(mocker): @@ -215,3 +221,134 @@ class TestInventoryHostsList(object): view = InventoryHostsList() view.get_queryset() mock_query.assert_called_once_with('localhost') + + +class TestSurveySpecValidation: + + @pytest.fixture + def spec_view(self): + def view_factory(old_spec): + obj = JobTemplate() + if old_spec: + obj.survey_spec = old_spec + + def save(**kwargs): + pass + + def get_object(): + return get_object.object + + get_object.object = obj + obj.save = save + + user = User(username='admin') + + def can_access(*args, **kwargs): + return True + + user.can_access = can_access + + request = APIRequestFactory().get('/api/v2/job_templates/42/survey_spec/') + request.user = user + view = JobTemplateSurveySpec() + view.request = request + view.get_object = get_object + return view + return view_factory + + def test_create_text_encrypted(self, spec_view): + view = spec_view(None) + view.request.data = { + "name": "new survey", + "description": "foobar", + "spec": [ + { + "question_description": "", + "min": 0, + "default": "$encrypted$", + "max": 1024, + "required": True, + "choices": "", + "variable": "openshift_username", + "question_name": "OpenShift Username", + "type": "text" + } + ] + } + resp = view.post(view.request) + assert resp.status_code == 400 + assert '$encrypted$ is a reserved keyword for password question defaults' in str(resp.data['error']) + + + def test_change_encrypted_var_name(self, spec_view): + old = { + "name": "old survey", + "description": "foobar", + "spec": [ + { + "question_description": "", + "min": 0, + "default": "$encrypted$foooooooo", + "max": 1024, + "required": True, + "choices": "", + "variable": "openshift_username", + "question_name": "OpenShift Username", + "type": "password" + } + ] + } + view = spec_view(old) + new = deepcopy(old) + new['spec'][0]['variable'] = 'openstack_username' + view.request.data = new + resp = view.post(view.request) + assert resp.status_code == 400 + assert 'may not be used for new default' in str(resp.data['error']) + + def test_use_saved_encrypted_default(self, spec_view): + ''' + Save is allowed, the $encrypted$ replacement is done + ''' + old = { + "name": "old survey", + "description": "foobar", + "spec": [ + { + "question_description": "", + "min": 0, + "default": "$encrypted$foooooooo", + "max": 1024, + "required": True, + "choices": "", + "variable": "openshift_username", + "question_name": "OpenShift Username", + "type": "password" + } + ] + } + view = spec_view(old) + new = deepcopy(old) + new['spec'][0]['default'] = '$encrypted$' + new['spec'][0]['required'] = False + view.request.data = new + resp = view.post(view.request) + assert resp.status_code == 200 + assert resp.data is None + assert view.get_object.object.survey_spec == { + "name": "old survey", + "description": "foobar", + "spec": [ + { + "question_description": "", + "min": 0, + "default": "$encrypted$foooooooo", + "max": 1024, + "required": False, # only thing changed + "choices": "", + "variable": "openshift_username", + "question_name": "OpenShift Username", + "type": "password" + } + ] + }