Added an awx-manage command for generating OAuth2 token.

This commit is contained in:
Yunfan Zhang 2018-06-04 15:49:04 -04:00
parent 01dcb3f733
commit 9945a3bc64
3 changed files with 94 additions and 26 deletions

View File

@ -949,7 +949,7 @@ class UserSerializer(BaseSerializer):
tokens = self.reverse('api:o_auth2_token_list', kwargs={'pk': obj.pk}),
authorized_tokens = self.reverse('api:user_authorized_token_list', kwargs={'pk': obj.pk}),
personal_tokens = self.reverse('api:o_auth2_personal_token_list', kwargs={'pk': obj.pk}),
))
return res
@ -986,7 +986,7 @@ class UserSerializer(BaseSerializer):
class UserAuthorizedTokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
@ -997,7 +997,7 @@ class UserAuthorizedTokenSerializer(BaseSerializer):
'expires', 'scope', 'application'
)
read_only_fields = ('user', 'token', 'expires')
def get_token(self, obj):
request = self.context.get('request', None)
try:
@ -1006,8 +1006,8 @@ class UserAuthorizedTokenSerializer(BaseSerializer):
else:
return TOKEN_CENSOR
except ObjectDoesNotExist:
return ''
return ''
def get_refresh_token(self, obj):
request = self.context.get('request', None)
try:
@ -1035,12 +1035,12 @@ class UserAuthorizedTokenSerializer(BaseSerializer):
access_token=obj
)
return obj
class OAuth2ApplicationSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete']
class Meta:
model = OAuth2Application
fields = (
@ -1053,15 +1053,15 @@ class OAuth2ApplicationSerializer(BaseSerializer):
'user': {'allow_null': True, 'required': False},
'organization': {'allow_null': False},
'authorization_grant_type': {'allow_null': False}
}
}
def to_representation(self, obj):
ret = super(OAuth2ApplicationSerializer, self).to_representation(obj)
if obj.client_type == 'public':
ret.pop('client_secret', None)
return ret
def get_modified(self, obj):
if obj is None:
return None
@ -1170,9 +1170,10 @@ class OAuth2TokenSerializer(BaseSerializer):
).format(self.ALLOWED_SCOPES))
return value
def create(self, validated_data):
current_user = self.context['request'].user
validated_data['user'] = current_user
def create(self, validated_data, from_command_line=False):
if not from_command_line:
current_user = self.context['request'].user
validated_data['user'] = current_user
validated_data['token'] = generate_token()
validated_data['expires'] = now() + timedelta(
seconds=settings.OAUTH2_PROVIDER['ACCESS_TOKEN_EXPIRE_SECONDS']
@ -1189,16 +1190,16 @@ class OAuth2TokenSerializer(BaseSerializer):
access_token=obj
)
return obj
class OAuth2TokenDetailSerializer(OAuth2TokenSerializer):
class Meta:
read_only_fields = ('*', 'user', 'application')
read_only_fields = ('*', 'user', 'application')
class OAuth2AuthorizedTokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
@ -1212,7 +1213,7 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
extra_kwargs = {
'scope': {'allow_null': False, 'required': True}
}
def get_token(self, obj):
request = self.context.get('request', None)
try:
@ -1221,8 +1222,8 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
else:
return TOKEN_CENSOR
except ObjectDoesNotExist:
return ''
return ''
def get_refresh_token(self, obj):
request = self.context.get('request', None)
try:
@ -1232,7 +1233,7 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
return TOKEN_CENSOR
except ObjectDoesNotExist:
return ''
def create(self, validated_data):
current_user = self.context['request'].user
validated_data['user'] = current_user
@ -1252,10 +1253,10 @@ class OAuth2AuthorizedTokenSerializer(BaseSerializer):
access_token=obj
)
return obj
class OAuth2PersonalTokenSerializer(BaseSerializer):
refresh_token = serializers.SerializerMethodField()
token = serializers.SerializerMethodField()
@ -1311,7 +1312,7 @@ class OAuth2PersonalTokenSerializer(BaseSerializer):
obj = super(OAuth2PersonalTokenSerializer, self).create(validated_data)
obj.save()
return obj
class OrganizationSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete']

View File

@ -0,0 +1,28 @@
# Django
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
# AWX
from awx.api.serializers import OAuth2TokenSerializer
class Command(BaseCommand):
"""Command that creates an OAuth2 token for a certain user. Returns the value of created token."""
help='Creates an OAuth2 token for a user.'
def add_arguments(self, parser):
parser.add_argument('--user', dest='user', type=str)
def handle(self, *args, **options):
if not options['user']:
raise CommandError('Username not supplied. Usage: awx-manage create_oauth2_token --user=username.')
try:
user = User.objects.get(username=options['user'])
except ObjectDoesNotExist:
raise CommandError('The user does not exist.')
config = {'user': user, 'scope':'write'}
serializer_obj = OAuth2TokenSerializer()
token_record = serializer_obj.create(config, True)
self.stdout.write(token_record.token)

View File

@ -0,0 +1,39 @@
import pytest
import string
import random
import StringIO
from django.contrib.auth.models import User
from django.core.management import call_command
from awx.main.models.oauth import OAuth2AccessToken
from django.core.management.base import CommandError
@pytest.mark.django_db
@pytest.mark.inventory_import
class TestOAuth2CreateCommand:
def test_no_user_option(self):
out = StringIO.StringIO()
with pytest.raises(CommandError) as excinfo:
call_command('create_oauth2_token', stdout=out)
assert 'Username not supplied.' in excinfo.value.message
out.close()
def test_non_existing_user(self):
out = StringIO.StringIO()
fake_username = ''
while fake_username == '' or User.objects.filter(username=fake_username).exists():
fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
arg = '--user=' + fake_username
with pytest.raises(CommandError) as excinfo:
call_command('create_oauth2_token', arg, stdout=out)
assert 'The user does not exist.' in excinfo.value.message
out.close()
def test_correct_user(self, alice):
out = StringIO.StringIO()
arg = '--user=' + 'alice'
call_command('create_oauth2_token', arg, stdout=out)
generated_token = out.getvalue().strip()
assert OAuth2AccessToken.objects.filter(user=alice, token=generated_token).count() == 1
assert OAuth2AccessToken.objects.get(user=alice, token=generated_token).scope == 'write'
out.close()