block user from entering encrypted as bare default

This commit is contained in:
AlanCoding 2017-12-01 13:19:56 -05:00
parent 8d162f9044
commit 47f45bf9b3
No known key found for this signature in database
GPG Key ID: FD2C3C012A72926B
3 changed files with 173 additions and 18 deletions

View File

@ -2896,9 +2896,10 @@ class JobTemplateSurveySpec(GenericAPIView):
if len(new_spec["spec"]) < 1:
return Response(dict(error=_("'spec' doesn't contain any items.")), status=status.HTTP_400_BAD_REQUEST)
idx = 0
variable_set = set()
for survey_item in new_spec["spec"]:
old_spec = obj.survey_spec
old_spec_dict = JobTemplate.pivot_spec(old_spec)
for idx, survey_item in enumerate(new_spec["spec"]):
if not isinstance(survey_item, dict):
return Response(dict(error=_("Survey question %s is not a json object.") % str(idx)), status=status.HTTP_400_BAD_REQUEST)
if "type" not in survey_item:
@ -2922,22 +2923,28 @@ class JobTemplateSurveySpec(GenericAPIView):
).format(
question_default=survey_item["default"], variable_name=survey_item["variable"])
), status=status.HTTP_400_BAD_REQUEST)
elif survey_item["default"].startswith('$encrypted$'):
if not obj.survey_spec:
return Response(dict(error=_(
"$encrypted$ is reserved keyword for password questions and may not "
"be used as a default for '{variable_name}' in survey question {question_position}."
).format(
variable_name=survey_item["variable"], question_position=str(idx))
), status=status.HTTP_400_BAD_REQUEST)
else:
old_spec = obj.survey_spec
for old_item in old_spec['spec']:
if old_item['variable'] == survey_item['variable']:
survey_item['default'] = old_item['default']
else:
survey_item['default'] = encrypt_value(survey_item['default'])
idx += 1
if ("default" in survey_item and isinstance(survey_item['default'], six.string_types) and
survey_item['default'].startswith('$encrypted$')):
# Submission expects the existence of encrypted DB value to replace given default
if survey_item["type"] != "password":
return Response(dict(error=_(
"$encrypted$ is a reserved keyword for password question defaults, "
"survey question {question_position} is type {question_type}."
).format(
question_position=str(idx), question_type=survey_item["type"])
), status=status.HTTP_400_BAD_REQUEST)
old_element = old_spec_dict.get(survey_item['variable'], {})
if (survey_item['variable'] not in old_spec_dict or 'default' not in old_element or
not old_element['default'].startswith('$encrypted$') or
old_element['default'] == '$encrypted$'):
return Response(dict(error=_(
"$encrypted$ is a reserved keyword, may not be used for new default in position {question_position}."
).format(question_position=str(idx))), status=status.HTTP_400_BAD_REQUEST)
survey_item['default'] = old_element['default']
elif survey_item["type"] == "password" and 'default' in survey_item:
# Submission provides new encrypted default
survey_item['default'] = encrypt_value(survey_item['default'])
obj.survey_spec = new_spec
obj.save(update_fields=['survey_spec'])

View File

@ -245,6 +245,17 @@ class SurveyJobTemplateMixin(models.Model):
choice_list))
return errors
@staticmethod
def pivot_spec(spec):
'''
Utility method that will return a dictionary keyed off variable names
'''
pivoted = {}
for element_data in spec.get('spec', []):
if 'variable' in element_data:
pivoted[element_data['variable']] = element_data
return pivoted
def survey_variable_validation(self, data):
errors = []
if not self.survey_enabled:

View File

@ -1,6 +1,7 @@
import mock
import pytest
import requests
from copy import deepcopy
from collections import namedtuple
@ -10,14 +11,19 @@ from awx.api.views import (
InventoryInventorySourcesUpdate,
InventoryHostsList,
HostInsights,
JobTemplateSurveySpec
)
from awx.main.models import (
Host,
JobTemplate,
User
)
from awx.main.managers import HostManager
from rest_framework.test import APIRequestFactory
@pytest.fixture
def mock_response_new(mocker):
@ -215,3 +221,134 @@ class TestInventoryHostsList(object):
view = InventoryHostsList()
view.get_queryset()
mock_query.assert_called_once_with('localhost')
class TestSurveySpecValidation:
@pytest.fixture
def spec_view(self):
def view_factory(old_spec):
obj = JobTemplate()
if old_spec:
obj.survey_spec = old_spec
def save(**kwargs):
pass
def get_object():
return get_object.object
get_object.object = obj
obj.save = save
user = User(username='admin')
def can_access(*args, **kwargs):
return True
user.can_access = can_access
request = APIRequestFactory().get('/api/v2/job_templates/42/survey_spec/')
request.user = user
view = JobTemplateSurveySpec()
view.request = request
view.get_object = get_object
return view
return view_factory
def test_create_text_encrypted(self, spec_view):
view = spec_view(None)
view.request.data = {
"name": "new survey",
"description": "foobar",
"spec": [
{
"question_description": "",
"min": 0,
"default": "$encrypted$",
"max": 1024,
"required": True,
"choices": "",
"variable": "openshift_username",
"question_name": "OpenShift Username",
"type": "text"
}
]
}
resp = view.post(view.request)
assert resp.status_code == 400
assert '$encrypted$ is a reserved keyword for password question defaults' in str(resp.data['error'])
def test_change_encrypted_var_name(self, spec_view):
old = {
"name": "old survey",
"description": "foobar",
"spec": [
{
"question_description": "",
"min": 0,
"default": "$encrypted$foooooooo",
"max": 1024,
"required": True,
"choices": "",
"variable": "openshift_username",
"question_name": "OpenShift Username",
"type": "password"
}
]
}
view = spec_view(old)
new = deepcopy(old)
new['spec'][0]['variable'] = 'openstack_username'
view.request.data = new
resp = view.post(view.request)
assert resp.status_code == 400
assert 'may not be used for new default' in str(resp.data['error'])
def test_use_saved_encrypted_default(self, spec_view):
'''
Save is allowed, the $encrypted$ replacement is done
'''
old = {
"name": "old survey",
"description": "foobar",
"spec": [
{
"question_description": "",
"min": 0,
"default": "$encrypted$foooooooo",
"max": 1024,
"required": True,
"choices": "",
"variable": "openshift_username",
"question_name": "OpenShift Username",
"type": "password"
}
]
}
view = spec_view(old)
new = deepcopy(old)
new['spec'][0]['default'] = '$encrypted$'
new['spec'][0]['required'] = False
view.request.data = new
resp = view.post(view.request)
assert resp.status_code == 200
assert resp.data is None
assert view.get_object.object.survey_spec == {
"name": "old survey",
"description": "foobar",
"spec": [
{
"question_description": "",
"min": 0,
"default": "$encrypted$foooooooo",
"max": 1024,
"required": False, # only thing changed
"choices": "",
"variable": "openshift_username",
"question_name": "OpenShift Username",
"type": "password"
}
]
}