mirror of
https://github.com/ansible/awx.git
synced 2026-01-18 21:21:21 -03:30
Merge pull request #2824 from rooftopcellist/test_refresh_token
Test refresh token
This commit is contained in:
commit
03058cd1e8
@ -262,36 +262,6 @@ def test_oauth_list_user_tokens(oauth_application, post, get, admin, alice):
|
|||||||
assert response.data['count'] == 1
|
assert response.data['count'] == 1
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_refresh_accesstoken(oauth_application, post, get, delete, admin):
|
|
||||||
response = post(
|
|
||||||
reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}),
|
|
||||||
{'scope': 'read'}, admin, expect=201
|
|
||||||
)
|
|
||||||
token = AccessToken.objects.get(token=response.data['token'])
|
|
||||||
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
|
|
||||||
assert AccessToken.objects.count() == 1
|
|
||||||
assert RefreshToken.objects.count() == 1
|
|
||||||
|
|
||||||
refresh_url = drf_reverse('api:oauth_authorization_root_view') + 'token/'
|
|
||||||
response = post(
|
|
||||||
refresh_url,
|
|
||||||
data='grant_type=refresh_token&refresh_token=' + refresh_token.token,
|
|
||||||
content_type='application/x-www-form-urlencoded',
|
|
||||||
HTTP_AUTHORIZATION='Basic ' + base64.b64encode(':'.join([
|
|
||||||
oauth_application.client_id, oauth_application.client_secret
|
|
||||||
]))
|
|
||||||
)
|
|
||||||
|
|
||||||
new_token = json.loads(response._container[0])['access_token']
|
|
||||||
new_refresh_token = json.loads(response._container[0])['refresh_token']
|
|
||||||
assert token not in AccessToken.objects.all()
|
|
||||||
assert AccessToken.objects.get(token=new_token) != 0
|
|
||||||
assert RefreshToken.objects.get(token=new_refresh_token) != 0
|
|
||||||
refresh_token = RefreshToken.objects.get(token=refresh_token)
|
|
||||||
assert refresh_token.revoked
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_implicit_authorization(oauth_application, admin):
|
def test_implicit_authorization(oauth_application, admin):
|
||||||
oauth_application.client_type = 'confidential'
|
oauth_application.client_type = 'confidential'
|
||||||
@ -314,3 +284,77 @@ def test_implicit_authorization(oauth_application, admin):
|
|||||||
assert 'http://test.com' in response.url and 'access_token' in response.url
|
assert 'http://test.com' in response.url and 'access_token' in response.url
|
||||||
# Make sure no refresh token is created for app with implicit grant type.
|
# Make sure no refresh token is created for app with implicit grant type.
|
||||||
assert refresh_token_count == RefreshToken.objects.count()
|
assert refresh_token_count == RefreshToken.objects.count()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_refresh_accesstoken(oauth_application, post, get, delete, admin):
|
||||||
|
response = post(
|
||||||
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}),
|
||||||
|
{'scope': 'read'}, admin, expect=201
|
||||||
|
)
|
||||||
|
assert AccessToken.objects.count() == 1
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
token = AccessToken.objects.get(token=response.data['token'])
|
||||||
|
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
|
||||||
|
|
||||||
|
refresh_url = drf_reverse('api:oauth_authorization_root_view') + 'token/'
|
||||||
|
response = post(
|
||||||
|
refresh_url,
|
||||||
|
data='grant_type=refresh_token&refresh_token=' + refresh_token.token,
|
||||||
|
content_type='application/x-www-form-urlencoded',
|
||||||
|
HTTP_AUTHORIZATION='Basic ' + base64.b64encode(':'.join([
|
||||||
|
oauth_application.client_id, oauth_application.client_secret
|
||||||
|
]))
|
||||||
|
)
|
||||||
|
assert RefreshToken.objects.filter(token=refresh_token).exists()
|
||||||
|
original_refresh_token = RefreshToken.objects.get(token=refresh_token)
|
||||||
|
assert token not in AccessToken.objects.all()
|
||||||
|
assert AccessToken.objects.count() == 1
|
||||||
|
# the same RefreshToken remains but is marked revoked
|
||||||
|
assert RefreshToken.objects.count() == 2
|
||||||
|
new_token = json.loads(response._container[0])['access_token']
|
||||||
|
new_refresh_token = json.loads(response._container[0])['refresh_token']
|
||||||
|
assert AccessToken.objects.filter(token=new_token).count() == 1
|
||||||
|
# checks that RefreshTokens are rotated (new RefreshToken issued)
|
||||||
|
assert RefreshToken.objects.filter(token=new_refresh_token).count() == 1
|
||||||
|
assert original_refresh_token.revoked # is not None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_revoke_access_then_refreshtoken(oauth_application, post, get, delete, admin):
|
||||||
|
response = post(
|
||||||
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}),
|
||||||
|
{'scope': 'read'}, admin, expect=201
|
||||||
|
)
|
||||||
|
token = AccessToken.objects.get(token=response.data['token'])
|
||||||
|
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
|
||||||
|
assert AccessToken.objects.count() == 1
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
|
||||||
|
token.revoke()
|
||||||
|
assert AccessToken.objects.count() == 0
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
assert not refresh_token.revoked
|
||||||
|
|
||||||
|
refresh_token.revoke()
|
||||||
|
assert AccessToken.objects.count() == 0
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_revoke_refreshtoken(oauth_application, post, get, delete, admin):
|
||||||
|
response = post(
|
||||||
|
reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}),
|
||||||
|
{'scope': 'read'}, admin, expect=201
|
||||||
|
)
|
||||||
|
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
|
||||||
|
assert AccessToken.objects.count() == 1
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
|
||||||
|
refresh_token.revoke()
|
||||||
|
assert AccessToken.objects.count() == 0
|
||||||
|
# the same RefreshToken is recycled
|
||||||
|
new_refresh_token = RefreshToken.objects.all().first()
|
||||||
|
assert refresh_token == new_refresh_token
|
||||||
|
assert new_refresh_token.revoked
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user