From 52d46c88e47fe3078e64b11420a8ca64e8106c59 Mon Sep 17 00:00:00 2001 From: Christian Adams Date: Tue, 28 Feb 2023 13:44:34 -0500 Subject: [PATCH] External users should not be able to change their password (#13491) * Azure AD users should not be able to change their password * Multiple auth changes Moving get_external_user function into awx.sso.common Altering get_external_user to not look at current config, just user object values Altering how api/conf.py detects external auth config (and making reusable function in awx.sso.common) Altering logic in api.serializers in _update_pasword to use awx.sso.common * Adding unit tests --------- Co-authored-by: John Westcott IV --- awx/api/conf.py | 17 +---- awx/api/serializers.py | 22 +----- awx/main/models/oauth.py | 2 +- awx/main/utils/common.py | 24 ------ awx/sso/common.py | 42 +++++++++++ awx/sso/tests/functional/test_common.py | 99 ++++++++++++++++++++++++- 6 files changed, 148 insertions(+), 58 deletions(-) diff --git a/awx/api/conf.py b/awx/api/conf.py index b9c2ee701a..0697f40c56 100644 --- a/awx/api/conf.py +++ b/awx/api/conf.py @@ -1,5 +1,4 @@ # Django -from django.conf import settings from django.utils.translation import gettext_lazy as _ # Django REST Framework @@ -9,6 +8,7 @@ from rest_framework import serializers from awx.conf import fields, register, register_validate from awx.api.fields import OAuth2ProviderField from oauth2_provider.settings import oauth2_settings +from awx.sso.common import is_remote_auth_enabled register( @@ -108,19 +108,8 @@ register( def authentication_validate(serializer, attrs): - remote_auth_settings = [ - 'AUTH_LDAP_SERVER_URI', - 'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', - 'SOCIAL_AUTH_GITHUB_KEY', - 'SOCIAL_AUTH_GITHUB_ORG_KEY', - 'SOCIAL_AUTH_GITHUB_TEAM_KEY', - 'SOCIAL_AUTH_SAML_ENABLED_IDPS', - 'RADIUS_SERVER', - 'TACACSPLUS_HOST', - ] - if attrs.get('DISABLE_LOCAL_AUTH', False): - if not any(getattr(settings, s, None) for s in remote_auth_settings): - raise serializers.ValidationError(_("There are no remote authentication systems configured.")) + if attrs.get('DISABLE_LOCAL_AUTH', False) and not is_remote_auth_enabled(): + raise serializers.ValidationError(_("There are no remote authentication systems configured.")) return attrs diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 9e617c3a14..7dad4459fa 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -108,7 +108,6 @@ from awx.main.utils import ( extract_ansible_vars, encrypt_dict, prefetch_page_capabilities, - get_external_account, truncate_stdout, ) from awx.main.utils.filters import SmartFilter @@ -124,6 +123,8 @@ from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, Ver # AWX Utils from awx.api.validators import HostnameRegexValidator +from awx.sso.common import get_external_account + logger = logging.getLogger('awx.api.serializers') # Fields that should be summarized regardless of object type. @@ -987,23 +988,8 @@ class UserSerializer(BaseSerializer): def _update_password(self, obj, new_password): # For now we're not raising an error, just not saving password for # users managed by LDAP who already have an unusable password set. - if getattr(settings, 'AUTH_LDAP_SERVER_URI', None): - try: - if obj.pk and obj.profile.ldap_dn and not obj.has_usable_password(): - new_password = None - except AttributeError: - pass - if ( - getattr(settings, 'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_GITHUB_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_GITHUB_ORG_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_GITHUB_TEAM_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_SAML_ENABLED_IDPS', None) - ) and obj.social_auth.all(): - new_password = None - if (getattr(settings, 'RADIUS_SERVER', None) or getattr(settings, 'TACACSPLUS_HOST', None)) and obj.enterprise_auth.all(): - new_password = None - if new_password: + # Get external password will return something like ldap or enterprise or None if the user isn't external. We only want to allow a password update for a None option + if new_password and not self.get_external_account(obj): obj.set_password(new_password) obj.save(update_fields=['password']) diff --git a/awx/main/models/oauth.py b/awx/main/models/oauth.py index c9927f78bd..fbd7772119 100644 --- a/awx/main/models/oauth.py +++ b/awx/main/models/oauth.py @@ -14,7 +14,7 @@ from oauth2_provider.models import AbstractApplication, AbstractAccessToken from oauth2_provider.generators import generate_client_secret from oauthlib import oauth2 -from awx.main.utils import get_external_account +from awx.sso.common import get_external_account from awx.main.fields import OAuth2ClientSecretField diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index dedd02f995..26920535dc 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -80,7 +80,6 @@ __all__ = [ 'set_environ', 'IllegalArgumentError', 'get_custom_venv_choices', - 'get_external_account', 'ScheduleTaskManager', 'ScheduleDependencyManager', 'ScheduleWorkflowManager', @@ -1089,29 +1088,6 @@ def has_model_field_prefetched(model_obj, field_name): return getattr(getattr(model_obj, field_name, None), 'prefetch_cache_name', '') in getattr(model_obj, '_prefetched_objects_cache', {}) -def get_external_account(user): - from django.conf import settings - - account_type = None - if getattr(settings, 'AUTH_LDAP_SERVER_URI', None): - try: - if user.pk and user.profile.ldap_dn and not user.has_usable_password(): - account_type = "ldap" - except AttributeError: - pass - if ( - getattr(settings, 'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_GITHUB_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_GITHUB_ORG_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_GITHUB_TEAM_KEY', None) - or getattr(settings, 'SOCIAL_AUTH_SAML_ENABLED_IDPS', None) - ) and user.social_auth.all(): - account_type = "social" - if (getattr(settings, 'RADIUS_SERVER', None) or getattr(settings, 'TACACSPLUS_HOST', None)) and user.enterprise_auth.all(): - account_type = "enterprise" - return account_type - - class classproperty: def __init__(self, fget=None, fset=None, fdel=None, doc=None): self.fget = fget diff --git a/awx/sso/common.py b/awx/sso/common.py index 4d601bb22e..99abc51d5a 100644 --- a/awx/sso/common.py +++ b/awx/sso/common.py @@ -169,3 +169,45 @@ def get_or_create_org_with_default_galaxy_cred(**kwargs): else: logger.debug("Could not find default Ansible Galaxy credential to add to org") return org + + +def get_external_account(user): + account_type = None + + # Previously this method also checked for active configuration which meant that if a user logged in from LDAP + # and then LDAP was no longer configured it would "convert" the user from an LDAP account_type to none. + # This did have one benefit that if a login type was removed intentionally the user could be given a username password. + # But it had a limitation that the user would have to have an active session (or an admin would have to go set a temp password). + # It also lead to the side affect that if LDAP was ever reconfigured the user would convert back to LDAP but still have a local password. + # That local password could then be used to bypass LDAP authentication. + try: + if user.pk and user.profile.ldap_dn and not user.has_usable_password(): + account_type = "ldap" + except AttributeError: + pass + + if user.social_auth.all(): + account_type = "social" + + if user.enterprise_auth.all(): + account_type = "enterprise" + + return account_type + + +def is_remote_auth_enabled(): + from django.conf import settings + + # Append LDAP, Radius, TACACS+ and SAML options + settings_that_turn_on_remote_auth = [ + 'AUTH_LDAP_SERVER_URI', + 'SOCIAL_AUTH_SAML_ENABLED_IDPS', + 'RADIUS_SERVER', + 'TACACSPLUS_HOST', + ] + # Also include any SOCAIL_AUTH_*KEY (except SAML) + for social_auth_key in dir(settings): + if social_auth_key.startswith('SOCIAL_AUTH_') and social_auth_key.endswith('_KEY') and 'SAML' not in social_auth_key: + settings_that_turn_on_remote_auth.append(social_auth_key) + + return any(getattr(settings, s, None) for s in settings_that_turn_on_remote_auth) diff --git a/awx/sso/tests/functional/test_common.py b/awx/sso/tests/functional/test_common.py index 4fc3edd841..f2b3e5781d 100644 --- a/awx/sso/tests/functional/test_common.py +++ b/awx/sso/tests/functional/test_common.py @@ -2,9 +2,22 @@ import pytest from collections import Counter from django.core.exceptions import FieldError from django.utils.timezone import now +from django.test.utils import override_settings from awx.main.models import Credential, CredentialType, Organization, Team, User -from awx.sso.common import get_orgs_by_ids, reconcile_users_org_team_mappings, create_org_and_teams, get_or_create_org_with_default_galaxy_cred +from awx.sso.common import ( + get_orgs_by_ids, + reconcile_users_org_team_mappings, + create_org_and_teams, + get_or_create_org_with_default_galaxy_cred, + is_remote_auth_enabled, + get_external_account, +) + + +class MicroMockObject(object): + def all(self): + return True @pytest.mark.django_db @@ -278,3 +291,87 @@ class TestCommonFunctions: for o in Organization.objects.all(): assert o.galaxy_credentials.count() == 0 + + @pytest.mark.parametrize( + "enable_ldap, enable_social, enable_enterprise, expected_results", + [ + (False, False, False, None), + (True, False, False, 'ldap'), + (True, True, False, 'social'), + (True, True, True, 'enterprise'), + (False, True, True, 'enterprise'), + (False, False, True, 'enterprise'), + (False, True, False, 'social'), + ], + ) + def test_get_external_account(self, enable_ldap, enable_social, enable_enterprise, expected_results): + try: + user = User.objects.get(username="external_tester") + except User.DoesNotExist: + user = User(username="external_tester") + user.set_unusable_password() + user.save() + + if enable_ldap: + user.profile.ldap_dn = 'test.dn' + if enable_social: + from social_django.models import UserSocialAuth + + social_auth, _ = UserSocialAuth.objects.get_or_create( + uid='667ec049-cdf3-45d0-a4dc-0465f7505954', + provider='oidc', + extra_data={}, + user_id=user.id, + ) + user.social_auth.set([social_auth]) + if enable_enterprise: + from awx.sso.models import UserEnterpriseAuth + + enterprise_auth = UserEnterpriseAuth(user=user, provider='tacacs+') + enterprise_auth.save() + + assert get_external_account(user) == expected_results + + @pytest.mark.parametrize( + "setting, expected", + [ + # Set none of the social auth settings + ('JUNK_SETTING', False), + # Set the hard coded settings + ('AUTH_LDAP_SERVER_URI', True), + ('SOCIAL_AUTH_SAML_ENABLED_IDPS', True), + ('RADIUS_SERVER', True), + ('TACACSPLUS_HOST', True), + # Set some SOCIAL_SOCIAL_AUTH_OIDC_KEYAUTH_*_KEY settings + ('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True), + ('SOCIAL_AUTH_GITHUB_ENTERPRISE_KEY', True), + ('SOCIAL_AUTH_GITHUB_ENTERPRISE_ORG_KEY', True), + ('SOCIAL_AUTH_GITHUB_ENTERPRISE_TEAM_KEY', True), + ('SOCIAL_AUTH_GITHUB_KEY', True), + ('SOCIAL_AUTH_GITHUB_ORG_KEY', True), + ('SOCIAL_AUTH_GITHUB_TEAM_KEY', True), + ('SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', True), + ('SOCIAL_AUTH_OIDC_KEY', True), + # Try a hypothetical future one + ('SOCIAL_AUTH_GIBBERISH_KEY', True), + # Do a SAML one + ('SOCIAL_AUTH_SAML_SP_PRIVATE_KEY', False), + ], + ) + def test_is_remote_auth_enabled(self, setting, expected): + with override_settings(**{setting: True}): + assert is_remote_auth_enabled() == expected + + @pytest.mark.parametrize( + "key_one, key_one_value, key_two, key_two_value, expected", + [ + ('JUNK_SETTING', True, 'JUNK2_SETTING', True, False), + ('AUTH_LDAP_SERVER_URI', True, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True, True), + ('JUNK_SETTING', True, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True, True), + ('AUTH_LDAP_SERVER_URI', False, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', False, False), + ], + ) + def test_is_remote_auth_enabled_multiple_keys(self, key_one, key_one_value, key_two, key_two_value, expected): + with override_settings(**{key_one: key_one_value}): + with override_settings(**{key_two: key_two_value}): + assert is_remote_auth_enabled() == expected