mirror of
https://github.com/ansible/awx.git
synced 2026-04-13 22:19:27 -02:30
Merge pull request #2044 from cdvv7788/AWX-2030
Add command to revoke tokens (https://github.com/ansible/awx/issues/2030) Reviewed-by: Cristian Vargas https://github.com/cdvv7788
This commit is contained in:
37
awx/main/management/commands/revoke_oauth2_tokens.py
Normal file
37
awx/main/management/commands/revoke_oauth2_tokens.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
# Django
|
||||||
|
from django.core.management.base import BaseCommand, CommandError
|
||||||
|
from django.contrib.auth.models import User
|
||||||
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
|
|
||||||
|
# AWX
|
||||||
|
from awx.main.models.oauth import OAuth2AccessToken
|
||||||
|
from oauth2_provider.models import RefreshToken
|
||||||
|
|
||||||
|
|
||||||
|
def revoke_tokens(token_list):
|
||||||
|
for token in token_list:
|
||||||
|
token.revoke()
|
||||||
|
print('revoked {} {}'.format(token.__class__.__name__, token.token))
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
"""Command that revokes OAuth2 access tokens."""
|
||||||
|
help='Revokes OAuth2 access tokens. Use --all to revoke access and refresh tokens.'
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
parser.add_argument('--user', dest='user', type=str, help='revoke OAuth2 tokens for a specific username')
|
||||||
|
parser.add_argument('--all', dest='all', action='store_true', help='revoke OAuth2 access tokens and refresh tokens')
|
||||||
|
|
||||||
|
def handle(self, *args, **options):
|
||||||
|
if not options['user']:
|
||||||
|
if options['all']:
|
||||||
|
revoke_tokens(RefreshToken.objects.filter(revoked=None))
|
||||||
|
revoke_tokens(OAuth2AccessToken.objects.all())
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
user = User.objects.get(username=options['user'])
|
||||||
|
except ObjectDoesNotExist:
|
||||||
|
raise CommandError('A user with that username does not exist.')
|
||||||
|
if options['all']:
|
||||||
|
revoke_tokens(RefreshToken.objects.filter(revoked=None).filter(user=user))
|
||||||
|
revoke_tokens(user.main_oauth2accesstoken.filter(user=user))
|
||||||
@@ -0,0 +1,79 @@
|
|||||||
|
# Python
|
||||||
|
import datetime
|
||||||
|
import pytest
|
||||||
|
import string
|
||||||
|
import random
|
||||||
|
import StringIO
|
||||||
|
|
||||||
|
# Django
|
||||||
|
from django.core.management import call_command
|
||||||
|
from django.core.management.base import CommandError
|
||||||
|
|
||||||
|
# AWX
|
||||||
|
from awx.main.models import RefreshToken
|
||||||
|
from awx.main.models.oauth import OAuth2AccessToken
|
||||||
|
from awx.api.versioning import reverse
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
class TestOAuth2RevokeCommand:
|
||||||
|
|
||||||
|
def test_non_existing_user(self):
|
||||||
|
out = StringIO.StringIO()
|
||||||
|
fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
|
||||||
|
arg = '--user=' + fake_username
|
||||||
|
with pytest.raises(CommandError) as excinfo:
|
||||||
|
call_command('revoke_oauth2_tokens', arg, stdout=out)
|
||||||
|
assert 'A user with that username does not exist' in excinfo.value.message
|
||||||
|
out.close()
|
||||||
|
|
||||||
|
def test_revoke_all_access_tokens(self, post, admin, alice):
|
||||||
|
url = reverse('api:o_auth2_token_list')
|
||||||
|
for user in (admin, alice):
|
||||||
|
post(
|
||||||
|
url,
|
||||||
|
{'description': 'test token', 'scope': 'read'},
|
||||||
|
user
|
||||||
|
)
|
||||||
|
assert OAuth2AccessToken.objects.count() == 2
|
||||||
|
call_command('revoke_oauth2_tokens')
|
||||||
|
assert OAuth2AccessToken.objects.count() == 0
|
||||||
|
|
||||||
|
def test_revoke_access_token_for_user(self, post, admin, alice):
|
||||||
|
url = reverse('api:o_auth2_token_list')
|
||||||
|
post(
|
||||||
|
url,
|
||||||
|
{'description': 'test token', 'scope': 'read'},
|
||||||
|
alice
|
||||||
|
)
|
||||||
|
assert OAuth2AccessToken.objects.count() == 1
|
||||||
|
call_command('revoke_oauth2_tokens', '--user=admin')
|
||||||
|
assert OAuth2AccessToken.objects.count() == 1
|
||||||
|
call_command('revoke_oauth2_tokens', '--user=alice')
|
||||||
|
assert OAuth2AccessToken.objects.count() == 0
|
||||||
|
|
||||||
|
def test_revoke_all_refresh_tokens(self, post, admin, oauth_application):
|
||||||
|
url = reverse('api:o_auth2_token_list')
|
||||||
|
post(
|
||||||
|
url,
|
||||||
|
{
|
||||||
|
'description': 'test token for',
|
||||||
|
'scope': 'read',
|
||||||
|
'application': oauth_application.pk
|
||||||
|
},
|
||||||
|
admin
|
||||||
|
)
|
||||||
|
assert OAuth2AccessToken.objects.count() == 1
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
|
||||||
|
call_command('revoke_oauth2_tokens')
|
||||||
|
assert OAuth2AccessToken.objects.count() == 0
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
for r in RefreshToken.objects.all():
|
||||||
|
assert r.revoked is None
|
||||||
|
|
||||||
|
call_command('revoke_oauth2_tokens', '--all')
|
||||||
|
assert RefreshToken.objects.count() == 1
|
||||||
|
for r in RefreshToken.objects.all():
|
||||||
|
assert r.revoked is not None
|
||||||
|
assert isinstance(r.revoked, datetime.datetime)
|
||||||
Reference in New Issue
Block a user