mirror of
https://github.com/ansible/awx.git
synced 2026-01-10 15:32:07 -03:30
External users should not be able to change their password (#13491)
* Azure AD users should not be able to change their password * Multiple auth changes Moving get_external_user function into awx.sso.common Altering get_external_user to not look at current config, just user object values Altering how api/conf.py detects external auth config (and making reusable function in awx.sso.common) Altering logic in api.serializers in _update_pasword to use awx.sso.common * Adding unit tests --------- Co-authored-by: John Westcott IV <john.westcott.iv@redhat.com>
This commit is contained in:
parent
c2df22e0f0
commit
52d46c88e4
@ -1,5 +1,4 @@
|
||||
# Django
|
||||
from django.conf import settings
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
# Django REST Framework
|
||||
@ -9,6 +8,7 @@ from rest_framework import serializers
|
||||
from awx.conf import fields, register, register_validate
|
||||
from awx.api.fields import OAuth2ProviderField
|
||||
from oauth2_provider.settings import oauth2_settings
|
||||
from awx.sso.common import is_remote_auth_enabled
|
||||
|
||||
|
||||
register(
|
||||
@ -108,19 +108,8 @@ register(
|
||||
|
||||
|
||||
def authentication_validate(serializer, attrs):
|
||||
remote_auth_settings = [
|
||||
'AUTH_LDAP_SERVER_URI',
|
||||
'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY',
|
||||
'SOCIAL_AUTH_GITHUB_KEY',
|
||||
'SOCIAL_AUTH_GITHUB_ORG_KEY',
|
||||
'SOCIAL_AUTH_GITHUB_TEAM_KEY',
|
||||
'SOCIAL_AUTH_SAML_ENABLED_IDPS',
|
||||
'RADIUS_SERVER',
|
||||
'TACACSPLUS_HOST',
|
||||
]
|
||||
if attrs.get('DISABLE_LOCAL_AUTH', False):
|
||||
if not any(getattr(settings, s, None) for s in remote_auth_settings):
|
||||
raise serializers.ValidationError(_("There are no remote authentication systems configured."))
|
||||
if attrs.get('DISABLE_LOCAL_AUTH', False) and not is_remote_auth_enabled():
|
||||
raise serializers.ValidationError(_("There are no remote authentication systems configured."))
|
||||
return attrs
|
||||
|
||||
|
||||
|
||||
@ -108,7 +108,6 @@ from awx.main.utils import (
|
||||
extract_ansible_vars,
|
||||
encrypt_dict,
|
||||
prefetch_page_capabilities,
|
||||
get_external_account,
|
||||
truncate_stdout,
|
||||
)
|
||||
from awx.main.utils.filters import SmartFilter
|
||||
@ -124,6 +123,8 @@ from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, Ver
|
||||
# AWX Utils
|
||||
from awx.api.validators import HostnameRegexValidator
|
||||
|
||||
from awx.sso.common import get_external_account
|
||||
|
||||
logger = logging.getLogger('awx.api.serializers')
|
||||
|
||||
# Fields that should be summarized regardless of object type.
|
||||
@ -987,23 +988,8 @@ class UserSerializer(BaseSerializer):
|
||||
def _update_password(self, obj, new_password):
|
||||
# For now we're not raising an error, just not saving password for
|
||||
# users managed by LDAP who already have an unusable password set.
|
||||
if getattr(settings, 'AUTH_LDAP_SERVER_URI', None):
|
||||
try:
|
||||
if obj.pk and obj.profile.ldap_dn and not obj.has_usable_password():
|
||||
new_password = None
|
||||
except AttributeError:
|
||||
pass
|
||||
if (
|
||||
getattr(settings, 'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_GITHUB_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_GITHUB_ORG_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_GITHUB_TEAM_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_SAML_ENABLED_IDPS', None)
|
||||
) and obj.social_auth.all():
|
||||
new_password = None
|
||||
if (getattr(settings, 'RADIUS_SERVER', None) or getattr(settings, 'TACACSPLUS_HOST', None)) and obj.enterprise_auth.all():
|
||||
new_password = None
|
||||
if new_password:
|
||||
# Get external password will return something like ldap or enterprise or None if the user isn't external. We only want to allow a password update for a None option
|
||||
if new_password and not self.get_external_account(obj):
|
||||
obj.set_password(new_password)
|
||||
obj.save(update_fields=['password'])
|
||||
|
||||
|
||||
@ -14,7 +14,7 @@ from oauth2_provider.models import AbstractApplication, AbstractAccessToken
|
||||
from oauth2_provider.generators import generate_client_secret
|
||||
from oauthlib import oauth2
|
||||
|
||||
from awx.main.utils import get_external_account
|
||||
from awx.sso.common import get_external_account
|
||||
from awx.main.fields import OAuth2ClientSecretField
|
||||
|
||||
|
||||
|
||||
@ -80,7 +80,6 @@ __all__ = [
|
||||
'set_environ',
|
||||
'IllegalArgumentError',
|
||||
'get_custom_venv_choices',
|
||||
'get_external_account',
|
||||
'ScheduleTaskManager',
|
||||
'ScheduleDependencyManager',
|
||||
'ScheduleWorkflowManager',
|
||||
@ -1089,29 +1088,6 @@ def has_model_field_prefetched(model_obj, field_name):
|
||||
return getattr(getattr(model_obj, field_name, None), 'prefetch_cache_name', '') in getattr(model_obj, '_prefetched_objects_cache', {})
|
||||
|
||||
|
||||
def get_external_account(user):
|
||||
from django.conf import settings
|
||||
|
||||
account_type = None
|
||||
if getattr(settings, 'AUTH_LDAP_SERVER_URI', None):
|
||||
try:
|
||||
if user.pk and user.profile.ldap_dn and not user.has_usable_password():
|
||||
account_type = "ldap"
|
||||
except AttributeError:
|
||||
pass
|
||||
if (
|
||||
getattr(settings, 'SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_GITHUB_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_GITHUB_ORG_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_GITHUB_TEAM_KEY', None)
|
||||
or getattr(settings, 'SOCIAL_AUTH_SAML_ENABLED_IDPS', None)
|
||||
) and user.social_auth.all():
|
||||
account_type = "social"
|
||||
if (getattr(settings, 'RADIUS_SERVER', None) or getattr(settings, 'TACACSPLUS_HOST', None)) and user.enterprise_auth.all():
|
||||
account_type = "enterprise"
|
||||
return account_type
|
||||
|
||||
|
||||
class classproperty:
|
||||
def __init__(self, fget=None, fset=None, fdel=None, doc=None):
|
||||
self.fget = fget
|
||||
|
||||
@ -169,3 +169,45 @@ def get_or_create_org_with_default_galaxy_cred(**kwargs):
|
||||
else:
|
||||
logger.debug("Could not find default Ansible Galaxy credential to add to org")
|
||||
return org
|
||||
|
||||
|
||||
def get_external_account(user):
|
||||
account_type = None
|
||||
|
||||
# Previously this method also checked for active configuration which meant that if a user logged in from LDAP
|
||||
# and then LDAP was no longer configured it would "convert" the user from an LDAP account_type to none.
|
||||
# This did have one benefit that if a login type was removed intentionally the user could be given a username password.
|
||||
# But it had a limitation that the user would have to have an active session (or an admin would have to go set a temp password).
|
||||
# It also lead to the side affect that if LDAP was ever reconfigured the user would convert back to LDAP but still have a local password.
|
||||
# That local password could then be used to bypass LDAP authentication.
|
||||
try:
|
||||
if user.pk and user.profile.ldap_dn and not user.has_usable_password():
|
||||
account_type = "ldap"
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
if user.social_auth.all():
|
||||
account_type = "social"
|
||||
|
||||
if user.enterprise_auth.all():
|
||||
account_type = "enterprise"
|
||||
|
||||
return account_type
|
||||
|
||||
|
||||
def is_remote_auth_enabled():
|
||||
from django.conf import settings
|
||||
|
||||
# Append LDAP, Radius, TACACS+ and SAML options
|
||||
settings_that_turn_on_remote_auth = [
|
||||
'AUTH_LDAP_SERVER_URI',
|
||||
'SOCIAL_AUTH_SAML_ENABLED_IDPS',
|
||||
'RADIUS_SERVER',
|
||||
'TACACSPLUS_HOST',
|
||||
]
|
||||
# Also include any SOCAIL_AUTH_*KEY (except SAML)
|
||||
for social_auth_key in dir(settings):
|
||||
if social_auth_key.startswith('SOCIAL_AUTH_') and social_auth_key.endswith('_KEY') and 'SAML' not in social_auth_key:
|
||||
settings_that_turn_on_remote_auth.append(social_auth_key)
|
||||
|
||||
return any(getattr(settings, s, None) for s in settings_that_turn_on_remote_auth)
|
||||
|
||||
@ -2,9 +2,22 @@ import pytest
|
||||
from collections import Counter
|
||||
from django.core.exceptions import FieldError
|
||||
from django.utils.timezone import now
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from awx.main.models import Credential, CredentialType, Organization, Team, User
|
||||
from awx.sso.common import get_orgs_by_ids, reconcile_users_org_team_mappings, create_org_and_teams, get_or_create_org_with_default_galaxy_cred
|
||||
from awx.sso.common import (
|
||||
get_orgs_by_ids,
|
||||
reconcile_users_org_team_mappings,
|
||||
create_org_and_teams,
|
||||
get_or_create_org_with_default_galaxy_cred,
|
||||
is_remote_auth_enabled,
|
||||
get_external_account,
|
||||
)
|
||||
|
||||
|
||||
class MicroMockObject(object):
|
||||
def all(self):
|
||||
return True
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@ -278,3 +291,87 @@ class TestCommonFunctions:
|
||||
|
||||
for o in Organization.objects.all():
|
||||
assert o.galaxy_credentials.count() == 0
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"enable_ldap, enable_social, enable_enterprise, expected_results",
|
||||
[
|
||||
(False, False, False, None),
|
||||
(True, False, False, 'ldap'),
|
||||
(True, True, False, 'social'),
|
||||
(True, True, True, 'enterprise'),
|
||||
(False, True, True, 'enterprise'),
|
||||
(False, False, True, 'enterprise'),
|
||||
(False, True, False, 'social'),
|
||||
],
|
||||
)
|
||||
def test_get_external_account(self, enable_ldap, enable_social, enable_enterprise, expected_results):
|
||||
try:
|
||||
user = User.objects.get(username="external_tester")
|
||||
except User.DoesNotExist:
|
||||
user = User(username="external_tester")
|
||||
user.set_unusable_password()
|
||||
user.save()
|
||||
|
||||
if enable_ldap:
|
||||
user.profile.ldap_dn = 'test.dn'
|
||||
if enable_social:
|
||||
from social_django.models import UserSocialAuth
|
||||
|
||||
social_auth, _ = UserSocialAuth.objects.get_or_create(
|
||||
uid='667ec049-cdf3-45d0-a4dc-0465f7505954',
|
||||
provider='oidc',
|
||||
extra_data={},
|
||||
user_id=user.id,
|
||||
)
|
||||
user.social_auth.set([social_auth])
|
||||
if enable_enterprise:
|
||||
from awx.sso.models import UserEnterpriseAuth
|
||||
|
||||
enterprise_auth = UserEnterpriseAuth(user=user, provider='tacacs+')
|
||||
enterprise_auth.save()
|
||||
|
||||
assert get_external_account(user) == expected_results
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"setting, expected",
|
||||
[
|
||||
# Set none of the social auth settings
|
||||
('JUNK_SETTING', False),
|
||||
# Set the hard coded settings
|
||||
('AUTH_LDAP_SERVER_URI', True),
|
||||
('SOCIAL_AUTH_SAML_ENABLED_IDPS', True),
|
||||
('RADIUS_SERVER', True),
|
||||
('TACACSPLUS_HOST', True),
|
||||
# Set some SOCIAL_SOCIAL_AUTH_OIDC_KEYAUTH_*_KEY settings
|
||||
('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True),
|
||||
('SOCIAL_AUTH_GITHUB_ENTERPRISE_KEY', True),
|
||||
('SOCIAL_AUTH_GITHUB_ENTERPRISE_ORG_KEY', True),
|
||||
('SOCIAL_AUTH_GITHUB_ENTERPRISE_TEAM_KEY', True),
|
||||
('SOCIAL_AUTH_GITHUB_KEY', True),
|
||||
('SOCIAL_AUTH_GITHUB_ORG_KEY', True),
|
||||
('SOCIAL_AUTH_GITHUB_TEAM_KEY', True),
|
||||
('SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', True),
|
||||
('SOCIAL_AUTH_OIDC_KEY', True),
|
||||
# Try a hypothetical future one
|
||||
('SOCIAL_AUTH_GIBBERISH_KEY', True),
|
||||
# Do a SAML one
|
||||
('SOCIAL_AUTH_SAML_SP_PRIVATE_KEY', False),
|
||||
],
|
||||
)
|
||||
def test_is_remote_auth_enabled(self, setting, expected):
|
||||
with override_settings(**{setting: True}):
|
||||
assert is_remote_auth_enabled() == expected
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key_one, key_one_value, key_two, key_two_value, expected",
|
||||
[
|
||||
('JUNK_SETTING', True, 'JUNK2_SETTING', True, False),
|
||||
('AUTH_LDAP_SERVER_URI', True, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True, True),
|
||||
('JUNK_SETTING', True, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True, True),
|
||||
('AUTH_LDAP_SERVER_URI', False, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', False, False),
|
||||
],
|
||||
)
|
||||
def test_is_remote_auth_enabled_multiple_keys(self, key_one, key_one_value, key_two, key_two_value, expected):
|
||||
with override_settings(**{key_one: key_one_value}):
|
||||
with override_settings(**{key_two: key_two_value}):
|
||||
assert is_remote_auth_enabled() == expected
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user