don't allow OAuth2 token creation for "external" users

see: https://github.com/ansible/tower/issues/2326
This commit is contained in:
Ryan Petrello
2018-07-10 11:51:37 -04:00
parent d9713f9b3f
commit df0e28ec65
8 changed files with 99 additions and 22 deletions

View File

@@ -11,7 +11,9 @@ from django.conf import settings
# Django OAuth Toolkit
from oauth2_provider.models import AbstractApplication, AbstractAccessToken
from oauth2_provider.generators import generate_client_secret
from oauthlib import oauth2
from awx.main.utils import get_external_account
from awx.main.fields import OAuth2ClientSecretField
@@ -123,3 +125,12 @@ class OAuth2AccessToken(AbstractAccessToken):
self.last_used = now()
self.save(update_fields=['last_used'])
return valid
def save(self, *args, **kwargs):
if self.user and settings.ALLOW_OAUTH2_FOR_EXTERNAL_USERS is False:
external_account = get_external_account(self.user)
if external_account is not None:
raise oauth2.AccessDeniedError(_(
'OAuth2 Tokens cannot be created by users associated with an external authentication provider ({})'
).format(external_account))
super(OAuth2AccessToken, self).save(*args, **kwargs)

View File

@@ -3,12 +3,14 @@ import base64
import json
from django.db import connection
from django.test.utils import override_settings
from awx.main.utils.encryption import decrypt_value, get_encryption_key
from awx.api.versioning import reverse, drf_reverse
from awx.main.models.oauth import (OAuth2Application as Application,
OAuth2AccessToken as AccessToken,
)
from awx.sso.models import UserEnterpriseAuth
from oauth2_provider.models import RefreshToken
@@ -29,6 +31,29 @@ def test_personal_access_token_creation(oauth_application, post, alice):
assert 'refresh_token' in resp_json
@pytest.mark.django_db
@pytest.mark.parametrize('allow_oauth, status', [(True, 201), (False, 403)])
def test_token_creation_disabled_for_external_accounts(oauth_application, post, alice, allow_oauth, status):
UserEnterpriseAuth(user=alice, provider='radius').save()
url = drf_reverse('api:oauth_authorization_root_view') + 'token/'
with override_settings(RADIUS_SERVER='example.org', ALLOW_OAUTH2_FOR_EXTERNAL_USERS=allow_oauth):
resp = post(
url,
data='grant_type=password&username=alice&password=alice&scope=read',
content_type='application/x-www-form-urlencoded',
HTTP_AUTHORIZATION='Basic ' + base64.b64encode(':'.join([
oauth_application.client_id, oauth_application.client_secret
])),
status=status
)
if allow_oauth:
assert AccessToken.objects.count() == 1
else:
assert 'OAuth2 Tokens cannot be created by users associated with an external authentication provider' in resp.content
assert AccessToken.objects.count() == 0
@pytest.mark.django_db
def test_pat_creation_no_default_scope(oauth_application, post, admin):
# tests that the default scope is overriden

View File

@@ -52,7 +52,7 @@ __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
'extract_ansible_vars', 'get_search_fields', 'get_system_task_capacity', 'get_cpu_capacity', 'get_mem_capacity',
'wrap_args_with_proot', 'build_proot_temp_dir', 'check_proot_installed', 'model_to_dict',
'model_instance_diff', 'timestamp_apiformat', 'parse_yaml_or_json', 'RequireDebugTrueOrTest',
'has_model_field_prefetched', 'set_environ', 'IllegalArgumentError', 'get_custom_venv_choices']
'has_model_field_prefetched', 'set_environ', 'IllegalArgumentError', 'get_custom_venv_choices', 'get_external_account']
def get_object_or_400(klass, *args, **kwargs):
@@ -1073,3 +1073,25 @@ def has_model_field_prefetched(model_obj, field_name):
# NOTE: Update this function if django internal implementation changes.
return getattr(getattr(model_obj, field_name, None),
'prefetch_cache_name', '') in getattr(model_obj, '_prefetched_objects_cache', {})
def get_external_account(user):
from django.conf import settings
from awx.conf.license import feature_enabled
account_type = None
if getattr(settings, 'AUTH_LDAP_SERVER_URI', None) and feature_enabled('ldap'):
try:
if user.pk and user.profile.ldap_dn and not user.has_usable_password():
account_type = "ldap"
except AttributeError:
pass
if (getattr(settings, 'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', None) or
getattr(settings, 'SOCIAL_AUTH_GITHUB_KEY', None) or
getattr(settings, 'SOCIAL_AUTH_GITHUB_ORG_KEY', None) or
getattr(settings, 'SOCIAL_AUTH_GITHUB_TEAM_KEY', None) or
getattr(settings, 'SOCIAL_AUTH_SAML_ENABLED_IDPS', None)) and user.social_auth.all():
account_type = "social"
if (getattr(settings, 'RADIUS_SERVER', None) or
getattr(settings, 'TACACSPLUS_HOST', None)) and user.enterprise_auth.all():
account_type = "enterprise"
return account_type