mirror of
https://github.com/ansible/awx.git
synced 2026-01-14 11:20:39 -03:30
Merge pull request #1321 from AlanCoding/order_field_validate
Use field validation in both filter classes
This commit is contained in:
commit
f122fb4e1d
@ -77,6 +77,65 @@ class TypeFilterBackend(BaseFilterBackend):
|
||||
raise ParseError(*e.args)
|
||||
|
||||
|
||||
def get_field_from_path(model, path):
|
||||
'''
|
||||
Given a Django ORM lookup path (possibly over multiple models)
|
||||
Returns the last field in the line, and also the revised lookup path
|
||||
ex., given
|
||||
model=Organization
|
||||
path='project__timeout'
|
||||
returns tuple of field at the end of the line as well as a corrected
|
||||
path, for special cases we do substitutions
|
||||
(<IntegerField for timeout>, 'project__timeout')
|
||||
'''
|
||||
# Store of all the fields used to detect repeats
|
||||
field_set = set([])
|
||||
new_parts = []
|
||||
for name in path.split('__'):
|
||||
if model is None:
|
||||
raise ParseError(_('No related model for field {}.').format(name))
|
||||
# HACK: Make project and inventory source filtering by old field names work for backwards compatibility.
|
||||
if model._meta.object_name in ('Project', 'InventorySource'):
|
||||
name = {
|
||||
'current_update': 'current_job',
|
||||
'last_update': 'last_job',
|
||||
'last_update_failed': 'last_job_failed',
|
||||
'last_updated': 'last_job_run',
|
||||
}.get(name, name)
|
||||
|
||||
if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model):
|
||||
name = 'polymorphic_ctype'
|
||||
new_parts.append('polymorphic_ctype__model')
|
||||
else:
|
||||
new_parts.append(name)
|
||||
|
||||
if name in getattr(model, 'PASSWORD_FIELDS', ()):
|
||||
raise PermissionDenied(_('Filtering on password fields is not allowed.'))
|
||||
elif name == 'pk':
|
||||
field = model._meta.pk
|
||||
else:
|
||||
name_alt = name.replace("_", "")
|
||||
if name_alt in model._meta.fields_map.keys():
|
||||
field = model._meta.fields_map[name_alt]
|
||||
new_parts.pop()
|
||||
new_parts.append(name_alt)
|
||||
else:
|
||||
field = model._meta.get_field(name)
|
||||
if 'auth' in name or 'token' in name:
|
||||
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
|
||||
if isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False):
|
||||
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
|
||||
elif getattr(field, '__prevent_search__', False):
|
||||
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
|
||||
if field in field_set:
|
||||
# Field traversed twice, could create infinite JOINs, DoSing Tower
|
||||
raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name))
|
||||
field_set.add(field)
|
||||
model = getattr(field, 'related_model', None)
|
||||
|
||||
return field, '__'.join(new_parts)
|
||||
|
||||
|
||||
class FieldLookupBackend(BaseFilterBackend):
|
||||
'''
|
||||
Filter using field lookups provided via query string parameters.
|
||||
@ -91,61 +150,23 @@ class FieldLookupBackend(BaseFilterBackend):
|
||||
'isnull', 'search')
|
||||
|
||||
def get_field_from_lookup(self, model, lookup):
|
||||
field = None
|
||||
parts = lookup.split('__')
|
||||
if parts and parts[-1] not in self.SUPPORTED_LOOKUPS:
|
||||
parts.append('exact')
|
||||
|
||||
if '__' in lookup and lookup.rsplit('__', 1)[-1] in self.SUPPORTED_LOOKUPS:
|
||||
path, suffix = lookup.rsplit('__', 1)
|
||||
else:
|
||||
path = lookup
|
||||
suffix = 'exact'
|
||||
|
||||
if not path:
|
||||
raise ParseError(_('Query string field name not provided.'))
|
||||
|
||||
# FIXME: Could build up a list of models used across relationships, use
|
||||
# those lookups combined with request.user.get_queryset(Model) to make
|
||||
# sure user cannot query using objects he could not view.
|
||||
new_parts = []
|
||||
field, new_path = get_field_from_path(model, path)
|
||||
|
||||
# Store of all the fields used to detect repeats
|
||||
field_set = set([])
|
||||
|
||||
for name in parts[:-1]:
|
||||
# HACK: Make project and inventory source filtering by old field names work for backwards compatibility.
|
||||
if model._meta.object_name in ('Project', 'InventorySource'):
|
||||
name = {
|
||||
'current_update': 'current_job',
|
||||
'last_update': 'last_job',
|
||||
'last_update_failed': 'last_job_failed',
|
||||
'last_updated': 'last_job_run',
|
||||
}.get(name, name)
|
||||
|
||||
if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model):
|
||||
name = 'polymorphic_ctype'
|
||||
new_parts.append('polymorphic_ctype__model')
|
||||
else:
|
||||
new_parts.append(name)
|
||||
|
||||
if name in getattr(model, 'PASSWORD_FIELDS', ()):
|
||||
raise PermissionDenied(_('Filtering on password fields is not allowed.'))
|
||||
elif name == 'pk':
|
||||
field = model._meta.pk
|
||||
else:
|
||||
name_alt = name.replace("_", "")
|
||||
if name_alt in model._meta.fields_map.keys():
|
||||
field = model._meta.fields_map[name_alt]
|
||||
new_parts.pop()
|
||||
new_parts.append(name_alt)
|
||||
else:
|
||||
field = model._meta.get_field(name)
|
||||
if 'auth' in name or 'token' in name:
|
||||
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
|
||||
if isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False):
|
||||
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
|
||||
elif getattr(field, '__prevent_search__', False):
|
||||
raise PermissionDenied(_('Filtering on %s is not allowed.' % name))
|
||||
if field in field_set:
|
||||
# Field traversed twice, could create infinite JOINs, DoSing Tower
|
||||
raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name))
|
||||
field_set.add(field)
|
||||
model = getattr(field, 'related_model', None) or field.model
|
||||
|
||||
if parts:
|
||||
new_parts.append(parts[-1])
|
||||
new_lookup = '__'.join(new_parts)
|
||||
new_lookup = new_path
|
||||
new_lookup = '__'.join([new_path, suffix])
|
||||
return field, new_lookup
|
||||
|
||||
def to_python_related(self, value):
|
||||
@ -371,7 +392,7 @@ class OrderByBackend(BaseFilterBackend):
|
||||
else:
|
||||
order_by = (value,)
|
||||
if order_by:
|
||||
order_by = self._strip_sensitive_model_fields(queryset.model, order_by)
|
||||
order_by = self._validate_ordering_fields(queryset.model, order_by)
|
||||
|
||||
# Special handling of the type field for ordering. In this
|
||||
# case, we're not sorting exactly on the type field, but
|
||||
@ -396,15 +417,17 @@ class OrderByBackend(BaseFilterBackend):
|
||||
# Return a 400 for invalid field names.
|
||||
raise ParseError(*e.args)
|
||||
|
||||
def _strip_sensitive_model_fields(self, model, order_by):
|
||||
def _validate_ordering_fields(self, model, order_by):
|
||||
for field_name in order_by:
|
||||
# strip off the negation prefix `-` if it exists
|
||||
_field_name = field_name.split('-')[-1]
|
||||
prefix = ''
|
||||
path = field_name
|
||||
if field_name[0] == '-':
|
||||
prefix = field_name[0]
|
||||
path = field_name[1:]
|
||||
try:
|
||||
# if the field name is encrypted/sensitive, don't sort on it
|
||||
if _field_name in getattr(model, 'PASSWORD_FIELDS', ()) or \
|
||||
getattr(model._meta.get_field(_field_name), '__prevent_search__', False):
|
||||
raise ParseError(_('cannot order by field %s') % _field_name)
|
||||
except FieldDoesNotExist:
|
||||
pass
|
||||
yield field_name
|
||||
field, new_path = get_field_from_path(model, path)
|
||||
new_path = '{}{}'.format(prefix, new_path)
|
||||
except (FieldError, FieldDoesNotExist) as e:
|
||||
raise ParseError(e.args[0])
|
||||
yield new_path
|
||||
|
||||
@ -126,9 +126,8 @@ def test_list_cannot_order_by_unsearchable_field(get, organization, alice, order
|
||||
)
|
||||
custom_script.admin_role.members.add(alice)
|
||||
|
||||
response = get(reverse('api:inventory_script_list'), alice,
|
||||
QUERY_STRING='order_by=%s' % order_by, status=400)
|
||||
assert response.status_code == 400
|
||||
get(reverse('api:inventory_script_list'), alice,
|
||||
QUERY_STRING='order_by=%s' % order_by, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("role_field,expected_status_code", [
|
||||
|
||||
@ -3,14 +3,18 @@
|
||||
import pytest
|
||||
|
||||
from rest_framework.exceptions import PermissionDenied, ParseError
|
||||
from awx.api.filters import FieldLookupBackend
|
||||
from awx.api.filters import FieldLookupBackend, OrderByBackend, get_field_from_path
|
||||
from awx.main.models import (AdHocCommand, ActivityStream,
|
||||
CustomInventoryScript, Credential, Job,
|
||||
JobTemplate, SystemJob, UnifiedJob, User,
|
||||
WorkflowJob, WorkflowJobTemplate,
|
||||
WorkflowJobOptions, InventorySource)
|
||||
WorkflowJobOptions, InventorySource,
|
||||
JobEvent)
|
||||
from awx.main.models.jobs import JobOptions
|
||||
|
||||
# Django
|
||||
from django.db.models.fields import FieldDoesNotExist
|
||||
|
||||
|
||||
def test_related():
|
||||
field_lookup = FieldLookupBackend()
|
||||
@ -20,6 +24,27 @@ def test_related():
|
||||
print(new_lookup)
|
||||
|
||||
|
||||
def test_invalid_filter_key():
|
||||
field_lookup = FieldLookupBackend()
|
||||
# FieldDoesNotExist is caught and converted to ParseError by filter_queryset
|
||||
with pytest.raises(FieldDoesNotExist) as excinfo:
|
||||
field_lookup.value_to_python(JobEvent, 'event_data.task_action', 'foo')
|
||||
assert 'has no field named' in str(excinfo)
|
||||
|
||||
|
||||
def test_invalid_field_hop():
|
||||
with pytest.raises(ParseError) as excinfo:
|
||||
get_field_from_path(Credential, 'organization__description__user')
|
||||
assert 'No related model for' in str(excinfo)
|
||||
|
||||
|
||||
def test_invalid_order_by_key():
|
||||
field_order_by = OrderByBackend()
|
||||
with pytest.raises(ParseError) as excinfo:
|
||||
[f for f in field_order_by._validate_ordering_fields(JobEvent, ('event_data.task_action',))]
|
||||
assert 'has no field named' in str(excinfo)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(u"empty_value", [u'', ''])
|
||||
def test_empty_in(empty_value):
|
||||
field_lookup = FieldLookupBackend()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user