mirror of
https://github.com/ansible/awx.git
synced 2026-03-21 19:07:39 -02:30
Merge pull request #6631 from ryanpetrello/refresh-token-expiry
properly respect REFRESH_TOKEN_EXPIRE_SECONDS when generating new tokens Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
@@ -1,11 +1,15 @@
|
|||||||
# Copyright (c) 2017 Ansible, Inc.
|
# Copyright (c) 2017 Ansible, Inc.
|
||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
from django.utils.timezone import now
|
||||||
|
from django.conf import settings
|
||||||
from django.conf.urls import url
|
from django.conf.urls import url
|
||||||
|
|
||||||
from oauthlib import oauth2
|
from oauthlib import oauth2
|
||||||
from oauth2_provider import views
|
from oauth2_provider import views
|
||||||
|
|
||||||
|
from awx.main.models import RefreshToken
|
||||||
from awx.api.views import (
|
from awx.api.views import (
|
||||||
ApiOAuthAuthorizationRootView,
|
ApiOAuthAuthorizationRootView,
|
||||||
)
|
)
|
||||||
@@ -14,6 +18,21 @@ from awx.api.views import (
|
|||||||
class TokenView(views.TokenView):
|
class TokenView(views.TokenView):
|
||||||
|
|
||||||
def create_token_response(self, request):
|
def create_token_response(self, request):
|
||||||
|
# Django OAuth2 Toolkit has a bug whereby refresh tokens are *never*
|
||||||
|
# properly expired (ugh):
|
||||||
|
#
|
||||||
|
# https://github.com/jazzband/django-oauth-toolkit/issues/746
|
||||||
|
#
|
||||||
|
# This code detects and auto-expires them on refresh grant
|
||||||
|
# requests.
|
||||||
|
if request.POST.get('grant_type') == 'refresh_token' and 'refresh_token' in request.POST:
|
||||||
|
refresh_token = RefreshToken.objects.filter(
|
||||||
|
token=request.POST['refresh_token']
|
||||||
|
).first()
|
||||||
|
if refresh_token:
|
||||||
|
expire_seconds = settings.OAUTH2_PROVIDER.get('REFRESH_TOKEN_EXPIRE_SECONDS', 0)
|
||||||
|
if refresh_token.created + timedelta(seconds=expire_seconds) < now():
|
||||||
|
return request.build_absolute_uri(), {}, 'The refresh token has expired.', '403'
|
||||||
try:
|
try:
|
||||||
return super(TokenView, self).create_token_response(request)
|
return super(TokenView, self).create_token_response(request)
|
||||||
except oauth2.AccessDeniedError as e:
|
except oauth2.AccessDeniedError as e:
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import pytest
|
|
||||||
import base64
|
import base64
|
||||||
import json
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
from django.db import connection
|
from django.db import connection
|
||||||
from django.test.utils import override_settings
|
from django.test.utils import override_settings
|
||||||
@@ -326,6 +328,38 @@ def test_refresh_accesstoken(oauth_application, post, get, delete, admin):
|
|||||||
assert original_refresh_token.revoked # is not None
|
assert original_refresh_token.revoked # is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_refresh_token_expiration_is_respected(oauth_application, post, get, delete, admin):
|
||||||
|
response = post(
|
||||||
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}),
|
||||||
|
{'scope': 'read'}, admin, expect=201
|
||||||
|
)
|
||||||
|
assert AccessToken.objects.count() == 1
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
|
||||||
|
refresh_url = drf_reverse('api:oauth_authorization_root_view') + 'token/'
|
||||||
|
short_lived = {
|
||||||
|
'ACCESS_TOKEN_EXPIRE_SECONDS': 1,
|
||||||
|
'AUTHORIZATION_CODE_EXPIRE_SECONDS': 1,
|
||||||
|
'REFRESH_TOKEN_EXPIRE_SECONDS': 1
|
||||||
|
}
|
||||||
|
time.sleep(1)
|
||||||
|
with override_settings(OAUTH2_PROVIDER=short_lived):
|
||||||
|
response = post(
|
||||||
|
refresh_url,
|
||||||
|
data='grant_type=refresh_token&refresh_token=' + refresh_token.token,
|
||||||
|
content_type='application/x-www-form-urlencoded',
|
||||||
|
HTTP_AUTHORIZATION='Basic ' + smart_str(base64.b64encode(smart_bytes(':'.join([
|
||||||
|
oauth_application.client_id, oauth_application.client_secret
|
||||||
|
]))))
|
||||||
|
)
|
||||||
|
assert response.status_code == 403
|
||||||
|
assert b'The refresh token has expired.' in response.content
|
||||||
|
assert RefreshToken.objects.filter(token=refresh_token).exists()
|
||||||
|
assert AccessToken.objects.count() == 1
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_revoke_access_then_refreshtoken(oauth_application, post, get, delete, admin):
|
def test_revoke_access_then_refreshtoken(oauth_application, post, get, delete, admin):
|
||||||
|
|||||||
Reference in New Issue
Block a user