diff --git a/awx/api/filters.py b/awx/api/filters.py index 1c5a47f847..c7425f5e75 100644 --- a/awx/api/filters.py +++ b/awx/api/filters.py @@ -77,6 +77,65 @@ class TypeFilterBackend(BaseFilterBackend): raise ParseError(*e.args) +def get_field_from_path(model, path): + ''' + Given a Django ORM lookup path (possibly over multiple models) + Returns the last field in the line, and also the revised lookup path + ex., given + model=Organization + path='project__timeout' + returns tuple of field at the end of the line as well as a corrected + path, for special cases we do substitutions + (, 'project__timeout') + ''' + # Store of all the fields used to detect repeats + field_set = set([]) + new_parts = [] + for name in path.split('__'): + if model is None: + raise ParseError(_('No related model for field {}.').format(name)) + # HACK: Make project and inventory source filtering by old field names work for backwards compatibility. + if model._meta.object_name in ('Project', 'InventorySource'): + name = { + 'current_update': 'current_job', + 'last_update': 'last_job', + 'last_update_failed': 'last_job_failed', + 'last_updated': 'last_job_run', + }.get(name, name) + + if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model): + name = 'polymorphic_ctype' + new_parts.append('polymorphic_ctype__model') + else: + new_parts.append(name) + + if name in getattr(model, 'PASSWORD_FIELDS', ()): + raise PermissionDenied(_('Filtering on password fields is not allowed.')) + elif name == 'pk': + field = model._meta.pk + else: + name_alt = name.replace("_", "") + if name_alt in model._meta.fields_map.keys(): + field = model._meta.fields_map[name_alt] + new_parts.pop() + new_parts.append(name_alt) + else: + field = model._meta.get_field(name) + if 'auth' in name or 'token' in name: + raise PermissionDenied(_('Filtering on %s is not allowed.' % name)) + if isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False): + raise PermissionDenied(_('Filtering on %s is not allowed.' % name)) + elif getattr(field, '__prevent_search__', False): + raise PermissionDenied(_('Filtering on %s is not allowed.' % name)) + if field in field_set: + # Field traversed twice, could create infinite JOINs, DoSing Tower + raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name)) + field_set.add(field) + model = getattr(field, 'related_model', None) + + return field, '__'.join(new_parts) + + class FieldLookupBackend(BaseFilterBackend): ''' Filter using field lookups provided via query string parameters. @@ -91,61 +150,23 @@ class FieldLookupBackend(BaseFilterBackend): 'isnull', 'search') def get_field_from_lookup(self, model, lookup): - field = None - parts = lookup.split('__') - if parts and parts[-1] not in self.SUPPORTED_LOOKUPS: - parts.append('exact') + + if '__' in lookup and lookup.rsplit('__', 1)[-1] in self.SUPPORTED_LOOKUPS: + path, suffix = lookup.rsplit('__', 1) + else: + path = lookup + suffix = 'exact' + + if not path: + raise ParseError(_('Query string field name not provided.')) + # FIXME: Could build up a list of models used across relationships, use # those lookups combined with request.user.get_queryset(Model) to make # sure user cannot query using objects he could not view. - new_parts = [] + field, new_path = get_field_from_path(model, path) - # Store of all the fields used to detect repeats - field_set = set([]) - - for name in parts[:-1]: - # HACK: Make project and inventory source filtering by old field names work for backwards compatibility. - if model._meta.object_name in ('Project', 'InventorySource'): - name = { - 'current_update': 'current_job', - 'last_update': 'last_job', - 'last_update_failed': 'last_job_failed', - 'last_updated': 'last_job_run', - }.get(name, name) - - if name == 'type' and 'polymorphic_ctype' in get_all_field_names(model): - name = 'polymorphic_ctype' - new_parts.append('polymorphic_ctype__model') - else: - new_parts.append(name) - - if name in getattr(model, 'PASSWORD_FIELDS', ()): - raise PermissionDenied(_('Filtering on password fields is not allowed.')) - elif name == 'pk': - field = model._meta.pk - else: - name_alt = name.replace("_", "") - if name_alt in model._meta.fields_map.keys(): - field = model._meta.fields_map[name_alt] - new_parts.pop() - new_parts.append(name_alt) - else: - field = model._meta.get_field(name) - if 'auth' in name or 'token' in name: - raise PermissionDenied(_('Filtering on %s is not allowed.' % name)) - if isinstance(field, ForeignObjectRel) and getattr(field.field, '__prevent_search__', False): - raise PermissionDenied(_('Filtering on %s is not allowed.' % name)) - elif getattr(field, '__prevent_search__', False): - raise PermissionDenied(_('Filtering on %s is not allowed.' % name)) - if field in field_set: - # Field traversed twice, could create infinite JOINs, DoSing Tower - raise ParseError(_('Loops not allowed in filters, detected on field {}.').format(field.name)) - field_set.add(field) - model = getattr(field, 'related_model', None) or field.model - - if parts: - new_parts.append(parts[-1]) - new_lookup = '__'.join(new_parts) + new_lookup = new_path + new_lookup = '__'.join([new_path, suffix]) return field, new_lookup def to_python_related(self, value): @@ -371,7 +392,7 @@ class OrderByBackend(BaseFilterBackend): else: order_by = (value,) if order_by: - order_by = self._strip_sensitive_model_fields(queryset.model, order_by) + order_by = self._validate_ordering_fields(queryset.model, order_by) # Special handling of the type field for ordering. In this # case, we're not sorting exactly on the type field, but @@ -396,15 +417,17 @@ class OrderByBackend(BaseFilterBackend): # Return a 400 for invalid field names. raise ParseError(*e.args) - def _strip_sensitive_model_fields(self, model, order_by): + def _validate_ordering_fields(self, model, order_by): for field_name in order_by: # strip off the negation prefix `-` if it exists - _field_name = field_name.split('-')[-1] + prefix = '' + path = field_name + if field_name[0] == '-': + prefix = field_name[0] + path = field_name[1:] try: - # if the field name is encrypted/sensitive, don't sort on it - if _field_name in getattr(model, 'PASSWORD_FIELDS', ()) or \ - getattr(model._meta.get_field(_field_name), '__prevent_search__', False): - raise ParseError(_('cannot order by field %s') % _field_name) - except FieldDoesNotExist: - pass - yield field_name + field, new_path = get_field_from_path(model, path) + new_path = '{}{}'.format(prefix, new_path) + except (FieldError, FieldDoesNotExist) as e: + raise ParseError(e.args[0]) + yield new_path diff --git a/awx/main/tests/functional/api/test_inventory.py b/awx/main/tests/functional/api/test_inventory.py index c96bb8057c..2e4b7df63e 100644 --- a/awx/main/tests/functional/api/test_inventory.py +++ b/awx/main/tests/functional/api/test_inventory.py @@ -126,9 +126,8 @@ def test_list_cannot_order_by_unsearchable_field(get, organization, alice, order ) custom_script.admin_role.members.add(alice) - response = get(reverse('api:inventory_script_list'), alice, - QUERY_STRING='order_by=%s' % order_by, status=400) - assert response.status_code == 400 + get(reverse('api:inventory_script_list'), alice, + QUERY_STRING='order_by=%s' % order_by, expect=403) @pytest.mark.parametrize("role_field,expected_status_code", [ diff --git a/awx/main/tests/unit/api/test_filters.py b/awx/main/tests/unit/api/test_filters.py index 12ff3663a5..1a70b9716c 100644 --- a/awx/main/tests/unit/api/test_filters.py +++ b/awx/main/tests/unit/api/test_filters.py @@ -3,14 +3,18 @@ import pytest from rest_framework.exceptions import PermissionDenied, ParseError -from awx.api.filters import FieldLookupBackend +from awx.api.filters import FieldLookupBackend, OrderByBackend, get_field_from_path from awx.main.models import (AdHocCommand, ActivityStream, CustomInventoryScript, Credential, Job, JobTemplate, SystemJob, UnifiedJob, User, WorkflowJob, WorkflowJobTemplate, - WorkflowJobOptions, InventorySource) + WorkflowJobOptions, InventorySource, + JobEvent) from awx.main.models.jobs import JobOptions +# Django +from django.db.models.fields import FieldDoesNotExist + def test_related(): field_lookup = FieldLookupBackend() @@ -20,6 +24,27 @@ def test_related(): print(new_lookup) +def test_invalid_filter_key(): + field_lookup = FieldLookupBackend() + # FieldDoesNotExist is caught and converted to ParseError by filter_queryset + with pytest.raises(FieldDoesNotExist) as excinfo: + field_lookup.value_to_python(JobEvent, 'event_data.task_action', 'foo') + assert 'has no field named' in str(excinfo) + + +def test_invalid_field_hop(): + with pytest.raises(ParseError) as excinfo: + get_field_from_path(Credential, 'organization__description__user') + assert 'No related model for' in str(excinfo) + + +def test_invalid_order_by_key(): + field_order_by = OrderByBackend() + with pytest.raises(ParseError) as excinfo: + [f for f in field_order_by._validate_ordering_fields(JobEvent, ('event_data.task_action',))] + assert 'has no field named' in str(excinfo) + + @pytest.mark.parametrize(u"empty_value", [u'', '']) def test_empty_in(empty_value): field_lookup = FieldLookupBackend()