From 30d2a4f5c9f6cca9c53bcba8c9e111b662160e8a Mon Sep 17 00:00:00 2001 From: Chris Church Date: Thu, 8 Aug 2013 15:58:42 -0400 Subject: [PATCH] For AC-331. Major updates to API filtering to allow negative filtering, use of a list of items with the __in lookup, more flexible boolean/null values, more validation of filter values, and tests. --- awx/main/filters.py | 174 +++++++++++--- awx/main/templates/main/_list_common.md | 60 ++++- awx/main/tests/base.py | 12 +- awx/main/tests/users.py | 301 ++++++++++++++++++++++++ awx/settings/defaults.py | 4 +- 5 files changed, 510 insertions(+), 41 deletions(-) diff --git a/awx/main/filters.py b/awx/main/filters.py index f44a6ec704..3191107725 100644 --- a/awx/main/filters.py +++ b/awx/main/filters.py @@ -1,53 +1,169 @@ # Copyright (c) 2013 AnsibleWorks, Inc. # All Rights Reserved. +# Python +import re + # Django -from django.core.exceptions import FieldError +from django.core.exceptions import FieldError, ValidationError +from django.db import models +from django.db.models import Q +from django.db.models.related import RelatedObject +from django.db.models.fields import FieldDoesNotExist # Django REST Framework from rest_framework.exceptions import ParseError from rest_framework.filters import BaseFilterBackend -class DefaultFilterBackend(BaseFilterBackend): +class ActiveOnlyBackend(BaseFilterBackend): + ''' + Filter to show only objects where is_active/active is True. + ''' def filter_queryset(self, request, queryset, view): - - # Filtering by is_active/active that was previously in BaseList. - qs = queryset for field in queryset.model._meta.fields: if field.name == 'is_active': - qs = qs.filter(is_active=True) + queryset = queryset.filter(is_active=True) elif field.name == 'active': - qs = qs.filter(active=True) + queryset = queryset.filter(active=True) + return queryset - # Apply filters and ordering specified via QUERY_PARAMS. +class FieldLookupBackend(BaseFilterBackend): + ''' + Filter using field lookups provided via query string parameters. + ''' + + RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by') + + SUPPORTED_LOOKUPS = ('exact', 'iexact', 'contains', 'icontains', + 'startswith', 'istartswith', 'endswith', 'iendswith', + 'regex', 'iregex', 'gt', 'gte', 'lt', 'lte', 'in', + 'isnull') + + def get_field_from_lookup(self, model, lookup): + field = None + parts = lookup.split('__') + if parts and parts[-1] not in self.SUPPORTED_LOOKUPS: + parts.append('exact') + # FIXME: Could build up a list of models used across relationships, use + # those lookups combined with request.user.get_queryset(Model) to make + # sure user cannot query using objects he could not view. + for n, name in enumerate(parts[:-1]): + if name == 'pk': + field = model._meta.pk + else: + field = model._meta.get_field_by_name(name)[0] + if n < (len(parts) - 2): + if getattr(field, 'rel', None): + model = field.rel.to + else: + model = field.model + return field + + def to_python_boolean(self, value, allow_none=False): + if value.lower() in ('true', '1'): + return True + elif value.lower() in ('false', '0'): + return False + elif allow_none and value.lower() in ('none', 'null'): + return None + else: + raise ValueError(u'Unable to convert "%s" to boolean' % unicode(value)) + + def to_python_related(self, value): + if value.lower() in ('none', 'null'): + return None + else: + return int(value) + + def value_to_python_for_field(self, field, value): + if isinstance(field, models.NullBooleanField): + return self.to_python_boolean(value, allow_none=True) + elif isinstance(field, models.BooleanField): + return self.to_python_boolean(value) + elif isinstance(field, RelatedObject): + return self.to_python_related(value) + else: + return field.to_python(value) + + def value_to_python(self, model, lookup, value): + field = self.get_field_from_lookup(model, lookup) + if lookup.endswith('__isnull'): + value = self.to_python_boolean(value) + elif lookup.endswith('__in'): + items = [] + for item in value.split(','): + items.append(self.value_to_python_for_field(field, item)) + value = items + elif lookup.endswith('__regex') or lookup.endswith('__iregex'): + try: + re.compile(value) + except re.error, e: + raise ValueError(e.args[0]) + return value + else: + value = self.value_to_python_for_field(field, value) + return value + + def filter_queryset(self, request, queryset, view): try: - + # Apply filters and excludes specified via QUERY_PARAMS. filters = {} - order_by = None - + excludes = {} for key, value in request.QUERY_PARAMS.items(): - - if key in ('page', 'page_size', 'format'): + if key in self.RESERVED_NAMES: continue + # Custom __int filter suffix (internal use only). + if key.endswith('__int'): + key = key[:-5] + value = int(value) + # Custom not__ filter prefix. + q_not = False + if key.startswith('not__'): + key = key[5:] + q_not = True + + # Convert value to python and add to the appropriate dict. + value = self.value_to_python(queryset.model, key, value) + if q_not: + excludes[key] = value + else: + filters[key] = value + if filters: + queryset = queryset.filter(**filters) + if excludes: + queryset = queryset.exclude(**excludes) + return queryset + except (FieldError, FieldDoesNotExist, ValueError), e: + raise ParseError(e.args[0]) + except ValidationError, e: + raise ParseError(e.messages) + +class OrderByBackend(BaseFilterBackend): + ''' + Filter to apply ordering based on query string parameters. + ''' + + def filter_queryset(self, request, queryset, view): + try: + order_by = None + for key, value in request.QUERY_PARAMS.items(): if key in ('order', 'order_by'): order_by = value - continue - - if key.endswith('__int'): - key = key.replace('__int', '') - value = int(value) - - filters[key] = value - - qs = qs.filter(**filters) - + if ',' in value: + order_by = value.split(',') + else: + order_by = (value,) if order_by: - qs = qs.order_by(order_by) - - except (FieldError, ValueError), e: - # Handle invalid field names or values and return a 400. + queryset = queryset.order_by(*order_by) + # Fetch the first result to run the query, otherwise we don't + # always catch the FieldError for invalid field names. + try: + queryset[0] + except IndexError: + pass + return queryset + except FieldError, e: + # Return a 400 for invalid field names. raise ParseError(*e.args) - - return qs diff --git a/awx/main/templates/main/_list_common.md b/awx/main/templates/main/_list_common.md index 89cdbfe741..ab20a762fc 100644 --- a/awx/main/templates/main/_list_common.md +++ b/awx/main/templates/main/_list_common.md @@ -31,6 +31,11 @@ Prefix the field name with a dash `-` to sort in reverse: ?order_by=-{{ order_field }} +Multiple sorting fields may be specified by separating the field names with a +comma `,`: + + ?order_by={{ order_field }},some_other_field + ## Pagination Use the `page_size` query string parameter to change the number of results @@ -45,13 +50,54 @@ string parameters automatically. ## Filtering Any additional query string parameters may be used to filter the list of -results returned to those matching a given value. Only fields that exist in -the database can be used for filtering. +results returned to those matching a given value. Only fields and relations +that exist in the database may be used for filtering. Any special characters +in the specified value should be url-encoded. For example: - ?{{ order_field }}=value + ?field=value%20xyz -Field lookups may also be used for slightly more advanced queries, for example: +Fields may also span relations, only for fields and relationships defined in +the database: - ?{{ order_field }}__startswith=A - ?{{ order_field }}__endsswith=C - ?{{ order_field }}__contains=ABC + ?other__field=value + +To exclude results matching certain criteria, prefix the field parameter with +`not__`: + + ?not__field=value + +Field lookups may also be used for more advanced queries, by appending the +lookup to the field name: + + ?field__lookup=value + +The following field lookups are supported: + +* `exact`: Exact match (default lookup if not specified). +* `iexact`: Case-insensitive version of `exact`. +* `contains`: Field contains value. +* `icontains`: Case-insensitive version of `contains`. +* `startswith`: Field starts with value. +* `istartswith`: Case-insensitive version of `startswith`. +* `endswith`: Field ends with value. +* `iendswith`: Case-insensitive version of `endswith`. +* `regex`: Field matches the given regular expression. +* `iregex`: Case-insensitive version of `regex`. +* `gt`: Greater than comparison. +* `gte`: Greater than or equal to comparison. +* `lt`: Less than comparison. +* `lte`: Less than or equal to comparison. +* `isnull`: Check whether the given field or related object is null; expects a + boolean value. +* `in`: Check whether the given field's value is present in the list provided; + expects a list of items. + +Boolean values may be specified as `True` or `1` for true, `False` or `0` for +false (both case-insensitive). + +Null values may be specified as `None` or `Null` (both case-insensitive), +though it is preferred to use the `isnull` lookup to explicitly check for null +values. + +Lists (for the `in` lookup) may be specified as a comma-separated list of +values. diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 6c290507c5..763ceeb720 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -112,7 +112,7 @@ class BaseTestMixin(object): data_ids = [x['id'] for x in data['results']] qs_ids = queryset.values_list('pk', flat=True) if check_order: - self.assertEqual(data_ids, qs_ids) + self.assertEqual(tuple(data_ids), tuple(qs_ids)) else: self.assertEqual(set(data_ids), set(qs_ids)) @@ -261,19 +261,23 @@ class BaseTestMixin(object): else: f(url, expect=401) - def check_get_list(self, url, user, qs, fields=None, expect=200): + def check_get_list(self, url, user, qs, fields=None, expect=200, + check_order=False): ''' Check that the given list view URL returns results for the given user that match the given queryset. ''' with self.current_user(user): - self.options(url, expect=expect) + if expect == 400: + self.options(url, expect=200) + else: + self.options(url, expect=expect) self.head(url, expect=expect) response = self.get(url, expect=expect) if expect != 200: return self.check_pagination_and_size(response, qs.count()) - self.check_list_ids(response, qs) + self.check_list_ids(response, qs, check_order) if fields: for obj in response['results']: self.assertTrue(set(obj.keys()) <= set(fields)) diff --git a/awx/main/tests/users.py b/awx/main/tests/users.py index 6033404672..385b8d6932 100644 --- a/awx/main/tests/users.py +++ b/awx/main/tests/users.py @@ -1,13 +1,18 @@ # Copyright (c) 2013 AnsibleWorks, Inc. # All Rights Reserved. +# Python +import datetime import json +import urllib +# Django from django.contrib.auth.models import User import django.test from django.test.client import Client from django.core.urlresolvers import reverse +# AWX from awx.main.models import * from awx.main.tests.base import BaseTest @@ -247,4 +252,300 @@ class UsersTest(BaseTest): # FIXME: add test that shows posting a user w/o id to /organizations/2/admins/ can create a new one & associate # FIXME: add test that shows posting a projects w/o id to /organizations/2/projects/ can create a new one & associate + def test_user_list_ordering(self): + base_url = reverse('main:user_list') + base_qs = User.objects.distinct() + # Check list view with ordering by name. + url = '%s?order_by=username' % base_url + qs = base_qs.order_by('username') + self.check_get_list(url, self.super_django_user, qs, check_order=True) + + # Check list view with ordering by username in reverse. + url = '%s?order=-username' % base_url + qs = base_qs.order_by('-username') + self.check_get_list(url, self.super_django_user, qs, check_order=True) + + # Check list view with multiple ordering fields. + url = '%s?order_by=-pk,username' % base_url + qs = base_qs.order_by('-pk', 'username') + self.check_get_list(url, self.super_django_user, qs, check_order=True) + + # Check list view with invalid field name. + url = '%s?order_by=invalidfieldname' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Check list view with no field name. + url = '%s?order_by=' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + def test_user_list_filtering(self): + # Also serves as general-purpose testing for custom API filters. + base_url = reverse('main:user_list') + base_qs = User.objects.distinct() + + # Filter by username. + url = '%s?username=normal' % base_url + qs = base_qs.filter(username='normal') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __exact suffix. + url = '%s?username__exact=normal' % base_url + qs = base_qs.filter(username__exact='normal') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __iexact suffix. + url = '%s?username__iexact=NORMAL' % base_url + qs = base_qs.filter(username__iexact='NORMAL') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __contains suffix. + url = '%s?username__contains=dmi' % base_url + qs = base_qs.filter(username__contains='dmi') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __icontains suffix. + url = '%s?username__icontains=DMI' % base_url + qs = base_qs.filter(username__icontains='DMI') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __startswith suffix. + url = '%s?username__startswith=no' % base_url + qs = base_qs.filter(username__startswith='no') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __istartswith suffix. + url = '%s?username__istartswith=NO' % base_url + qs = base_qs.filter(username__istartswith='NO') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __endswith suffix. + url = '%s?username__endswith=al' % base_url + qs = base_qs.filter(username__endswith='al') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __iendswith suffix. + url = '%s?username__iendswith=AL' % base_url + qs = base_qs.filter(username__iendswith='AL') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __regex suffix. + url = '%s?username__regex=%s' % (base_url, urllib.quote_plus(r'^admin|no.+$')) + qs = base_qs.filter(username__regex=r'^admin|no.+$') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __iregex suffix. + url = '%s?username__iregex=%s' % (base_url, urllib.quote_plus(r'^ADMIN|NO.+$')) + qs = base_qs.filter(username__iregex=r'^ADMIN|NO.+$') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by invalid regex value. + url = '%s?username__regex=%s' % (base_url, urllib.quote_plus('[')) + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Exclude by username. + url = '%s?not__username=normal' % base_url + qs = base_qs.exclude(username='normal') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Exclude by username with suffix. + url = '%s?not__username__startswith=no' % base_url + qs = base_qs.exclude(username__startswith='no') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by multiple username specs. + url = '%s?username__startswith=no&username__endswith=al' % base_url + qs = base_qs.filter(username__startswith='no', username__endswith='al') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by is_superuser (when True). + url = '%s?is_superuser=True' % base_url + qs = base_qs.filter(is_superuser=True) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by is_superuser (when False). + url = '%s?is_superuser=False' % base_url + qs = base_qs.filter(is_superuser=False) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by is_superuser (when 1). + url = '%s?is_superuser=1' % base_url + qs = base_qs.filter(is_superuser=True) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by is_superuser (when 0). + url = '%s?is_superuser=0' % base_url + qs = base_qs.filter(is_superuser=False) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by is_superuser (when TRUE). + url = '%s?is_superuser=TRUE' % base_url + qs = base_qs.filter(is_superuser=True) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by invalid value for boolean field. + url = '%s?is_superuser=notatbool' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter by is_staff (field not exposed via API). FIXME: Should + # eventually not be allowed! + url = '%s?is_staff=true' % base_url + qs = base_qs.filter(is_staff=True) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by pk/id + url = '%s?pk=%d' % (base_url, self.normal_django_user.pk) + qs = base_qs.filter(pk=self.normal_django_user.pk) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + url = '%s?id=%d' % (base_url, self.normal_django_user.pk) + qs = base_qs.filter(id=self.normal_django_user.pk) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by pk gt/gte/lt/lte. + url = '%s?pk__gt=0' % base_url + qs = base_qs.filter(pk__gt=0) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + url = '%s?pk__gte=0' % base_url + qs = base_qs.filter(pk__gt=0) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + url = '%s?pk__lt=999' % base_url + qs = base_qs.filter(pk__lt=999) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + url = '%s?pk__lte=999' % base_url + qs = base_qs.filter(pk__lte=999) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by invalid value for integer field. + url = '%s?pk=notanint' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter by int using custom __int suffix. + url = '%s?pk__int=%d' % (base_url, self.super_django_user.pk) + qs = base_qs.filter(pk=self.super_django_user.pk) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by related organization not present. + url = '%s?organizations=None' % base_url + qs = base_qs.filter(organizations=None) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + url = '%s?organizations__isnull=true' % base_url + qs = base_qs.filter(organizations__isnull=True) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by related organization present. + url = '%s?organizations__isnull=0' % base_url + qs = base_qs.filter(organizations__isnull=False) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by related organizations name. + url = '%s?organizations__name__startswith=org' % base_url + qs = base_qs.filter(organizations__name__startswith='org') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by related organizations admins username. + url = '%s?organizations__admins__username__startswith=norm' % base_url + qs = base_qs.filter(organizations__admins__username__startswith='norm') + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by username with __in list. + url = '%s?username__in=normal,admin' % base_url + qs = base_qs.filter(username__in=('normal', 'admin')) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by organizations with __in list. + url = '%s?organizations__in=%d,0' % (base_url, self.organizations[0].pk) + qs = base_qs.filter(organizations__in=(self.organizations[0].pk, 0)) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Exclude by organizations with __in list. + url = '%s?not__organizations__in=%d,0' % (base_url, self.organizations[0].pk) + qs = base_qs.exclude(organizations__in=(self.organizations[0].pk, 0)) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by organizations created timestamp (passing only a date). + url = '%s?organizations__created__gt=2013-01-01' % base_url + qs = base_qs.filter(organizations__created__gt=datetime.date(2013, 1, 1)) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by organizations created timestamp (passing datetime). + url = '%s?organizations__created__lt=%s' % (base_url, urllib.quote_plus('2037-03-07 12:34:56')) + qs = base_qs.filter(organizations__created__lt=datetime.datetime(2037, 3, 7, 12, 34, 56)) + self.assertTrue(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + + # Filter by organizations created timestamp (invalid datetime value). + url = '%s?organizations__created__gt=yesterday' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter by organizations created year (valid django lookup, but not + # allowed via API). + url = '%s?organizations__created__year=2013' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter by invalid field. + url = '%s?email_address=nobody@example.com' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter by invalid field across lookups. + url = '%s?organizations__users__teams__laser=green' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter by invalid relation within lookups. + url = '%s?organizations__users__llamas__name=freddie' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter by invalid query string field names. + url = '%s?__' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + url = '%s?not__' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + url = '%s?__foo' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + url = '%s?username__' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + url = '%s?username+=normal' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) + + # Filter with some unicode characters included in field name and value. + url = u'%s?username=arrr\u2620' % base_url + qs = base_qs.filter(username=u'arrr\u2620') + self.assertFalse(qs.count()) + self.check_get_list(url, self.super_django_user, qs) + url = u'%s?user\u2605name=normal' % base_url + self.check_get_list(url, self.super_django_user, base_qs, expect=400) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 40147ddbb3..ddc7df0217 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -151,7 +151,9 @@ REST_FRAMEWORK = { 'awx.main.permissions.ModelAccessPermission', ), 'DEFAULT_FILTER_BACKENDS': ( - 'awx.main.filters.DefaultFilterBackend', + 'awx.main.filters.ActiveOnlyBackend', + 'awx.main.filters.FieldLookupBackend', + 'awx.main.filters.OrderByBackend', ), 'DEFAULT_PARSER_CLASSES': ( 'rest_framework.parsers.JSONParser',