diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 8d502eb5c9..54309838f5 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -2959,6 +2959,16 @@ class JobTemplateSurveySpec(GenericAPIView): obj_permission_type = 'admin' serializer_class = EmptySerializer + ALLOWED_TYPES = { + 'text': six.string_types, + 'textarea': six.string_types, + 'password': six.string_types, + 'multiplechoice': six.string_types, + 'multiselect': six.string_types, + 'integer': int, + 'float': float + } + def get(self, request, *args, **kwargs): obj = self.get_object() if not feature_enabled('surveys'): @@ -2985,7 +2995,8 @@ class JobTemplateSurveySpec(GenericAPIView): obj.save(update_fields=['survey_spec']) return Response() - def _validate_spec_data(self, new_spec, old_spec): + @staticmethod + def _validate_spec_data(new_spec, old_spec): schema_errors = {} for field, expect_type, type_label in [ ('name', six.string_types, 'string'), @@ -3006,40 +3017,80 @@ class JobTemplateSurveySpec(GenericAPIView): variable_set = set() old_spec_dict = JobTemplate.pivot_spec(old_spec) for idx, survey_item in enumerate(new_spec["spec"]): + context = dict( + idx=six.text_type(idx), + survey_item=survey_item + ) + # General element validation if not isinstance(survey_item, dict): return Response(dict(error=_("Survey question %s is not a json object.") % str(idx)), status=status.HTTP_400_BAD_REQUEST) - if "type" not in survey_item: - return Response(dict(error=_("'type' missing from survey question %s.") % str(idx)), status=status.HTTP_400_BAD_REQUEST) - if "question_name" not in survey_item: - return Response(dict(error=_("'question_name' missing from survey question %s.") % str(idx)), status=status.HTTP_400_BAD_REQUEST) - if "variable" not in survey_item: - return Response(dict(error=_("'variable' missing from survey question %s.") % str(idx)), status=status.HTTP_400_BAD_REQUEST) + for field_name in ['type', 'question_name', 'variable', 'required']: + if field_name not in survey_item: + return Response(dict(error=_("'{field_name}' missing from survey question {idx}").format( + field_name=field_name, **context + )), status=status.HTTP_400_BAD_REQUEST) + val = survey_item[field_name] + allow_types = six.string_types + type_label = 'string' + if field_name == 'required': + allow_types = bool + type_label = 'boolean' + if not isinstance(val, allow_types): + return Response(dict(error=_("'{field_name}' in survey question {idx} expected to be {type_label}.").format( + field_name=field_name, type_label=type_label, **context + ))) if survey_item['variable'] in variable_set: return Response(dict(error=_("'variable' '%(item)s' duplicated in survey question %(survey)s.") % { 'item': survey_item['variable'], 'survey': str(idx)}), status=status.HTTP_400_BAD_REQUEST) else: variable_set.add(survey_item['variable']) - if "required" not in survey_item: - return Response(dict(error=_("'required' missing from survey question %s.") % str(idx)), status=status.HTTP_400_BAD_REQUEST) - if survey_item["type"] == "password" and "default" in survey_item: - if not isinstance(survey_item['default'], six.string_types): + # Type-specific validation + # validate question type <-> default type + qtype = survey_item["type"] + if qtype not in JobTemplateSurveySpec.ALLOWED_TYPES: + return Response(dict(error=_( + "'{survey_item[type]}' in survey question {idx} is not one of '{allowed_types}' allowed question types." + ).format( + allowed_types=', '.join(JobTemplateSurveySpec.ALLOWED_TYPES.keys()), **context + ))) + if 'default' in survey_item: + if not isinstance(survey_item['default'], JobTemplateSurveySpec.ALLOWED_TYPES[qtype]): + type_label = 'string' + if qtype in ['integer', 'float']: + type_label = qtype return Response(dict(error=_( - "Value {question_default} for '{variable_name}' expected to be a string." + "Default value {survey_item[default]} in survey question {idx} expected to be {type_label}." ).format( - question_default=survey_item["default"], variable_name=survey_item["variable"]) - ), status=status.HTTP_400_BAD_REQUEST) + type_label=type_label, **context + )), status=status.HTTP_400_BAD_REQUEST) + # additional type-specific properties, the UI provides these even + # if not applicable to the question, TODO: request that they not do this + for key in ['min', 'max']: + if key in survey_item: + if survey_item[key] is not None and (not isinstance(survey_item[key], int)): + return Response(dict(error=_( + "The {min_or_max} limit in survey question {idx} expected to be integer." + ).format(min_or_max=key, **context))) + if 'choices' in survey_item: + if not isinstance(survey_item['choices'], six.string_types): + return Response(dict(error=_( + "Choices in survey question {idx} expected to be string." + ).format(**context))) + if qtype in ['multiplechoice', 'multiselect'] and 'choices' not in survey_item: + return Response(dict(error=_( + "Survey question {idx} of type {survey_item[type]} must specify choices.".format(**context) + ))) + # Process encryption substitution if ("default" in survey_item and isinstance(survey_item['default'], six.string_types) and survey_item['default'].startswith('$encrypted$')): # Submission expects the existence of encrypted DB value to replace given default - if survey_item["type"] != "password": + if qtype != "password": return Response(dict(error=_( "$encrypted$ is a reserved keyword for password question defaults, " - "survey question {question_position} is type {question_type}." - ).format( - question_position=str(idx), question_type=survey_item["type"]) - ), status=status.HTTP_400_BAD_REQUEST) + "survey question {idx} is type {survey_item[type]}." + ).format(**context)), status=status.HTTP_400_BAD_REQUEST) old_element = old_spec_dict.get(survey_item['variable'], {}) encryptedish_default_exists = False if 'default' in old_element: @@ -3051,10 +3102,10 @@ class JobTemplateSurveySpec(GenericAPIView): encryptedish_default_exists = True if not encryptedish_default_exists: return Response(dict(error=_( - "$encrypted$ is a reserved keyword, may not be used for new default in position {question_position}." - ).format(question_position=str(idx))), status=status.HTTP_400_BAD_REQUEST) + "$encrypted$ is a reserved keyword, may not be used for new default in position {idx}." + ).format(**context)), status=status.HTTP_400_BAD_REQUEST) survey_item['default'] = old_element['default'] - elif survey_item["type"] == "password" and 'default' in survey_item: + elif qtype == "password" and 'default' in survey_item: # Submission provides new encrypted default survey_item['default'] = encrypt_value(survey_item['default']) diff --git a/awx/main/tests/functional/api/test_survey_spec.py b/awx/main/tests/functional/api/test_survey_spec.py index 20155ac654..7eac251132 100644 --- a/awx/main/tests/functional/api/test_survey_spec.py +++ b/awx/main/tests/functional/api/test_survey_spec.py @@ -285,9 +285,7 @@ def test_survey_spec_passwords_with_default_required(job_template_factory, post, @pytest.mark.django_db @pytest.mark.parametrize('default, status', [ ('SUPERSECRET', 200), - (['some', 'invalid', 'list'], 400), ({'some-invalid': 'dict'}, 400), - (False, 400) ]) def test_survey_spec_default_passwords_are_encrypted(job_template, post, admin_user, default, status): job_template.survey_enabled = True @@ -317,7 +315,7 @@ def test_survey_spec_default_passwords_are_encrypted(job_template, post, admin_u 'secret_value': default } else: - assert "for 'secret_value' expected to be a string." in str(resp.data) + assert "expected to be string." in str(resp.data) @mock.patch('awx.api.views.feature_enabled', lambda feature: True) @@ -346,37 +344,6 @@ def test_survey_spec_default_passwords_encrypted_on_update(job_template, post, p assert updated_jt.survey_spec == JobTemplate.objects.get(pk=job_template.pk).survey_spec -# Tests related to survey content validation -@mock.patch('awx.api.views.feature_enabled', lambda feature: True) -@pytest.mark.django_db -@pytest.mark.survey -def test_survey_spec_non_dict_error(deploy_jobtemplate, post, admin_user): - """When a question doesn't follow the standard format, verify error thrown.""" - response = post( - url=reverse('api:job_template_survey_spec', kwargs={'pk': deploy_jobtemplate.id}), - data={ - "description": "Email of the submitter", - "spec": ["What is your email?"], "name": "Email survey" - }, - user=admin_user, - expect=400 - ) - assert response.data['error'] == "Survey question 0 is not a json object." - - -@mock.patch('awx.api.views.feature_enabled', lambda feature: True) -@pytest.mark.django_db -@pytest.mark.survey -def test_survey_spec_dual_names_error(survey_spec_factory, deploy_jobtemplate, post, user): - response = post( - url=reverse('api:job_template_survey_spec', kwargs={'pk': deploy_jobtemplate.id}), - data=survey_spec_factory(['submitter_email', 'submitter_email']), - user=user('admin', True), - expect=400 - ) - assert response.data['error'] == "'variable' 'submitter_email' duplicated in survey question 1." - - # Test actions that should be allowed with non-survey license @mock.patch('awx.main.access.BaseAccess.check_license', new=mock_no_surveys) @pytest.mark.django_db diff --git a/awx/main/tests/unit/api/test_views.py b/awx/main/tests/unit/api/test_views.py index fe1c4974dc..b1c6be4bb0 100644 --- a/awx/main/tests/unit/api/test_views.py +++ b/awx/main/tests/unit/api/test_views.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- import mock import pytest import requests @@ -293,7 +294,7 @@ class TestSurveySpecValidation: new['spec'][0]['default'] = '$encrypted$' new['spec'][0]['required'] = False resp = view._validate_spec_data(new, old) - assert resp is None + assert resp is None, resp.data assert new == { "name": "old survey", "description": "foobar", @@ -357,3 +358,59 @@ class TestSurveySpecValidation: } ] } + + + @staticmethod + def spec_from_element(survey_item): + survey_item.setdefault('name', 'foo') + survey_item.setdefault('variable', 'foo') + survey_item.setdefault('required', False) + survey_item.setdefault('question_name', 'foo') + survey_item.setdefault('type', 'text') + spec = { + 'name': 'test survey', + 'description': 'foo', + 'spec': [survey_item] + } + return spec + + + @pytest.mark.parametrize("survey_item, error_text", [ + ({'type': 'password', 'default': ['some', 'invalid', 'list']}, 'expected to be string'), + ({'type': 'password', 'default': False}, 'expected to be string'), + ({'type': 'integer', 'default': 'foo'}, 'expected to be int'), + ({'type': 'integer', 'default': u'🐉'}, 'expected to be int'), + ({'type': 'foo'}, 'allowed question types'), + ({'type': u'🐉'}, 'allowed question types'), + ({'type': 'multiplechoice'}, 'multiplechoice must specify choices'), + ({'type': 'multiplechoice', 'choices': 45}, 'Choices in survey question 0 expected to be string'), + ({'type': 'integer', 'min': 'foo'}, 'min limit in survey question 0 expected to be integer'), + ({'question_name': 42}, "'question_name' in survey question 0 expected to be string.") + ]) + def test_survey_question_element_validation(self, survey_item, error_text): + spec = self.spec_from_element(survey_item) + r = JobTemplateSurveySpec._validate_spec_data(spec, {}) + assert r is not None, (spec, error_text) + assert 'error' in r.data + assert error_text in r.data['error'] + + + def test_survey_spec_non_dict_error(self): + spec = self.spec_from_element({}) + spec['spec'][0] = 'foo' + r = JobTemplateSurveySpec._validate_spec_data(spec, {}) + assert 'Survey question 0 is not a json object' in r.data['error'] + + + def test_survey_spec_dual_names_error(self): + spec = self.spec_from_element({}) + spec['spec'].append(spec['spec'][0].copy()) + r = JobTemplateSurveySpec._validate_spec_data(spec, {}) + assert "'variable' 'foo' duplicated in survey question 1." in r.data['error'] + + + def test_survey_spec_element_missing_property(self): + spec = self.spec_from_element({}) + spec['spec'][0].pop('type') + r = JobTemplateSurveySpec._validate_spec_data(spec, {}) + assert "'type' missing from survey question 0" in r.data['error']