AC-654 Add API support for token auth using X-Auth-Token header in addition to AUthorization header, fixes issue where API requests from the UI are picking up the basic auth from an API session.

This commit is contained in:
Chris Church 2013-11-19 23:22:30 -05:00
parent 52c0a93293
commit e4851c6e18
5 changed files with 75 additions and 26 deletions

View File

@ -4,6 +4,7 @@
# Django REST Framework
from rest_framework import authentication
from rest_framework import exceptions
from rest_framework import HTTP_HEADER_ENCODING
# AWX
from awx.main.models import Job, AuthToken
@ -16,9 +17,33 @@ class TokenAuthentication(authentication.TokenAuthentication):
model = AuthToken
def _get_x_auth_token_header(self, request):
auth = request.META.get('HTTP_X_AUTH_TOKEN', '')
if type(auth) == type(''):
# Work around django test client oddness
auth = auth.encode(HTTP_HEADER_ENCODING)
return auth
def authenticate(self, request):
self.request = request
return super(TokenAuthentication, self).authenticate(request)
# Prefer the custom X-Auth-Token header over the Authorization header,
# to handle cases where the browser submits saved Basic auth and
# overrides the UI's normal use of the Authorization header.
auth = self._get_x_auth_token_header(request).split()
if not auth or auth[0].lower() != 'token':
auth = authentication.get_authorization_header(request).split()
if not auth or auth[0].lower() != 'token':
return None
if len(auth) == 1:
msg = 'Invalid token header. No credentials provided.'
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = 'Invalid token header. Token string should not contain spaces.'
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(auth[1])
def authenticate_credentials(self, key):
try:

View File

@ -85,7 +85,12 @@ class APIView(views.APIView):
continue
if resp_hdr.split()[0] and resp_hdr.split()[0] == req_hdr.split()[0]:
return resp_hdr
return super(APIView, self).get_authenticate_header(request)
# If it can't be determined from the request, use the last
# authenticator (should be Basic).
try:
return authenticator.authenticate_header(request)
except NameError:
pass
def get_description_context(self):
return {

View File

@ -172,18 +172,29 @@ class BaseTestMixin(object):
client_kwargs['HTTP_ACCEPT'] = accept
if remote_addr is not None:
client_kwargs['REMOTE_ADDR'] = remote_addr
client = Client(**client_kwargs)
auth = auth or self._current_auth
if auth:
if isinstance(auth, (list, tuple)):
# Dict is only used to test case when both Authorization and
# X-Auth-Token headers are passed.
if isinstance(auth, dict):
basic = auth.get('basic', ())
if basic:
basic_auth = base64.b64encode('%s:%s' % (basic[0], basic[1]))
basic_auth = basic_auth.decode('ascii')
client_kwargs['HTTP_AUTHORIZATION'] = 'Basic %s' % basic_auth
token = auth.get('token', '')
if token and not basic:
client_kwargs['HTTP_AUTHORIZATION'] = 'Token %s' % token
elif token:
client_kwargs['HTTP_X_AUTH_TOKEN'] = 'Token %s' % token
elif isinstance(auth, (list, tuple)):
#client.login(username=auth[0], password=auth[1])
basic_auth = base64.b64encode('%s:%s' % (auth[0], auth[1]))
basic_auth = basic_auth.decode('ascii')
client_kwargs['HTTP_AUTHORIZATION'] = 'Basic %s' % basic_auth
client = Client(**client_kwargs)
elif isinstance(auth, basestring):
client_kwargs['HTTP_AUTHORIZATION'] = 'Token %s' % auth
client = Client(**client_kwargs)
client = Client(**client_kwargs)
method = getattr(client, method_name)
response = None
if data is not None:

View File

@ -47,6 +47,7 @@ class UsersTest(BaseTest):
def test_auth_token_login(self):
auth_token_url = reverse('api:auth_token_view')
user_me_url = reverse('api:user_me_list')
# Always returns a 405 for any GET request, regardless of credentials.
self.get(auth_token_url, expect=405, auth=None)
@ -69,16 +70,26 @@ class UsersTest(BaseTest):
auth_token = response['token']
# Verify we can access our own user information with the auth token.
response = self.get(reverse('api:user_me_list'), expect=200,
auth=auth_token)
response = self.get(user_me_url, expect=200, auth=auth_token)
self.assertEquals(response['results'][0]['username'], 'normal')
self.assertEquals(response['count'], 1)
# If basic auth is passed via the Authorization header and the UI also
# passes token auth via the X-Auth-Token header, the API should favor
# the X-Auth-Token value.
mixed_auth = {
'basic': self.get_super_credentials(),
'token': auth_token,
}
response = self.get(user_me_url, expect=200, auth=mixed_auth)
self.assertEquals(response['results'][0]['username'], 'normal')
self.assertEquals(response['count'], 1)
# If we simulate a different remote address, should not be able to use
# the first auth token.
remote_addr = '127.0.0.2'
response = self.get(reverse('api:user_me_list'), expect=401,
auth=auth_token, remote_addr=remote_addr)
response = self.get(user_me_url, expect=401, auth=auth_token,
remote_addr=remote_addr)
self.assertEqual(response['detail'], 'Invalid token')
# The WWW-Authenticate header should specify Token auth, since that
@ -97,45 +108,42 @@ class UsersTest(BaseTest):
# Verify we can access our own user information with the second auth
# token from the other remote address.
response = self.get(reverse('api:user_me_list'), expect=200,
auth=auth_token2, remote_addr=remote_addr)
response = self.get(user_me_url, expect=200, auth=auth_token2,
remote_addr=remote_addr)
self.assertEquals(response['results'][0]['username'], 'normal')
self.assertEquals(response['count'], 1)
# The second auth token also can't be used from the first address, but
# the first auth token is still valid from its address.
response = self.get(reverse('api:user_me_list'), expect=401,
auth=auth_token2)
response = self.get(user_me_url, expect=401, auth=auth_token2)
self.assertEqual(response['detail'], 'Invalid token')
response_header = response.response.get('WWW-Authenticate', '')
self.assertEqual(response_header.split()[0], 'Token')
response = self.get(reverse('api:user_me_list'), expect=200,
auth=auth_token)
response = self.get(user_me_url, expect=200, auth=auth_token)
# A request without authentication should ask for Basic by default.
response = self.get(reverse('api:user_me_list'), expect=401)
response = self.get(user_me_url, expect=401)
response_header = response.response.get('WWW-Authenticate', '')
self.assertEqual(response_header.split()[0], 'Basic')
# A request that attempts Basic auth should request Basic auth again.
response = self.get(reverse('api:user_me_list'), expect=401,
auth=('invalid', 'password'))
response = self.get(user_me_url, expect=401,
auth=('invalid', 'password'))
response_header = response.response.get('WWW-Authenticate', '')
self.assertEqual(response_header.split()[0], 'Basic')
# Invalidate a key (simulate expiration), now token auth should fail
# with the first token, but still work with the second.
self.normal_django_user.auth_tokens.get(key=auth_token).invalidate()
response = self.get(reverse('api:user_me_list'), expect=401,
auth=auth_token)
response = self.get(user_me_url, expect=401, auth=auth_token)
self.assertEqual(response['detail'], 'Token is expired')
response = self.get(reverse('api:user_me_list'), expect=200,
auth=auth_token2, remote_addr=remote_addr)
response = self.get(user_me_url, expect=200, auth=auth_token2,
remote_addr=remote_addr)
# Token auth should be denied if the user is inactive.
self.normal_django_user.mark_inactive()
response = self.get(reverse('api:user_me_list'), expect=401,
auth=auth_token2, remote_addr=remote_addr)
response = self.get(user_me_url, expect=401, auth=auth_token2,
remote_addr=remote_addr)
self.assertEqual(response['detail'], 'User inactive or deleted')
def test_ordinary_user_can_modify_some_fields_about_himself_but_not_all_and_passwords_work(self):

View File

@ -143,8 +143,8 @@ REST_FRAMEWORK = {
'PAGINATE_BY': 25,
'PAGINATE_BY_PARAM': 'page_size',
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework.authentication.BasicAuthentication',
'awx.api.authentication.TokenAuthentication',
'rest_framework.authentication.BasicAuthentication',
#'rest_framework.authentication.SessionAuthentication',
),
'DEFAULT_PERMISSION_CLASSES': (