use field validation in both filter classes

This commit is contained in:
AlanCoding 2018-04-10 13:44:08 -04:00
parent 9319bbce8d
commit 6306ac2825
No known key found for this signature in database
GPG Key ID: FD2C3C012A72926B
3 changed files with 113 additions and 66 deletions

View File

@ -77,6 +77,65 @@ class TypeFilterBackend(BaseFilterBackend):
raise ParseError(*e.args)
def get_field_from_path(model, path):
'''
Given a Django ORM lookup path (possibly over multiple models)
Returns the last field in the line, and also the revised lookup path
ex., given
model=Organization
path='project__timeout'
returns tuple of field at the end of the line as well as a corrected
path, for special cases we do substitutions
(<IntegerField for timeout>, 'project__timeout')
'''
# Store of all the fields used to detect repeats
field_set = set([])
new_parts = []
for name in path.split('__'):
if model is None:
raise ParseError(_('No related model for field {}.').format(name))
# HACK: Make project and inventory source filtering by old field names work for backwards compatibility.
if model._meta.object_name in ('Project', 'InventorySource'):
name = {
'current_update': 'current_job',
'last_update': 'last_job',
'last_update_failed': 'last_job_failed',
'last_updated': 'last_job_run',
}.get(name, name)
if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model):
name = 'polymorphic_ctype'
new_parts.append('polymorphic_ctype__model')
else:
new_parts.append(name)
if name in getattr(model, 'PASSWORD_FIELDS', ()):
raise PermissionDenied(_('Filtering on password fields is not allowed.'))
elif name == 'pk':
field = model._meta.pk
else:
name_alt = name.replace("_", "")
if name_alt in model._meta.fields_map.keys():
field = model._meta.fields_map[name_alt]
new_parts.pop()
new_parts.append(name_alt)
else:
field = model._meta.get_field(name)
if 'auth' in name or 'token' in name:
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
if isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False):
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
elif getattr(field, '__prevent_search__', False):
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
if field in field_set:
# Field traversed twice, could create infinite JOINs, DoSing Tower
raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name))
field_set.add(field)
model = getattr(field, 'related_model', None)
return field, '__'.join(new_parts)
class FieldLookupBackend(BaseFilterBackend):
'''
Filter using field lookups provided via query string parameters.
@ -91,61 +150,23 @@ class FieldLookupBackend(BaseFilterBackend):
'isnull', 'search')
def get_field_from_lookup(self, model, lookup):
field = None
parts = lookup.split('__')
if parts and parts[-1] not in self.SUPPORTED_LOOKUPS:
parts.append('exact')
if '__' in lookup and lookup.rsplit('__', 1)[-1] in self.SUPPORTED_LOOKUPS:
path, suffix = lookup.rsplit('__', 1)
else:
path = lookup
suffix = 'exact'
if not path:
raise ParseError(_('Query string field name not provided.'))
# FIXME: Could build up a list of models used across relationships, use
# those lookups combined with request.user.get_queryset(Model) to make
# sure user cannot query using objects he could not view.
new_parts = []
field, new_path = get_field_from_path(model, path)
# Store of all the fields used to detect repeats
field_set = set([])
for name in parts[:-1]:
# HACK: Make project and inventory source filtering by old field names work for backwards compatibility.
if model._meta.object_name in ('Project', 'InventorySource'):
name = {
'current_update': 'current_job',
'last_update': 'last_job',
'last_update_failed': 'last_job_failed',
'last_updated': 'last_job_run',
}.get(name, name)
if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model):
name = 'polymorphic_ctype'
new_parts.append('polymorphic_ctype__model')
else:
new_parts.append(name)
if name in getattr(model, 'PASSWORD_FIELDS', ()):
raise PermissionDenied(_('Filtering on password fields is not allowed.'))
elif name == 'pk':
field = model._meta.pk
else:
name_alt = name.replace("_", "")
if name_alt in model._meta.fields_map.keys():
field = model._meta.fields_map[name_alt]
new_parts.pop()
new_parts.append(name_alt)
else:
field = model._meta.get_field(name)
if 'auth' in name or 'token' in name:
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
if isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False):
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
elif getattr(field, '__prevent_search__', False):
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
if field in field_set:
# Field traversed twice, could create infinite JOINs, DoSing Tower
raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name))
field_set.add(field)
model = getattr(field, 'related_model', None) or field.model
if parts:
new_parts.append(parts[-1])
new_lookup = '__'.join(new_parts)
new_lookup = new_path
new_lookup = '__'.join([new_path, suffix])
return field, new_lookup
def to_python_related(self, value):
@ -371,7 +392,7 @@ class OrderByBackend(BaseFilterBackend):
else:
order_by = (value,)
if order_by:
order_by = self._strip_sensitive_model_fields(queryset.model, order_by)
order_by = self._validate_ordering_fields(queryset.model, order_by)
# Special handling of the type field for ordering. In this
# case, we're not sorting exactly on the type field, but
@ -396,15 +417,17 @@ class OrderByBackend(BaseFilterBackend):
# Return a 400 for invalid field names.
raise ParseError(*e.args)
def _strip_sensitive_model_fields(self, model, order_by):
def _validate_ordering_fields(self, model, order_by):
for field_name in order_by:
# strip off the negation prefix `-` if it exists
_field_name = field_name.split('-')[-1]
prefix = ''
path = field_name
if field_name[0] == '-':
prefix = field_name[0]
path = field_name[1:]
try:
# if the field name is encrypted/sensitive, don't sort on it
if _field_name in getattr(model, 'PASSWORD_FIELDS', ()) or \
getattr(model._meta.get_field(_field_name), '__prevent_search__', False):
raise ParseError(_('cannot order by field %s') % _field_name)
except FieldDoesNotExist:
pass
yield field_name
field, new_path = get_field_from_path(model, path)
new_path = '{}{}'.format(prefix, new_path)
except (FieldError, FieldDoesNotExist) as e:
raise ParseError(e.args[0])
yield new_path

View File

@ -126,9 +126,8 @@ def test_list_cannot_order_by_unsearchable_field(get, organization, alice, order
)
custom_script.admin_role.members.add(alice)
response = get(reverse('api:inventory_script_list'), alice,
QUERY_STRING='order_by=%s' % order_by, status=400)
assert response.status_code == 400
get(reverse('api:inventory_script_list'), alice,
QUERY_STRING='order_by=%s' % order_by, expect=403)
@pytest.mark.parametrize("role_field,expected_status_code", [

View File

@ -3,14 +3,18 @@
import pytest
from rest_framework.exceptions import PermissionDenied, ParseError
from awx.api.filters import FieldLookupBackend
from awx.api.filters import FieldLookupBackend, OrderByBackend, get_field_from_path
from awx.main.models import (AdHocCommand, ActivityStream,
CustomInventoryScript, Credential, Job,
JobTemplate, SystemJob, UnifiedJob, User,
WorkflowJob, WorkflowJobTemplate,
WorkflowJobOptions, InventorySource)
WorkflowJobOptions, InventorySource,
JobEvent)
from awx.main.models.jobs import JobOptions
# Django
from django.db.models.fields import FieldDoesNotExist
def test_related():
field_lookup = FieldLookupBackend()
@ -20,6 +24,27 @@ def test_related():
print(new_lookup)
def test_invalid_filter_key():
field_lookup = FieldLookupBackend()
# FieldDoesNotExist is caught and converted to ParseError by filter_queryset
with pytest.raises(FieldDoesNotExist) as excinfo:
field_lookup.value_to_python(JobEvent, 'event_data.task_action', 'foo')
assert 'has no field named' in str(excinfo)
def test_invalid_field_hop():
with pytest.raises(ParseError) as excinfo:
get_field_from_path(Credential, 'organization__description__user')
assert 'No related model for' in str(excinfo)
def test_invalid_order_by_key():
field_order_by = OrderByBackend()
with pytest.raises(ParseError) as excinfo:
[f for f in field_order_by._validate_ordering_fields(JobEvent, ('event_data.task_action',))]
assert 'has no field named' in str(excinfo)
@pytest.mark.parametrize(u"empty_value", [u'', ''])
def test_empty_in(empty_value):
field_lookup = FieldLookupBackend()