For AC-331. Major updates to API filtering to allow negative filtering, use of a list of items with the __in lookup, more flexible boolean/null values, more validation of filter values, and tests.

This commit is contained in:
Chris Church
2013-08-08 15:58:42 -04:00
parent b9c8002409
commit 30d2a4f5c9
5 changed files with 510 additions and 41 deletions

View File

@@ -1,53 +1,169 @@
# Copyright (c) 2013 AnsibleWorks, Inc. # Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved. # All Rights Reserved.
# Python
import re
# Django # Django
from django.core.exceptions import FieldError from django.core.exceptions import FieldError, ValidationError
from django.db import models
from django.db.models import Q
from django.db.models.related import RelatedObject
from django.db.models.fields import FieldDoesNotExist
# Django REST Framework # Django REST Framework
from rest_framework.exceptions import ParseError from rest_framework.exceptions import ParseError
from rest_framework.filters import BaseFilterBackend from rest_framework.filters import BaseFilterBackend
class DefaultFilterBackend(BaseFilterBackend): class ActiveOnlyBackend(BaseFilterBackend):
'''
Filter to show only objects where is_active/active is True.
'''
def filter_queryset(self, request, queryset, view): def filter_queryset(self, request, queryset, view):
# Filtering by is_active/active that was previously in BaseList.
qs = queryset
for field in queryset.model._meta.fields: for field in queryset.model._meta.fields:
if field.name == 'is_active': if field.name == 'is_active':
qs = qs.filter(is_active=True) queryset = queryset.filter(is_active=True)
elif field.name == 'active': elif field.name == 'active':
qs = qs.filter(active=True) queryset = queryset.filter(active=True)
return queryset
# Apply filters and ordering specified via QUERY_PARAMS. class FieldLookupBackend(BaseFilterBackend):
'''
Filter using field lookups provided via query string parameters.
'''
RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by')
SUPPORTED_LOOKUPS = ('exact', 'iexact', 'contains', 'icontains',
'startswith', 'istartswith', 'endswith', 'iendswith',
'regex', 'iregex', 'gt', 'gte', 'lt', 'lte', 'in',
'isnull')
def get_field_from_lookup(self, model, lookup):
field = None
parts = lookup.split('__')
if parts and parts[-1] not in self.SUPPORTED_LOOKUPS:
parts.append('exact')
# FIXME: Could build up a list of models used across relationships, use
# those lookups combined with request.user.get_queryset(Model) to make
# sure user cannot query using objects he could not view.
for n, name in enumerate(parts[:-1]):
if name == 'pk':
field = model._meta.pk
else:
field = model._meta.get_field_by_name(name)[0]
if n < (len(parts) - 2):
if getattr(field, 'rel', None):
model = field.rel.to
else:
model = field.model
return field
def to_python_boolean(self, value, allow_none=False):
if value.lower() in ('true', '1'):
return True
elif value.lower() in ('false', '0'):
return False
elif allow_none and value.lower() in ('none', 'null'):
return None
else:
raise ValueError(u'Unable to convert "%s" to boolean' % unicode(value))
def to_python_related(self, value):
if value.lower() in ('none', 'null'):
return None
else:
return int(value)
def value_to_python_for_field(self, field, value):
if isinstance(field, models.NullBooleanField):
return self.to_python_boolean(value, allow_none=True)
elif isinstance(field, models.BooleanField):
return self.to_python_boolean(value)
elif isinstance(field, RelatedObject):
return self.to_python_related(value)
else:
return field.to_python(value)
def value_to_python(self, model, lookup, value):
field = self.get_field_from_lookup(model, lookup)
if lookup.endswith('__isnull'):
value = self.to_python_boolean(value)
elif lookup.endswith('__in'):
items = []
for item in value.split(','):
items.append(self.value_to_python_for_field(field, item))
value = items
elif lookup.endswith('__regex') or lookup.endswith('__iregex'):
try:
re.compile(value)
except re.error, e:
raise ValueError(e.args[0])
return value
else:
value = self.value_to_python_for_field(field, value)
return value
def filter_queryset(self, request, queryset, view):
try: try:
# Apply filters and excludes specified via QUERY_PARAMS.
filters = {} filters = {}
order_by = None excludes = {}
for key, value in request.QUERY_PARAMS.items(): for key, value in request.QUERY_PARAMS.items():
if key in self.RESERVED_NAMES:
if key in ('page', 'page_size', 'format'):
continue continue
# Custom __int filter suffix (internal use only).
if key.endswith('__int'):
key = key[:-5]
value = int(value)
# Custom not__ filter prefix.
q_not = False
if key.startswith('not__'):
key = key[5:]
q_not = True
# Convert value to python and add to the appropriate dict.
value = self.value_to_python(queryset.model, key, value)
if q_not:
excludes[key] = value
else:
filters[key] = value
if filters:
queryset = queryset.filter(**filters)
if excludes:
queryset = queryset.exclude(**excludes)
return queryset
except (FieldError, FieldDoesNotExist, ValueError), e:
raise ParseError(e.args[0])
except ValidationError, e:
raise ParseError(e.messages)
class OrderByBackend(BaseFilterBackend):
'''
Filter to apply ordering based on query string parameters.
'''
def filter_queryset(self, request, queryset, view):
try:
order_by = None
for key, value in request.QUERY_PARAMS.items():
if key in ('order', 'order_by'): if key in ('order', 'order_by'):
order_by = value order_by = value
continue if ',' in value:
order_by = value.split(',')
if key.endswith('__int'): else:
key = key.replace('__int', '') order_by = (value,)
value = int(value)
filters[key] = value
qs = qs.filter(**filters)
if order_by: if order_by:
qs = qs.order_by(order_by) queryset = queryset.order_by(*order_by)
# Fetch the first result to run the query, otherwise we don't
except (FieldError, ValueError), e: # always catch the FieldError for invalid field names.
# Handle invalid field names or values and return a 400. try:
queryset[0]
except IndexError:
pass
return queryset
except FieldError, e:
# Return a 400 for invalid field names.
raise ParseError(*e.args) raise ParseError(*e.args)
return qs

View File

@@ -31,6 +31,11 @@ Prefix the field name with a dash `-` to sort in reverse:
?order_by=-{{ order_field }} ?order_by=-{{ order_field }}
Multiple sorting fields may be specified by separating the field names with a
comma `,`:
?order_by={{ order_field }},some_other_field
## Pagination ## Pagination
Use the `page_size` query string parameter to change the number of results Use the `page_size` query string parameter to change the number of results
@@ -45,13 +50,54 @@ string parameters automatically.
## Filtering ## Filtering
Any additional query string parameters may be used to filter the list of Any additional query string parameters may be used to filter the list of
results returned to those matching a given value. Only fields that exist in results returned to those matching a given value. Only fields and relations
the database can be used for filtering. that exist in the database may be used for filtering. Any special characters
in the specified value should be url-encoded. For example:
?{{ order_field }}=value ?field=value%20xyz
Field lookups may also be used for slightly more advanced queries, for example: Fields may also span relations, only for fields and relationships defined in
the database:
?{{ order_field }}__startswith=A ?other__field=value
?{{ order_field }}__endsswith=C
?{{ order_field }}__contains=ABC To exclude results matching certain criteria, prefix the field parameter with
`not__`:
?not__field=value
Field lookups may also be used for more advanced queries, by appending the
lookup to the field name:
?field__lookup=value
The following field lookups are supported:
* `exact`: Exact match (default lookup if not specified).
* `iexact`: Case-insensitive version of `exact`.
* `contains`: Field contains value.
* `icontains`: Case-insensitive version of `contains`.
* `startswith`: Field starts with value.
* `istartswith`: Case-insensitive version of `startswith`.
* `endswith`: Field ends with value.
* `iendswith`: Case-insensitive version of `endswith`.
* `regex`: Field matches the given regular expression.
* `iregex`: Case-insensitive version of `regex`.
* `gt`: Greater than comparison.
* `gte`: Greater than or equal to comparison.
* `lt`: Less than comparison.
* `lte`: Less than or equal to comparison.
* `isnull`: Check whether the given field or related object is null; expects a
boolean value.
* `in`: Check whether the given field's value is present in the list provided;
expects a list of items.
Boolean values may be specified as `True` or `1` for true, `False` or `0` for
false (both case-insensitive).
Null values may be specified as `None` or `Null` (both case-insensitive),
though it is preferred to use the `isnull` lookup to explicitly check for null
values.
Lists (for the `in` lookup) may be specified as a comma-separated list of
values.

View File

@@ -112,7 +112,7 @@ class BaseTestMixin(object):
data_ids = [x['id'] for x in data['results']] data_ids = [x['id'] for x in data['results']]
qs_ids = queryset.values_list('pk', flat=True) qs_ids = queryset.values_list('pk', flat=True)
if check_order: if check_order:
self.assertEqual(data_ids, qs_ids) self.assertEqual(tuple(data_ids), tuple(qs_ids))
else: else:
self.assertEqual(set(data_ids), set(qs_ids)) self.assertEqual(set(data_ids), set(qs_ids))
@@ -261,19 +261,23 @@ class BaseTestMixin(object):
else: else:
f(url, expect=401) f(url, expect=401)
def check_get_list(self, url, user, qs, fields=None, expect=200): def check_get_list(self, url, user, qs, fields=None, expect=200,
check_order=False):
''' '''
Check that the given list view URL returns results for the given user Check that the given list view URL returns results for the given user
that match the given queryset. that match the given queryset.
''' '''
with self.current_user(user): with self.current_user(user):
self.options(url, expect=expect) if expect == 400:
self.options(url, expect=200)
else:
self.options(url, expect=expect)
self.head(url, expect=expect) self.head(url, expect=expect)
response = self.get(url, expect=expect) response = self.get(url, expect=expect)
if expect != 200: if expect != 200:
return return
self.check_pagination_and_size(response, qs.count()) self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs) self.check_list_ids(response, qs, check_order)
if fields: if fields:
for obj in response['results']: for obj in response['results']:
self.assertTrue(set(obj.keys()) <= set(fields)) self.assertTrue(set(obj.keys()) <= set(fields))

View File

@@ -1,13 +1,18 @@
# Copyright (c) 2013 AnsibleWorks, Inc. # Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved. # All Rights Reserved.
# Python
import datetime
import json import json
import urllib
# Django
from django.contrib.auth.models import User from django.contrib.auth.models import User
import django.test import django.test
from django.test.client import Client from django.test.client import Client
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * from awx.main.models import *
from awx.main.tests.base import BaseTest from awx.main.tests.base import BaseTest
@@ -247,4 +252,300 @@ class UsersTest(BaseTest):
# FIXME: add test that shows posting a user w/o id to /organizations/2/admins/ can create a new one & associate # FIXME: add test that shows posting a user w/o id to /organizations/2/admins/ can create a new one & associate
# FIXME: add test that shows posting a projects w/o id to /organizations/2/projects/ can create a new one & associate # FIXME: add test that shows posting a projects w/o id to /organizations/2/projects/ can create a new one & associate
def test_user_list_ordering(self):
base_url = reverse('main:user_list')
base_qs = User.objects.distinct()
# Check list view with ordering by name.
url = '%s?order_by=username' % base_url
qs = base_qs.order_by('username')
self.check_get_list(url, self.super_django_user, qs, check_order=True)
# Check list view with ordering by username in reverse.
url = '%s?order=-username' % base_url
qs = base_qs.order_by('-username')
self.check_get_list(url, self.super_django_user, qs, check_order=True)
# Check list view with multiple ordering fields.
url = '%s?order_by=-pk,username' % base_url
qs = base_qs.order_by('-pk', 'username')
self.check_get_list(url, self.super_django_user, qs, check_order=True)
# Check list view with invalid field name.
url = '%s?order_by=invalidfieldname' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Check list view with no field name.
url = '%s?order_by=' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
def test_user_list_filtering(self):
# Also serves as general-purpose testing for custom API filters.
base_url = reverse('main:user_list')
base_qs = User.objects.distinct()
# Filter by username.
url = '%s?username=normal' % base_url
qs = base_qs.filter(username='normal')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __exact suffix.
url = '%s?username__exact=normal' % base_url
qs = base_qs.filter(username__exact='normal')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __iexact suffix.
url = '%s?username__iexact=NORMAL' % base_url
qs = base_qs.filter(username__iexact='NORMAL')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __contains suffix.
url = '%s?username__contains=dmi' % base_url
qs = base_qs.filter(username__contains='dmi')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __icontains suffix.
url = '%s?username__icontains=DMI' % base_url
qs = base_qs.filter(username__icontains='DMI')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __startswith suffix.
url = '%s?username__startswith=no' % base_url
qs = base_qs.filter(username__startswith='no')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __istartswith suffix.
url = '%s?username__istartswith=NO' % base_url
qs = base_qs.filter(username__istartswith='NO')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __endswith suffix.
url = '%s?username__endswith=al' % base_url
qs = base_qs.filter(username__endswith='al')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __iendswith suffix.
url = '%s?username__iendswith=AL' % base_url
qs = base_qs.filter(username__iendswith='AL')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __regex suffix.
url = '%s?username__regex=%s' % (base_url, urllib.quote_plus(r'^admin|no.+$'))
qs = base_qs.filter(username__regex=r'^admin|no.+$')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __iregex suffix.
url = '%s?username__iregex=%s' % (base_url, urllib.quote_plus(r'^ADMIN|NO.+$'))
qs = base_qs.filter(username__iregex=r'^ADMIN|NO.+$')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by invalid regex value.
url = '%s?username__regex=%s' % (base_url, urllib.quote_plus('['))
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Exclude by username.
url = '%s?not__username=normal' % base_url
qs = base_qs.exclude(username='normal')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Exclude by username with suffix.
url = '%s?not__username__startswith=no' % base_url
qs = base_qs.exclude(username__startswith='no')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by multiple username specs.
url = '%s?username__startswith=no&username__endswith=al' % base_url
qs = base_qs.filter(username__startswith='no', username__endswith='al')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by is_superuser (when True).
url = '%s?is_superuser=True' % base_url
qs = base_qs.filter(is_superuser=True)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by is_superuser (when False).
url = '%s?is_superuser=False' % base_url
qs = base_qs.filter(is_superuser=False)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by is_superuser (when 1).
url = '%s?is_superuser=1' % base_url
qs = base_qs.filter(is_superuser=True)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by is_superuser (when 0).
url = '%s?is_superuser=0' % base_url
qs = base_qs.filter(is_superuser=False)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by is_superuser (when TRUE).
url = '%s?is_superuser=TRUE' % base_url
qs = base_qs.filter(is_superuser=True)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by invalid value for boolean field.
url = '%s?is_superuser=notatbool' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by is_staff (field not exposed via API). FIXME: Should
# eventually not be allowed!
url = '%s?is_staff=true' % base_url
qs = base_qs.filter(is_staff=True)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by pk/id
url = '%s?pk=%d' % (base_url, self.normal_django_user.pk)
qs = base_qs.filter(pk=self.normal_django_user.pk)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
url = '%s?id=%d' % (base_url, self.normal_django_user.pk)
qs = base_qs.filter(id=self.normal_django_user.pk)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by pk gt/gte/lt/lte.
url = '%s?pk__gt=0' % base_url
qs = base_qs.filter(pk__gt=0)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
url = '%s?pk__gte=0' % base_url
qs = base_qs.filter(pk__gt=0)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
url = '%s?pk__lt=999' % base_url
qs = base_qs.filter(pk__lt=999)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
url = '%s?pk__lte=999' % base_url
qs = base_qs.filter(pk__lte=999)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by invalid value for integer field.
url = '%s?pk=notanint' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by int using custom __int suffix.
url = '%s?pk__int=%d' % (base_url, self.super_django_user.pk)
qs = base_qs.filter(pk=self.super_django_user.pk)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by related organization not present.
url = '%s?organizations=None' % base_url
qs = base_qs.filter(organizations=None)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
url = '%s?organizations__isnull=true' % base_url
qs = base_qs.filter(organizations__isnull=True)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by related organization present.
url = '%s?organizations__isnull=0' % base_url
qs = base_qs.filter(organizations__isnull=False)
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by related organizations name.
url = '%s?organizations__name__startswith=org' % base_url
qs = base_qs.filter(organizations__name__startswith='org')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by related organizations admins username.
url = '%s?organizations__admins__username__startswith=norm' % base_url
qs = base_qs.filter(organizations__admins__username__startswith='norm')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by username with __in list.
url = '%s?username__in=normal,admin' % base_url
qs = base_qs.filter(username__in=('normal', 'admin'))
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by organizations with __in list.
url = '%s?organizations__in=%d,0' % (base_url, self.organizations[0].pk)
qs = base_qs.filter(organizations__in=(self.organizations[0].pk, 0))
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Exclude by organizations with __in list.
url = '%s?not__organizations__in=%d,0' % (base_url, self.organizations[0].pk)
qs = base_qs.exclude(organizations__in=(self.organizations[0].pk, 0))
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by organizations created timestamp (passing only a date).
url = '%s?organizations__created__gt=2013-01-01' % base_url
qs = base_qs.filter(organizations__created__gt=datetime.date(2013, 1, 1))
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by organizations created timestamp (passing datetime).
url = '%s?organizations__created__lt=%s' % (base_url, urllib.quote_plus('2037-03-07 12:34:56'))
qs = base_qs.filter(organizations__created__lt=datetime.datetime(2037, 3, 7, 12, 34, 56))
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
# Filter by organizations created timestamp (invalid datetime value).
url = '%s?organizations__created__gt=yesterday' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by organizations created year (valid django lookup, but not
# allowed via API).
url = '%s?organizations__created__year=2013' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by invalid field.
url = '%s?email_address=nobody@example.com' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by invalid field across lookups.
url = '%s?organizations__users__teams__laser=green' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by invalid relation within lookups.
url = '%s?organizations__users__llamas__name=freddie' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by invalid query string field names.
url = '%s?__' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
url = '%s?not__' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
url = '%s?__foo' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
url = '%s?username__' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
url = '%s?username+=normal' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter with some unicode characters included in field name and value.
url = u'%s?username=arrr\u2620' % base_url
qs = base_qs.filter(username=u'arrr\u2620')
self.assertFalse(qs.count())
self.check_get_list(url, self.super_django_user, qs)
url = u'%s?user\u2605name=normal' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)

View File

@@ -151,7 +151,9 @@ REST_FRAMEWORK = {
'awx.main.permissions.ModelAccessPermission', 'awx.main.permissions.ModelAccessPermission',
), ),
'DEFAULT_FILTER_BACKENDS': ( 'DEFAULT_FILTER_BACKENDS': (
'awx.main.filters.DefaultFilterBackend', 'awx.main.filters.ActiveOnlyBackend',
'awx.main.filters.FieldLookupBackend',
'awx.main.filters.OrderByBackend',
), ),
'DEFAULT_PARSER_CLASSES': ( 'DEFAULT_PARSER_CLASSES': (
'rest_framework.parsers.JSONParser', 'rest_framework.parsers.JSONParser',