Remove LDAP authentication (#15546)

Remove LDAP authentication from AWX
This commit is contained in:
Djebran Lezzoum
2024-10-02 15:40:16 +02:00
committed by jessicamack
parent 6dea7bfe17
commit f22b192fb4
67 changed files with 172 additions and 2813 deletions

View File

@@ -961,7 +961,6 @@ class UnifiedJobStdoutSerializer(UnifiedJobSerializer):
class UserSerializer(BaseSerializer):
password = serializers.CharField(required=False, default='', help_text=_('Field used to change the password.'))
ldap_dn = serializers.CharField(source='profile.ldap_dn', read_only=True)
external_account = serializers.SerializerMethodField(help_text=_('Set if the account is managed by an external service'))
is_system_auditor = serializers.BooleanField(default=False)
show_capabilities = ['edit', 'delete']
@@ -979,7 +978,6 @@ class UserSerializer(BaseSerializer):
'is_superuser',
'is_system_auditor',
'password',
'ldap_dn',
'last_login',
'external_account',
)
@@ -1028,8 +1026,10 @@ class UserSerializer(BaseSerializer):
def _update_password(self, obj, new_password):
# For now we're not raising an error, just not saving password for
# users managed by LDAP who already have an unusable password set.
# Get external password will return something like ldap or enterprise or None if the user isn't external. We only want to allow a password update for a None option
# users managed by external authentication services (who already have an unusable password set).
# get_external_account function will return something like social or enterprise when the user is external,
# and return None when the user isn't external.
# We want to allow a password update only for non-external accounts.
if new_password and new_password != '$encrypted$' and not self.get_external_account(obj):
obj.set_password(new_password)
obj.save(update_fields=['password'])
@@ -1085,37 +1085,6 @@ class UserSerializer(BaseSerializer):
)
return res
def _validate_ldap_managed_field(self, value, field_name):
if not getattr(settings, 'AUTH_LDAP_SERVER_URI', None):
return value
try:
is_ldap_user = bool(self.instance and self.instance.profile.ldap_dn)
except AttributeError:
is_ldap_user = False
if is_ldap_user:
ldap_managed_fields = ['username']
ldap_managed_fields.extend(getattr(settings, 'AUTH_LDAP_USER_ATTR_MAP', {}).keys())
ldap_managed_fields.extend(getattr(settings, 'AUTH_LDAP_USER_FLAGS_BY_GROUP', {}).keys())
if field_name in ldap_managed_fields:
if value != getattr(self.instance, field_name):
raise serializers.ValidationError(_('Unable to change %s on user managed by LDAP.') % field_name)
return value
def validate_username(self, value):
return self._validate_ldap_managed_field(value, 'username')
def validate_first_name(self, value):
return self._validate_ldap_managed_field(value, 'first_name')
def validate_last_name(self, value):
return self._validate_ldap_managed_field(value, 'last_name')
def validate_email(self, value):
return self._validate_ldap_managed_field(value, 'email')
def validate_is_superuser(self, value):
return self._validate_ldap_managed_field(value, 'is_superuser')
class UserActivityStreamSerializer(UserSerializer):
"""Changes to system auditor status are shown as separate entries,

View File

@@ -295,15 +295,6 @@ class ApiV2ConfigView(APIView):
become_methods=PRIVILEGE_ESCALATION_METHODS,
)
# If LDAP is enabled, user_ldap_fields will return a list of field
# names that are managed by LDAP and should be read-only for users with
# a non-empty ldap_dn attribute.
if getattr(settings, 'AUTH_LDAP_SERVER_URI', None):
user_ldap_fields = ['username', 'password']
user_ldap_fields.extend(getattr(settings, 'AUTH_LDAP_USER_ATTR_MAP', {}).keys())
user_ldap_fields.extend(getattr(settings, 'AUTH_LDAP_USER_FLAGS_BY_GROUP', {}).keys())
data['user_ldap_fields'] = user_ldap_fields
if (
request.user.is_superuser
or request.user.is_system_auditor

View File

@@ -1,13 +1,11 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
# AWX
from awx.conf.migrations._ldap_group_type import fill_ldap_group_type_params
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [('conf', '0005_v330_rename_two_session_settings')]
operations = [migrations.RunPython(fill_ldap_group_type_params)]
# this migration is doing nothing, and is here to preserve migrations files integrity
operations = []

View File

@@ -0,0 +1,115 @@
from django.db import migrations
LDAP_AUTH_CONF_KEYS = [
'AUTH_LDAP_SERVER_URI',
'AUTH_LDAP_BIND_DN',
'AUTH_LDAP_BIND_PASSWORD',
'AUTH_LDAP_START_TLS',
'AUTH_LDAP_CONNECTION_OPTIONS',
'AUTH_LDAP_USER_SEARCH',
'AUTH_LDAP_USER_DN_TEMPLATE',
'AUTH_LDAP_USER_ATTR_MAP',
'AUTH_LDAP_GROUP_SEARCH',
'AUTH_LDAP_GROUP_TYPE',
'AUTH_LDAP_GROUP_TYPE_PARAMS',
'AUTH_LDAP_REQUIRE_GROUP',
'AUTH_LDAP_DENY_GROUP',
'AUTH_LDAP_USER_FLAGS_BY_GROUP',
'AUTH_LDAP_ORGANIZATION_MAP',
'AUTH_LDAP_TEAM_MAP',
'AUTH_LDAP_1_SERVER_URI',
'AUTH_LDAP_1_BIND_DN',
'AUTH_LDAP_1_BIND_PASSWORD',
'AUTH_LDAP_1_START_TLS',
'AUTH_LDAP_1_CONNECTION_OPTIONS',
'AUTH_LDAP_1_USER_SEARCH',
'AUTH_LDAP_1_USER_DN_TEMPLATE',
'AUTH_LDAP_1_USER_ATTR_MAP',
'AUTH_LDAP_1_GROUP_SEARCH',
'AUTH_LDAP_1_GROUP_TYPE',
'AUTH_LDAP_1_GROUP_TYPE_PARAMS',
'AUTH_LDAP_1_REQUIRE_GROUP',
'AUTH_LDAP_1_DENY_GROUP',
'AUTH_LDAP_1_USER_FLAGS_BY_GROUP',
'AUTH_LDAP_1_ORGANIZATION_MAP',
'AUTH_LDAP_1_TEAM_MAP',
'AUTH_LDAP_2_SERVER_URI',
'AUTH_LDAP_2_BIND_DN',
'AUTH_LDAP_2_BIND_PASSWORD',
'AUTH_LDAP_2_START_TLS',
'AUTH_LDAP_2_CONNECTION_OPTIONS',
'AUTH_LDAP_2_USER_SEARCH',
'AUTH_LDAP_2_USER_DN_TEMPLATE',
'AUTH_LDAP_2_USER_ATTR_MAP',
'AUTH_LDAP_2_GROUP_SEARCH',
'AUTH_LDAP_2_GROUP_TYPE',
'AUTH_LDAP_2_GROUP_TYPE_PARAMS',
'AUTH_LDAP_2_REQUIRE_GROUP',
'AUTH_LDAP_2_DENY_GROUP',
'AUTH_LDAP_2_USER_FLAGS_BY_GROUP',
'AUTH_LDAP_2_ORGANIZATION_MAP',
'AUTH_LDAP_2_TEAM_MAP',
'AUTH_LDAP_3_SERVER_URI',
'AUTH_LDAP_3_BIND_DN',
'AUTH_LDAP_3_BIND_PASSWORD',
'AUTH_LDAP_3_START_TLS',
'AUTH_LDAP_3_CONNECTION_OPTIONS',
'AUTH_LDAP_3_USER_SEARCH',
'AUTH_LDAP_3_USER_DN_TEMPLATE',
'AUTH_LDAP_3_USER_ATTR_MAP',
'AUTH_LDAP_3_GROUP_SEARCH',
'AUTH_LDAP_3_GROUP_TYPE',
'AUTH_LDAP_3_GROUP_TYPE_PARAMS',
'AUTH_LDAP_3_REQUIRE_GROUP',
'AUTH_LDAP_3_DENY_GROUP',
'AUTH_LDAP_3_USER_FLAGS_BY_GROUP',
'AUTH_LDAP_3_ORGANIZATION_MAP',
'AUTH_LDAP_3_TEAM_MAP',
'AUTH_LDAP_4_SERVER_URI',
'AUTH_LDAP_4_BIND_DN',
'AUTH_LDAP_4_BIND_PASSWORD',
'AUTH_LDAP_4_START_TLS',
'AUTH_LDAP_4_CONNECTION_OPTIONS',
'AUTH_LDAP_4_USER_SEARCH',
'AUTH_LDAP_4_USER_DN_TEMPLATE',
'AUTH_LDAP_4_USER_ATTR_MAP',
'AUTH_LDAP_4_GROUP_SEARCH',
'AUTH_LDAP_4_GROUP_TYPE',
'AUTH_LDAP_4_GROUP_TYPE_PARAMS',
'AUTH_LDAP_4_REQUIRE_GROUP',
'AUTH_LDAP_4_DENY_GROUP',
'AUTH_LDAP_4_USER_FLAGS_BY_GROUP',
'AUTH_LDAP_4_ORGANIZATION_MAP',
'AUTH_LDAP_4_TEAM_MAP',
'AUTH_LDAP_5_SERVER_URI',
'AUTH_LDAP_5_BIND_DN',
'AUTH_LDAP_5_BIND_PASSWORD',
'AUTH_LDAP_5_START_TLS',
'AUTH_LDAP_5_CONNECTION_OPTIONS',
'AUTH_LDAP_5_USER_SEARCH',
'AUTH_LDAP_5_USER_DN_TEMPLATE',
'AUTH_LDAP_5_USER_ATTR_MAP',
'AUTH_LDAP_5_GROUP_SEARCH',
'AUTH_LDAP_5_GROUP_TYPE',
'AUTH_LDAP_5_GROUP_TYPE_PARAMS',
'AUTH_LDAP_5_REQUIRE_GROUP',
'AUTH_LDAP_5_DENY_GROUP',
'AUTH_LDAP_5_USER_FLAGS_BY_GROUP',
'AUTH_LDAP_5_ORGANIZATION_MAP',
'AUTH_LDAP_5_TEAM_MAP',
]
def remove_ldap_auth_conf(apps, scheme_editor):
setting = apps.get_model('conf', 'Setting')
setting.objects.filter(key__in=LDAP_AUTH_CONF_KEYS).delete()
class Migration(migrations.Migration):
dependencies = [
('conf', '0010_change_to_JSONField'),
]
operations = [
migrations.RunPython(remove_ldap_auth_conf),
]

View File

@@ -1,31 +0,0 @@
import inspect
from django.conf import settings
import logging
logger = logging.getLogger('awx.conf.migrations')
def fill_ldap_group_type_params(apps, schema_editor):
group_type = getattr(settings, 'AUTH_LDAP_GROUP_TYPE', None)
Setting = apps.get_model('conf', 'Setting')
group_type_params = {'name_attr': 'cn', 'member_attr': 'member'}
qs = Setting.objects.filter(key='AUTH_LDAP_GROUP_TYPE_PARAMS')
entry = None
if qs.exists():
entry = qs[0]
group_type_params = entry.value
else:
return # for new installs we prefer to use the default value
init_attrs = set(inspect.getfullargspec(group_type.__init__).args[1:])
for k in list(group_type_params.keys()):
if k not in init_attrs:
del group_type_params[k]
entry.value = group_type_params
logger.warning(f'Migration updating AUTH_LDAP_GROUP_TYPE_PARAMS with value {entry.value}')
entry.save()

View File

@@ -73,6 +73,6 @@ def disable_local_auth(**kwargs):
logger.warning("Triggering token invalidation for local users.")
qs = User.objects.filter(profile__ldap_dn='', enterprise_auth__isnull=True, social_auth__isnull=True)
qs = User.objects.filter(enterprise_auth__isnull=True, social_auth__isnull=True)
revoke_tokens(RefreshToken.objects.filter(revoked=None, user__in=qs))
revoke_tokens(OAuth2AccessToken.objects.filter(user__in=qs))

View File

@@ -1,25 +0,0 @@
import pytest
from awx.conf.migrations._ldap_group_type import fill_ldap_group_type_params
from awx.conf.models import Setting
from django.apps import apps
@pytest.mark.django_db
def test_fill_group_type_params_no_op():
fill_ldap_group_type_params(apps, 'dont-use-me')
assert Setting.objects.count() == 0
@pytest.mark.django_db
def test_keep_old_setting_with_default_value():
Setting.objects.create(key='AUTH_LDAP_GROUP_TYPE', value={'name_attr': 'cn', 'member_attr': 'member'})
fill_ldap_group_type_params(apps, 'dont-use-me')
assert Setting.objects.count() == 1
s = Setting.objects.first()
assert s.value == {'name_attr': 'cn', 'member_attr': 'member'}
# NOTE: would be good to test the removal of attributes by migration
# but this requires fighting with the validator and is not done here

View File

@@ -642,10 +642,7 @@ class UserAccess(BaseAccess):
"""
model = User
prefetch_related = (
'profile',
'resource',
)
prefetch_related = ('resource',)
def filtered_queryset(self):
if settings.ORG_ADMINS_CAN_SEE_ALL_USERS and (self.user.admin_of_organizations.exists() or self.user.auditor_of_organizations.exists()):

View File

@@ -1,7 +1,6 @@
import json
import os
import sys
import re
from typing import Any
from django.core.management.base import BaseCommand
@@ -11,7 +10,7 @@ from awx.conf import settings_registry
class Command(BaseCommand):
help = 'Dump the current auth configuration in django_ansible_base.authenticator format, currently supports LDAP and SAML'
help = 'Dump the current auth configuration in django_ansible_base.authenticator format, currently supports SAML'
DAB_SAML_AUTHENTICATOR_KEYS = {
"SP_ENTITY_ID": True,
@@ -27,20 +26,6 @@ class Command(BaseCommand):
"CALLBACK_URL": False,
}
DAB_LDAP_AUTHENTICATOR_KEYS = {
"SERVER_URI": True,
"BIND_DN": False,
"BIND_PASSWORD": False,
"CONNECTION_OPTIONS": False,
"GROUP_TYPE": True,
"GROUP_TYPE_PARAMS": True,
"GROUP_SEARCH": False,
"START_TLS": False,
"USER_DN_TEMPLATE": True,
"USER_ATTR_MAP": True,
"USER_SEARCH": False,
}
def is_enabled(self, settings, keys):
missing_fields = []
for key, required in keys.items():
@@ -50,41 +35,6 @@ class Command(BaseCommand):
return False, missing_fields
return True, None
def get_awx_ldap_settings(self) -> dict[str, dict[str, Any]]:
awx_ldap_settings = {}
for awx_ldap_setting in settings_registry.get_registered_settings(category_slug='ldap'):
key = awx_ldap_setting.removeprefix("AUTH_LDAP_")
value = getattr(settings, awx_ldap_setting, None)
awx_ldap_settings[key] = value
grouped_settings = {}
for key, value in awx_ldap_settings.items():
match = re.search(r'(\d+)', key)
index = int(match.group()) if match else 0
new_key = re.sub(r'\d+_', '', key)
if index not in grouped_settings:
grouped_settings[index] = {}
grouped_settings[index][new_key] = value
if new_key == "GROUP_TYPE" and value:
grouped_settings[index][new_key] = type(value).__name__
if new_key == "SERVER_URI" and value:
value = value.split(", ")
grouped_settings[index][new_key] = value
if type(value).__name__ == "LDAPSearch":
data = []
data.append(value.base_dn)
data.append("SCOPE_SUBTREE")
data.append(value.filterstr)
grouped_settings[index][new_key] = data
return grouped_settings
def get_awx_saml_settings(self) -> dict[str, Any]:
awx_saml_settings = {}
for awx_saml_setting in settings_registry.get_registered_settings(category_slug='saml'):
@@ -157,23 +107,6 @@ class Command(BaseCommand):
else:
data.append({"SAML_missing_fields": saml_missing_fields})
# dump LDAP settings
awx_ldap_group_settings = self.get_awx_ldap_settings()
for awx_ldap_name, awx_ldap_settings in awx_ldap_group_settings.items():
awx_ldap_enabled, ldap_missing_fields = self.is_enabled(awx_ldap_settings, self.DAB_LDAP_AUTHENTICATOR_KEYS)
if awx_ldap_enabled:
data.append(
self.format_config_data(
awx_ldap_enabled,
awx_ldap_settings,
"ldap",
self.DAB_LDAP_AUTHENTICATOR_KEYS,
f"LDAP_{awx_ldap_name}",
)
)
else:
data.append({f"LDAP_{awx_ldap_name}_missing_fields": ldap_missing_fields})
# write to file if requested
if options["output_file"]:
# Define the path for the output JSON file

View File

@@ -93,7 +93,7 @@ class DisableLocalAuthMiddleware(MiddlewareMixin):
user = request.user
if not user.pk:
return
if not (user.profile.ldap_dn or user.social_auth.exists() or user.enterprise_auth.exists()):
if not (user.social_auth.exists() or user.enterprise_auth.exists()):
logout(request)

View File

@@ -0,0 +1,16 @@
# Generated by Django 4.2.10 on 2024-08-09 16:47
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0195_EE_permissions'),
]
operations = [
migrations.DeleteModel(
name='Profile',
),
]

View File

@@ -18,7 +18,7 @@ from ansible_base.lib.utils.models import user_summary_fields
# AWX
from awx.main.models.base import BaseModel, PrimordialModel, accepts_json, CLOUD_INVENTORY_SOURCES, VERBOSITY_CHOICES # noqa
from awx.main.models.unified_jobs import UnifiedJob, UnifiedJobTemplate, StdoutMaxBytesExceeded # noqa
from awx.main.models.organization import Organization, Profile, Team, UserSessionMembership # noqa
from awx.main.models.organization import Organization, Team, UserSessionMembership # noqa
from awx.main.models.credential import Credential, CredentialType, CredentialInputSource, ManagedCredentialType, build_safe_env # noqa
from awx.main.models.projects import Project, ProjectUpdate # noqa
from awx.main.models.receptor_address import ReceptorAddress # noqa
@@ -292,7 +292,6 @@ activity_stream_registrar.connect(Job)
activity_stream_registrar.connect(AdHocCommand)
# activity_stream_registrar.connect(JobHostSummary)
# activity_stream_registrar.connect(JobEvent)
# activity_stream_registrar.connect(Profile)
activity_stream_registrar.connect(Schedule)
activity_stream_registrar.connect(NotificationTemplate)
activity_stream_registrar.connect(Notification)

View File

@@ -15,8 +15,8 @@ from ansible_base.resource_registry.fields import AnsibleResourceField
# AWX
from awx.api.versioning import reverse
from awx.main.fields import AutoOneToOneField, ImplicitRoleField, OrderedManyToManyField
from awx.main.models.base import BaseModel, CommonModel, CommonModelNameNotUnique, CreatedModifiedModel, NotificationFieldsModel
from awx.main.fields import ImplicitRoleField, OrderedManyToManyField
from awx.main.models.base import BaseModel, CommonModel, CommonModelNameNotUnique, NotificationFieldsModel
from awx.main.models.rbac import (
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
ROLE_SINGLETON_SYSTEM_AUDITOR,
@@ -24,7 +24,7 @@ from awx.main.models.rbac import (
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.models.mixins import ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin
__all__ = ['Organization', 'Team', 'Profile', 'UserSessionMembership']
__all__ = ['Organization', 'Team', 'UserSessionMembership']
class Organization(CommonModel, NotificationFieldsModel, ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin):
@@ -167,22 +167,6 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
return reverse('api:team_detail', kwargs={'pk': self.pk}, request=request)
class Profile(CreatedModifiedModel):
"""
Profile model related to User object. Currently stores LDAP DN for users
loaded from LDAP.
"""
class Meta:
app_label = 'main'
user = AutoOneToOneField('auth.User', related_name='profile', editable=False, on_delete=models.CASCADE)
ldap_dn = models.CharField(
max_length=1024,
default='',
)
class UserSessionMembership(BaseModel):
"""
A lookup table for API session membership given user. Note, there is a

View File

@@ -66,82 +66,6 @@ def test_awx_task_env_validity(get, patch, admin, value, expected):
assert resp.data['AWX_TASK_ENV'] == dict()
@pytest.mark.django_db
def test_ldap_settings(get, put, patch, delete, admin):
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'ldap'})
get(url, user=admin, expect=200)
# The PUT below will fail at the moment because AUTH_LDAP_GROUP_TYPE
# defaults to None but cannot be set to None.
# put(url, user=admin, data=response.data, expect=200)
delete(url, user=admin, expect=204)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': ''}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldap.example.com'}, expect=400)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldap://ldap.example.com'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldaps://ldap.example.com'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldap://ldap.example.com:389'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldaps://ldap.example.com:636'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldap://ldap.example.com ldap://ldap2.example.com'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldap://ldap.example.com,ldap://ldap2.example.com'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_SERVER_URI': 'ldap://ldap.example.com, ldap://ldap2.example.com'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_BIND_DN': 'cn=Manager,dc=example,dc=com'}, expect=200)
patch(url, user=admin, data={'AUTH_LDAP_BIND_DN': u'cn=暴力膜,dc=大新闻,dc=真的粉丝'}, expect=200)
@pytest.mark.django_db
@pytest.mark.parametrize(
'value',
[
None,
'',
'INVALID',
1,
[1],
['INVALID'],
],
)
def test_ldap_user_flags_by_group_invalid_dn(get, patch, admin, value):
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'ldap'})
patch(url, user=admin, data={'AUTH_LDAP_USER_FLAGS_BY_GROUP': {'is_superuser': value}}, expect=400)
@pytest.mark.django_db
def test_ldap_user_flags_by_group_string(get, patch, admin):
expected = 'CN=Admins,OU=Groups,DC=example,DC=com'
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'ldap'})
patch(url, user=admin, data={'AUTH_LDAP_USER_FLAGS_BY_GROUP': {'is_superuser': expected}}, expect=200)
resp = get(url, user=admin)
assert resp.data['AUTH_LDAP_USER_FLAGS_BY_GROUP']['is_superuser'] == [expected]
@pytest.mark.django_db
def test_ldap_user_flags_by_group_list(get, patch, admin):
expected = ['CN=Admins,OU=Groups,DC=example,DC=com', 'CN=Superadmins,OU=Groups,DC=example,DC=com']
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'ldap'})
patch(url, user=admin, data={'AUTH_LDAP_USER_FLAGS_BY_GROUP': {'is_superuser': expected}}, expect=200)
resp = get(url, user=admin)
assert resp.data['AUTH_LDAP_USER_FLAGS_BY_GROUP']['is_superuser'] == expected
@pytest.mark.parametrize(
'setting',
[
'AUTH_LDAP_USER_DN_TEMPLATE',
'AUTH_LDAP_REQUIRE_GROUP',
'AUTH_LDAP_DENY_GROUP',
],
)
@pytest.mark.django_db
def test_empty_ldap_dn(get, put, patch, delete, admin, setting):
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'ldap'})
patch(url, user=admin, data={setting: ''}, expect=200)
resp = get(url, user=admin, expect=200)
assert resp.data[setting] is None
patch(url, user=admin, data={setting: None}, expect=200)
resp = get(url, user=admin, expect=200)
assert resp.data[setting] is None
@pytest.mark.django_db
def test_radius_settings(get, put, patch, delete, admin, settings):
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'radius'})

View File

@@ -1,103 +0,0 @@
import ldap
import ldif
import pytest
import os
from mockldap import MockLdap
from awx.api.versioning import reverse
@pytest.fixture
def ldap_generator():
def fn(fname, host='localhost'):
fh = open(os.path.join(os.path.dirname(os.path.realpath(__file__)), fname), 'rb')
ctrl = ldif.LDIFRecordList(fh)
ctrl.parse()
directory = dict(ctrl.all_records)
mockldap = MockLdap(directory)
mockldap.start()
mockldap['ldap://{}/'.format(host)]
conn = ldap.initialize('ldap://{}/'.format(host))
return conn
# mockldap.stop()
return fn
@pytest.fixture
def ldap_settings_generator():
def fn(prefix='', dc='ansible', host='ldap.ansible.com'):
prefix = '_{}'.format(prefix) if prefix else ''
data = {
'AUTH_LDAP_SERVER_URI': 'ldap://{}'.format(host),
'AUTH_LDAP_BIND_DN': 'cn=eng_user1,ou=people,dc={},dc=com'.format(dc),
'AUTH_LDAP_BIND_PASSWORD': 'password',
"AUTH_LDAP_USER_SEARCH": ["ou=people,dc={},dc=com".format(dc), "SCOPE_SUBTREE", "(cn=%(user)s)"],
"AUTH_LDAP_TEAM_MAP": {
"LDAP Sales": {"organization": "LDAP Organization", "users": "cn=sales,ou=groups,dc={},dc=com".format(dc), "remove": True},
"LDAP IT": {"organization": "LDAP Organization", "users": "cn=it,ou=groups,dc={},dc=com".format(dc), "remove": True},
"LDAP Engineering": {"organization": "LDAP Organization", "users": "cn=engineering,ou=groups,dc={},dc=com".format(dc), "remove": True},
},
"AUTH_LDAP_REQUIRE_GROUP": None,
"AUTH_LDAP_USER_ATTR_MAP": {"first_name": "givenName", "last_name": "sn", "email": "mail"},
"AUTH_LDAP_GROUP_SEARCH": ["dc={},dc=com".format(dc), "SCOPE_SUBTREE", "(objectClass=groupOfNames)"],
"AUTH_LDAP_USER_FLAGS_BY_GROUP": {"is_superuser": "cn=superusers,ou=groups,dc={},dc=com".format(dc)},
"AUTH_LDAP_ORGANIZATION_MAP": {
"LDAP Organization": {
"admins": "cn=engineering_admins,ou=groups,dc={},dc=com".format(dc),
"remove_admins": False,
"users": [
"cn=engineering,ou=groups,dc={},dc=com".format(dc),
"cn=sales,ou=groups,dc={},dc=com".format(dc),
"cn=it,ou=groups,dc={},dc=com".format(dc),
],
"remove_users": False,
}
},
}
if prefix:
data_new = dict()
for k, v in data.items():
k_new = k.replace('AUTH_LDAP', 'AUTH_LDAP{}'.format(prefix))
data_new[k_new] = v
else:
data_new = data
return data_new
return fn
# Note: mockldap isn't fully featured. Fancy queries aren't fully baked.
# However, objects returned are solid so they should flow through django ldap middleware nicely.
@pytest.mark.skip(reason="Needs Update - CA")
@pytest.mark.django_db
def test_login(ldap_generator, patch, post, admin, ldap_settings_generator):
auth_url = reverse('api:auth_token_view')
ldap_settings_url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'ldap'})
# Generate mock ldap servers and init with ldap data
ldap_generator("../data/ldap_example.ldif", "ldap.example.com")
ldap_generator("../data/ldap_redhat.ldif", "ldap.redhat.com")
ldap_generator("../data/ldap_ansible.ldif", "ldap.ansible.com")
ldap_settings_example = ldap_settings_generator(dc='example')
ldap_settings_ansible = ldap_settings_generator(prefix='1', dc='ansible')
ldap_settings_redhat = ldap_settings_generator(prefix='2', dc='redhat')
# eng_user1 exists in ansible and redhat but not example
patch(ldap_settings_url, user=admin, data=ldap_settings_example, expect=200)
post(auth_url, data={'username': 'eng_user1', 'password': 'password'}, expect=400)
patch(ldap_settings_url, user=admin, data=ldap_settings_ansible, expect=200)
patch(ldap_settings_url, user=admin, data=ldap_settings_redhat, expect=200)
post(auth_url, data={'username': 'eng_user1', 'password': 'password'}, expect=200)

View File

@@ -28,21 +28,6 @@ settings_dict = {
}
},
"SOCIAL_AUTH_SAML_CALLBACK_URL": "CALLBACK_URL",
"AUTH_LDAP_1_SERVER_URI": "SERVER_URI",
"AUTH_LDAP_1_BIND_DN": "BIND_DN",
"AUTH_LDAP_1_BIND_PASSWORD": "BIND_PASSWORD",
"AUTH_LDAP_1_GROUP_SEARCH": ["GROUP_SEARCH"],
"AUTH_LDAP_1_GROUP_TYPE": "string object",
"AUTH_LDAP_1_GROUP_TYPE_PARAMS": {"member_attr": "member", "name_attr": "cn"},
"AUTH_LDAP_1_USER_DN_TEMPLATE": "USER_DN_TEMPLATE",
"AUTH_LDAP_1_USER_SEARCH": ["USER_SEARCH"],
"AUTH_LDAP_1_USER_ATTR_MAP": {
"email": "email",
"last_name": "last_name",
"first_name": "first_name",
},
"AUTH_LDAP_1_CONNECTION_OPTIONS": {},
"AUTH_LDAP_1_START_TLS": None,
}
@@ -93,27 +78,6 @@ class TestDumpAuthConfigCommand(TestCase):
"IDP_ATTR_USER_PERMANENT_ID": "name_id",
},
},
{
"type": "ansible_base.authentication.authenticator_plugins.ldap",
"name": "LDAP_1",
"enabled": True,
"create_objects": True,
"users_unique": False,
"remove_users": True,
"configuration": {
"SERVER_URI": ["SERVER_URI"],
"BIND_DN": "BIND_DN",
"BIND_PASSWORD": "BIND_PASSWORD",
"CONNECTION_OPTIONS": {},
"GROUP_TYPE": "str",
"GROUP_TYPE_PARAMS": {"member_attr": "member", "name_attr": "cn"},
"GROUP_SEARCH": ["GROUP_SEARCH"],
"START_TLS": None,
"USER_DN_TEMPLATE": "USER_DN_TEMPLATE",
"USER_ATTR_MAP": {"email": "email", "last_name": "last_name", "first_name": "first_name"},
"USER_SEARCH": ["USER_SEARCH"],
},
},
]
def test_json_returned_from_cmd(self):
@@ -123,10 +87,3 @@ class TestDumpAuthConfigCommand(TestCase):
# check configured SAML return
assert cmmd_output[0] == self.expected_config[0]
# check configured LDAP return
assert cmmd_output[2] == self.expected_config[1]
# check unconfigured LDAP return
assert "LDAP_0_missing_fields" in cmmd_output[1]
assert cmmd_output[1]["LDAP_0_missing_fields"] == ['SERVER_URI', 'GROUP_TYPE', 'GROUP_TYPE_PARAMS', 'USER_DN_TEMPLATE', 'USER_ATTR_MAP']

View File

@@ -9,8 +9,6 @@ import tempfile
import socket
from datetime import timedelta
# python-ldap
import ldap
from split_settings.tools import include
@@ -394,12 +392,6 @@ REST_FRAMEWORK = {
}
AUTHENTICATION_BACKENDS = (
'awx.sso.backends.LDAPBackend',
'awx.sso.backends.LDAPBackend1',
'awx.sso.backends.LDAPBackend2',
'awx.sso.backends.LDAPBackend3',
'awx.sso.backends.LDAPBackend4',
'awx.sso.backends.LDAPBackend5',
'awx.sso.backends.RADIUSBackend',
'awx.sso.backends.TACACSPlusBackend',
'social_core.backends.google.GoogleOAuth2',
@@ -425,14 +417,6 @@ OAUTH2_PROVIDER_ID_TOKEN_MODEL = "oauth2_provider.IDToken"
OAUTH2_PROVIDER = {'ACCESS_TOKEN_EXPIRE_SECONDS': 31536000000, 'AUTHORIZATION_CODE_EXPIRE_SECONDS': 600, 'REFRESH_TOKEN_EXPIRE_SECONDS': 2628000}
ALLOW_OAUTH2_FOR_EXTERNAL_USERS = False
# LDAP server (default to None to skip using LDAP authentication).
# Note: This setting may be overridden by database settings.
AUTH_LDAP_SERVER_URI = None
# Disable LDAP referrals by default (to prevent certain LDAP queries from
# hanging with AD).
# Note: This setting may be overridden by database settings.
AUTH_LDAP_CONNECTION_OPTIONS = {ldap.OPT_REFERRALS: 0, ldap.OPT_NETWORK_TIMEOUT: 30}
# Radius server settings (default to empty string to skip using Radius auth).
# Note: These settings may be overridden by database settings.
@@ -932,7 +916,6 @@ LOGGING = {
'awx.analytics.broadcast_websocket': {'handlers': ['console', 'file', 'wsrelay', 'external_logger'], 'level': 'INFO', 'propagate': False},
'awx.analytics.performance': {'handlers': ['console', 'file', 'tower_warnings', 'external_logger'], 'level': 'DEBUG', 'propagate': False},
'awx.analytics.job_lifecycle': {'handlers': ['console', 'job_lifecycle'], 'level': 'DEBUG', 'propagate': False},
'django_auth_ldap': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
'social': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
'system_tracking_migrations': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
'rbac_migrations': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},

View File

@@ -2,26 +2,14 @@
# All Rights Reserved.
# Python
from collections import OrderedDict
import logging
import uuid
import ldap
# Django
from django.dispatch import receiver
from django.contrib.auth.models import User
from django.conf import settings as django_settings
from django.core.signals import setting_changed
from django.utils.encoding import force_str
from django.http import HttpResponse
# django-auth-ldap
from django_auth_ldap.backend import LDAPSettings as BaseLDAPSettings
from django_auth_ldap.backend import LDAPBackend as BaseLDAPBackend
from django_auth_ldap.backend import populate_user
from django.core.exceptions import ImproperlyConfigured
# radiusauth
from radiusauth.backends import RADIUSBackend as BaseRADIUSBackend
@@ -35,143 +23,10 @@ from social_core.backends.saml import SAMLIdentityProvider as BaseSAMLIdentityPr
# Ansible Tower
from awx.sso.models import UserEnterpriseAuth
from awx.sso.common import create_org_and_teams, reconcile_users_org_team_mappings
logger = logging.getLogger('awx.sso.backends')
class LDAPSettings(BaseLDAPSettings):
defaults = dict(list(BaseLDAPSettings.defaults.items()) + list({'ORGANIZATION_MAP': {}, 'TEAM_MAP': {}, 'GROUP_TYPE_PARAMS': {}}.items()))
def __init__(self, prefix='AUTH_LDAP_', defaults={}):
super(LDAPSettings, self).__init__(prefix, defaults)
# If a DB-backed setting is specified that wipes out the
# OPT_NETWORK_TIMEOUT, fall back to a sane default
if ldap.OPT_NETWORK_TIMEOUT not in getattr(self, 'CONNECTION_OPTIONS', {}):
options = getattr(self, 'CONNECTION_OPTIONS', {})
options[ldap.OPT_NETWORK_TIMEOUT] = 30
self.CONNECTION_OPTIONS = options
# when specifying `.set_option()` calls for TLS in python-ldap, the
# *order* in which you invoke them *matters*, particularly in Python3,
# where dictionary insertion order is persisted
#
# specifically, it is *critical* that `ldap.OPT_X_TLS_NEWCTX` be set *last*
# this manual sorting puts `OPT_X_TLS_NEWCTX` *after* other TLS-related
# options
#
# see: https://github.com/python-ldap/python-ldap/issues/55
newctx_option = self.CONNECTION_OPTIONS.pop(ldap.OPT_X_TLS_NEWCTX, None)
self.CONNECTION_OPTIONS = OrderedDict(self.CONNECTION_OPTIONS)
if newctx_option is not None:
self.CONNECTION_OPTIONS[ldap.OPT_X_TLS_NEWCTX] = newctx_option
class LDAPBackend(BaseLDAPBackend):
"""
Custom LDAP backend for AWX.
"""
settings_prefix = 'AUTH_LDAP_'
def __init__(self, *args, **kwargs):
self._dispatch_uid = uuid.uuid4()
super(LDAPBackend, self).__init__(*args, **kwargs)
setting_changed.connect(self._on_setting_changed, dispatch_uid=self._dispatch_uid)
def _on_setting_changed(self, sender, **kwargs):
# If any AUTH_LDAP_* setting changes, force settings to be reloaded for
# this backend instance.
if kwargs.get('setting', '').startswith(self.settings_prefix):
self._settings = None
def _get_settings(self):
if self._settings is None:
self._settings = LDAPSettings(self.settings_prefix)
return self._settings
def _set_settings(self, settings):
self._settings = settings
settings = property(_get_settings, _set_settings)
def authenticate(self, request, username, password):
if self.settings.START_TLS and ldap.OPT_X_TLS_REQUIRE_CERT in self.settings.CONNECTION_OPTIONS:
# with python-ldap, if you want to set connection-specific TLS
# parameters, you must also specify OPT_X_TLS_NEWCTX = 0
# see: https://stackoverflow.com/a/29722445
# see: https://stackoverflow.com/a/38136255
self.settings.CONNECTION_OPTIONS[ldap.OPT_X_TLS_NEWCTX] = 0
if not self.settings.SERVER_URI:
return None
try:
user = User.objects.get(username=username)
if user and (not user.profile or not user.profile.ldap_dn):
return None
except User.DoesNotExist:
pass
try:
for setting_name, type_ in [('GROUP_SEARCH', 'LDAPSearch'), ('GROUP_TYPE', 'LDAPGroupType')]:
if getattr(self.settings, setting_name) is None:
raise ImproperlyConfigured("{} must be an {} instance.".format(setting_name, type_))
ldap_user = super(LDAPBackend, self).authenticate(request, username, password)
# If we have an LDAP user and that user we found has an ldap_user internal object and that object has a bound connection
# Then we can try and force an unbind to close the sticky connection
if ldap_user and ldap_user.ldap_user and ldap_user.ldap_user._connection_bound:
logger.debug("Forcing LDAP connection to close")
try:
ldap_user.ldap_user._connection.unbind_s()
ldap_user.ldap_user._connection_bound = False
except Exception:
logger.exception(f"Got unexpected LDAP exception when forcing LDAP disconnect for user {ldap_user}, login will still proceed")
return ldap_user
except Exception:
logger.exception("Encountered an error authenticating to LDAP")
return None
def get_user(self, user_id):
if not self.settings.SERVER_URI:
return None
return super(LDAPBackend, self).get_user(user_id)
# Disable any LDAP based authorization / permissions checking.
def has_perm(self, user, perm, obj=None):
return False
def has_module_perms(self, user, app_label):
return False
def get_all_permissions(self, user, obj=None):
return set()
def get_group_permissions(self, user, obj=None):
return set()
class LDAPBackend1(LDAPBackend):
settings_prefix = 'AUTH_LDAP_1_'
class LDAPBackend2(LDAPBackend):
settings_prefix = 'AUTH_LDAP_2_'
class LDAPBackend3(LDAPBackend):
settings_prefix = 'AUTH_LDAP_3_'
class LDAPBackend4(LDAPBackend):
settings_prefix = 'AUTH_LDAP_4_'
class LDAPBackend5(LDAPBackend):
settings_prefix = 'AUTH_LDAP_5_'
def _decorate_enterprise_user(user, provider):
user.set_unusable_password()
user.save()
@@ -348,122 +203,3 @@ class SAMLAuth(BaseSAMLAuth):
):
return None
return super(SAMLAuth, self).get_user(user_id)
def _update_m2m_from_groups(ldap_user, opts, remove=True):
"""
Hepler function to evaluate the LDAP team/org options to determine if LDAP user should
be a member of the team/org based on their ldap group dns.
Returns:
True - User should be added
False - User should be removed
None - Users membership should not be changed
"""
if opts is None:
return None
elif not opts:
pass
elif isinstance(opts, bool) and opts is True:
return True
else:
if isinstance(opts, str):
opts = [opts]
# If any of the users groups matches any of the list options
for group_dn in opts:
if not isinstance(group_dn, str):
continue
if ldap_user._get_groups().is_member_of(group_dn):
return True
if remove:
return False
return None
@receiver(populate_user, dispatch_uid='populate-ldap-user')
def on_populate_user(sender, **kwargs):
"""
Handle signal from LDAP backend to populate the user object. Update user
organization/team memberships according to their LDAP groups.
"""
user = kwargs['user']
ldap_user = kwargs['ldap_user']
backend = ldap_user.backend
# Boolean to determine if we should force an user update
# to avoid duplicate SQL update statements
force_user_update = False
# Prefetch user's groups to prevent LDAP queries for each org/team when
# checking membership.
ldap_user._get_groups().get_group_dns()
# If the LDAP user has a first or last name > $maxlen chars, truncate it
for field in ('first_name', 'last_name'):
max_len = User._meta.get_field(field).max_length
field_len = len(getattr(user, field))
if field_len > max_len:
setattr(user, field, getattr(user, field)[:max_len])
force_user_update = True
logger.warning('LDAP user {} has {} > max {} characters'.format(user.username, field, max_len))
org_map = getattr(backend.settings, 'ORGANIZATION_MAP', {})
team_map_settings = getattr(backend.settings, 'TEAM_MAP', {})
orgs_list = list(org_map.keys())
team_map = {}
for team_name, team_opts in team_map_settings.items():
if not team_opts.get('organization', None):
# You can't save the LDAP config in the UI w/o an org (or '' or null as the org) so if we somehow got this condition its an error
logger.error("Team named {} in LDAP team map settings is invalid due to missing organization".format(team_name))
continue
team_map[team_name] = team_opts['organization']
create_org_and_teams(orgs_list, team_map, 'LDAP')
# Compute in memory what the state is of the different LDAP orgs
org_roles_and_ldap_attributes = {'admin_role': 'admins', 'auditor_role': 'auditors', 'member_role': 'users'}
desired_org_states = {}
for org_name, org_opts in org_map.items():
remove = bool(org_opts.get('remove', True))
desired_org_states[org_name] = {}
for org_role_name in org_roles_and_ldap_attributes.keys():
ldap_name = org_roles_and_ldap_attributes[org_role_name]
opts = org_opts.get(ldap_name, None)
remove = bool(org_opts.get('remove_{}'.format(ldap_name), remove))
desired_org_states[org_name][org_role_name] = _update_m2m_from_groups(ldap_user, opts, remove)
# If everything returned None (because there was no configuration) we can remove this org from our map
# This will prevent us from loading the org in the next query
if all(desired_org_states[org_name][org_role_name] is None for org_role_name in org_roles_and_ldap_attributes.keys()):
del desired_org_states[org_name]
# Compute in memory what the state is of the different LDAP teams
desired_team_states = {}
for team_name, team_opts in team_map_settings.items():
if 'organization' not in team_opts:
continue
users_opts = team_opts.get('users', None)
remove = bool(team_opts.get('remove', True))
state = _update_m2m_from_groups(ldap_user, users_opts, remove)
if state is not None:
organization = team_opts['organization']
if organization not in desired_team_states:
desired_team_states[organization] = {}
desired_team_states[organization][team_name] = {'member_role': state}
# Check if user.profile is available, otherwise force user.save()
try:
_ = user.profile
except ValueError:
force_user_update = True
finally:
if force_user_update:
user.save()
# Update user profile to store LDAP DN.
profile = user.profile
if profile.ldap_dn != ldap_user.dn:
profile.ldap_dn = ldap_user.dn
profile.save()
reconcile_users_org_team_mappings(user, desired_org_states, desired_team_states, 'LDAP')

View File

@@ -113,7 +113,7 @@ def create_org_and_teams(org_list, team_map, adapter, can_create=True):
logger.debug(f"Adapter {adapter} is not allowed to create orgs/teams")
return
# Get all of the IDs and names of orgs in the DB and create any new org defined in LDAP that does not exist in the DB
# Get all of the IDs and names of orgs in the DB and create any new org defined in org_list that does not exist in the DB
existing_orgs = get_orgs_by_ids()
# Parse through orgs and teams provided and create a list of unique items we care about creating
@@ -174,18 +174,6 @@ def get_or_create_org_with_default_galaxy_cred(**kwargs):
def get_external_account(user):
account_type = None
# Previously this method also checked for active configuration which meant that if a user logged in from LDAP
# and then LDAP was no longer configured it would "convert" the user from an LDAP account_type to none.
# This did have one benefit that if a login type was removed intentionally the user could be given a username password.
# But it had a limitation that the user would have to have an active session (or an admin would have to go set a temp password).
# It also lead to the side affect that if LDAP was ever reconfigured the user would convert back to LDAP but still have a local password.
# That local password could then be used to bypass LDAP authentication.
try:
if user.pk and user.profile.ldap_dn and not user.has_usable_password():
account_type = "ldap"
except AttributeError:
pass
if user.social_auth.all():
account_type = "social"
@@ -198,9 +186,8 @@ def get_external_account(user):
def is_remote_auth_enabled():
from django.conf import settings
# Append LDAP, Radius, TACACS+ and SAML options
# Append Radius, TACACS+ and SAML options
settings_that_turn_on_remote_auth = [
'AUTH_LDAP_SERVER_URI',
'SOCIAL_AUTH_SAML_ENABLED_IDPS',
'RADIUS_SERVER',
'TACACSPLUS_HOST',

View File

@@ -14,18 +14,6 @@ from rest_framework import serializers
from awx.conf import register, register_validate, fields
from awx.sso.fields import (
AuthenticationBackendsField,
LDAPConnectionOptionsField,
LDAPDNField,
LDAPDNWithUserField,
LDAPGroupTypeField,
LDAPGroupTypeParamsField,
LDAPOrganizationMapField,
LDAPSearchField,
LDAPSearchUnionField,
LDAPServerURIField,
LDAPTeamMapField,
LDAPUserAttrMapField,
LDAPUserFlagsField,
SAMLContactField,
SAMLEnabledIdPsField,
SAMLOrgAttrField,
@@ -37,7 +25,7 @@ from awx.sso.fields import (
SocialTeamMapField,
)
from awx.main.validators import validate_private_key, validate_certificate
from awx.sso.validators import validate_ldap_bind_dn, validate_tacacsplus_disallow_nonascii # noqa
from awx.sso.validators import validate_tacacsplus_disallow_nonascii # noqa
class SocialAuthCallbackURL(object):
@@ -159,297 +147,6 @@ if settings.ALLOW_LOCAL_RESOURCE_MANAGEMENT:
category_slug='authentication',
)
###############################################################################
# LDAP AUTHENTICATION SETTINGS
###############################################################################
def _register_ldap(append=None):
append_str = '_{}'.format(append) if append else ''
register(
'AUTH_LDAP{}_SERVER_URI'.format(append_str),
field_class=LDAPServerURIField,
allow_blank=True,
default='',
label=_('LDAP Server URI'),
help_text=_(
'URI to connect to LDAP server, such as "ldap://ldap.example.com:389" '
'(non-SSL) or "ldaps://ldap.example.com:636" (SSL). Multiple LDAP '
'servers may be specified by separating with spaces or commas. LDAP '
'authentication is disabled if this parameter is empty.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder='ldaps://ldap.example.com:636',
)
register(
'AUTH_LDAP{}_BIND_DN'.format(append_str),
field_class=fields.CharField,
allow_blank=True,
default='',
validators=[validate_ldap_bind_dn],
label=_('LDAP Bind DN'),
help_text=_(
'DN (Distinguished Name) of user to bind for all search queries. This'
' is the system user account we will use to login to query LDAP for other'
' user information. Refer to the documentation for example syntax.'
),
category=_('LDAP'),
category_slug='ldap',
)
register(
'AUTH_LDAP{}_BIND_PASSWORD'.format(append_str),
field_class=fields.CharField,
allow_blank=True,
default='',
label=_('LDAP Bind Password'),
help_text=_('Password used to bind LDAP user account.'),
category=_('LDAP'),
category_slug='ldap',
encrypted=True,
)
register(
'AUTH_LDAP{}_START_TLS'.format(append_str),
field_class=fields.BooleanField,
default=False,
label=_('LDAP Start TLS'),
help_text=_('Whether to enable TLS when the LDAP connection is not using SSL.'),
category=_('LDAP'),
category_slug='ldap',
)
register(
'AUTH_LDAP{}_CONNECTION_OPTIONS'.format(append_str),
field_class=LDAPConnectionOptionsField,
default={'OPT_REFERRALS': 0, 'OPT_NETWORK_TIMEOUT': 30},
label=_('LDAP Connection Options'),
help_text=_(
'Additional options to set for the LDAP connection. LDAP '
'referrals are disabled by default (to prevent certain LDAP '
'queries from hanging with AD). Option names should be strings '
'(e.g. "OPT_REFERRALS"). Refer to '
'https://www.python-ldap.org/doc/html/ldap.html#options for '
'possible options and values that can be set.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder=collections.OrderedDict([('OPT_REFERRALS', 0), ('OPT_NETWORK_TIMEOUT', 30)]),
)
register(
'AUTH_LDAP{}_USER_SEARCH'.format(append_str),
field_class=LDAPSearchUnionField,
default=[],
label=_('LDAP User Search'),
help_text=_(
'LDAP search query to find users. Any user that matches the given '
'pattern will be able to login to the service. The user should also be '
'mapped into an organization (as defined in the '
'AUTH_LDAP_ORGANIZATION_MAP setting). If multiple search queries '
'need to be supported use of "LDAPUnion" is possible. See '
'the documentation for details.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder=('OU=Users,DC=example,DC=com', 'SCOPE_SUBTREE', '(sAMAccountName=%(user)s)'),
)
register(
'AUTH_LDAP{}_USER_DN_TEMPLATE'.format(append_str),
field_class=LDAPDNWithUserField,
allow_blank=True,
allow_null=True,
default=None,
label=_('LDAP User DN Template'),
help_text=_(
'Alternative to user search, if user DNs are all of the same '
'format. This approach is more efficient for user lookups than '
'searching if it is usable in your organizational environment. If '
'this setting has a value it will be used instead of '
'AUTH_LDAP_USER_SEARCH.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder='uid=%(user)s,OU=Users,DC=example,DC=com',
)
register(
'AUTH_LDAP{}_USER_ATTR_MAP'.format(append_str),
field_class=LDAPUserAttrMapField,
default={},
label=_('LDAP User Attribute Map'),
help_text=_(
'Mapping of LDAP user schema to API user attributes. The default'
' setting is valid for ActiveDirectory but users with other LDAP'
' configurations may need to change the values. Refer to the'
' documentation for additional details.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder=collections.OrderedDict([('first_name', 'givenName'), ('last_name', 'sn'), ('email', 'mail')]),
)
register(
'AUTH_LDAP{}_GROUP_SEARCH'.format(append_str),
field_class=LDAPSearchField,
default=[],
label=_('LDAP Group Search'),
help_text=_(
'Users are mapped to organizations based on their membership in LDAP'
' groups. This setting defines the LDAP search query to find groups. '
'Unlike the user search, group search does not support LDAPSearchUnion.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder=('DC=example,DC=com', 'SCOPE_SUBTREE', '(objectClass=group)'),
)
register(
'AUTH_LDAP{}_GROUP_TYPE'.format(append_str),
field_class=LDAPGroupTypeField,
label=_('LDAP Group Type'),
help_text=_(
'The group type may need to be changed based on the type of the '
'LDAP server. Values are listed at: '
'https://django-auth-ldap.readthedocs.io/en/stable/groups.html#types-of-groups'
),
category=_('LDAP'),
category_slug='ldap',
default='MemberDNGroupType',
depends_on=['AUTH_LDAP{}_GROUP_TYPE_PARAMS'.format(append_str)],
)
register(
'AUTH_LDAP{}_GROUP_TYPE_PARAMS'.format(append_str),
field_class=LDAPGroupTypeParamsField,
label=_('LDAP Group Type Parameters'),
help_text=_('Key value parameters to send the chosen group type init method.'),
category=_('LDAP'),
category_slug='ldap',
default=collections.OrderedDict([('member_attr', 'member'), ('name_attr', 'cn')]),
placeholder=collections.OrderedDict([('ldap_group_user_attr', 'legacyuid'), ('member_attr', 'member'), ('name_attr', 'cn')]),
depends_on=['AUTH_LDAP{}_GROUP_TYPE'.format(append_str)],
)
register(
'AUTH_LDAP{}_REQUIRE_GROUP'.format(append_str),
field_class=LDAPDNField,
allow_blank=True,
allow_null=True,
default=None,
label=_('LDAP Require Group'),
help_text=_(
'Group DN required to login. If specified, user must be a member '
'of this group to login via LDAP. If not set, everyone in LDAP '
'that matches the user search will be able to login to the service. '
'Only one require group is supported.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder='CN=Service Users,OU=Users,DC=example,DC=com',
)
register(
'AUTH_LDAP{}_DENY_GROUP'.format(append_str),
field_class=LDAPDNField,
allow_blank=True,
allow_null=True,
default=None,
label=_('LDAP Deny Group'),
help_text=_(
'Group DN denied from login. If specified, user will not be allowed to login if a member of this group. Only one deny group is supported.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder='CN=Disabled Users,OU=Users,DC=example,DC=com',
)
register(
'AUTH_LDAP{}_USER_FLAGS_BY_GROUP'.format(append_str),
field_class=LDAPUserFlagsField,
default={},
label=_('LDAP User Flags By Group'),
help_text=_(
'Retrieve users from a given group. At this time, superuser and system'
' auditors are the only groups supported. Refer to the'
' documentation for more detail.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder=collections.OrderedDict(
[('is_superuser', 'CN=Domain Admins,CN=Users,DC=example,DC=com'), ('is_system_auditor', 'CN=Domain Auditors,CN=Users,DC=example,DC=com')]
),
)
register(
'AUTH_LDAP{}_ORGANIZATION_MAP'.format(append_str),
field_class=LDAPOrganizationMapField,
default={},
label=_('LDAP Organization Map'),
help_text=_(
'Mapping between organization admins/users and LDAP groups. This '
'controls which users are placed into which organizations '
'relative to their LDAP group memberships. Configuration details '
'are available in the documentation.'
),
category=_('LDAP'),
category_slug='ldap',
placeholder=collections.OrderedDict(
[
(
'Test Org',
collections.OrderedDict(
[
('admins', 'CN=Domain Admins,CN=Users,DC=example,DC=com'),
('auditors', 'CN=Domain Auditors,CN=Users,DC=example,DC=com'),
('users', ['CN=Domain Users,CN=Users,DC=example,DC=com']),
('remove_users', True),
('remove_admins', True),
]
),
),
(
'Test Org 2',
collections.OrderedDict(
[('admins', 'CN=Administrators,CN=Builtin,DC=example,DC=com'), ('users', True), ('remove_users', True), ('remove_admins', True)]
),
),
]
),
)
register(
'AUTH_LDAP{}_TEAM_MAP'.format(append_str),
field_class=LDAPTeamMapField,
default={},
label=_('LDAP Team Map'),
help_text=_('Mapping between team members (users) and LDAP groups. Configuration details are available in the documentation.'),
category=_('LDAP'),
category_slug='ldap',
placeholder=collections.OrderedDict(
[
(
'My Team',
collections.OrderedDict([('organization', 'Test Org'), ('users', ['CN=Domain Users,CN=Users,DC=example,DC=com']), ('remove', True)]),
),
(
'Other Team',
collections.OrderedDict([('organization', 'Test Org 2'), ('users', 'CN=Other Users,CN=Users,DC=example,DC=com'), ('remove', False)]),
),
]
),
)
_register_ldap()
_register_ldap('1')
_register_ldap('2')
_register_ldap('3')
_register_ldap('4')
_register_ldap('5')
###############################################################################
# RADIUS AUTHENTICATION SETTINGS
###############################################################################

View File

@@ -1,39 +1,20 @@
import collections
import copy
import inspect
import json
import re
import six
# Python LDAP
import ldap
import awx
# Django
from django.utils.translation import gettext_lazy as _
# Django Auth LDAP
import django_auth_ldap.config
from django_auth_ldap.config import LDAPSearch, LDAPSearchUnion
from rest_framework.exceptions import ValidationError
from rest_framework.fields import empty, Field, SkipField
# This must be imported so get_subclasses picks it up
from awx.sso.ldap_group_types import PosixUIDGroupType # noqa
# AWX
from awx.conf import fields
from awx.main.validators import validate_certificate
from awx.sso.validators import ( # noqa
validate_ldap_dn,
validate_ldap_bind_dn,
validate_ldap_dn_with_user,
validate_ldap_filter,
validate_ldap_filter_with_user,
validate_tacacsplus_disallow_nonascii,
)
from awx.sso.validators import validate_tacacsplus_disallow_nonascii # noqa
def get_subclasses(cls):
@@ -43,18 +24,6 @@ def get_subclasses(cls):
yield subclass
def find_class_in_modules(class_name):
"""
Used to find ldap subclasses by string
"""
module_search_space = [django_auth_ldap.config, awx.sso.ldap_group_types]
for m in module_search_space:
cls = getattr(m, class_name, None)
if cls:
return cls
return None
class DependsOnMixin:
def get_depends_on(self):
"""
@@ -139,12 +108,6 @@ class AuthenticationBackendsField(fields.StringListField):
# authentication backend.
REQUIRED_BACKEND_SETTINGS = collections.OrderedDict(
[
('awx.sso.backends.LDAPBackend', ['AUTH_LDAP_SERVER_URI']),
('awx.sso.backends.LDAPBackend1', ['AUTH_LDAP_1_SERVER_URI']),
('awx.sso.backends.LDAPBackend2', ['AUTH_LDAP_2_SERVER_URI']),
('awx.sso.backends.LDAPBackend3', ['AUTH_LDAP_3_SERVER_URI']),
('awx.sso.backends.LDAPBackend4', ['AUTH_LDAP_4_SERVER_URI']),
('awx.sso.backends.LDAPBackend5', ['AUTH_LDAP_5_SERVER_URI']),
('awx.sso.backends.RADIUSBackend', ['RADIUS_SERVER']),
('social_core.backends.google.GoogleOAuth2', ['SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', 'SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET']),
('social_core.backends.github.GithubOAuth2', ['SOCIAL_AUTH_GITHUB_KEY', 'SOCIAL_AUTH_GITHUB_SECRET']),
@@ -230,310 +193,6 @@ class AuthenticationBackendsField(fields.StringListField):
return backends
class LDAPServerURIField(fields.URLField):
def __init__(self, **kwargs):
kwargs.setdefault('schemes', ('ldap', 'ldaps'))
kwargs.setdefault('allow_plain_hostname', True)
super(LDAPServerURIField, self).__init__(**kwargs)
def run_validators(self, value):
for url in filter(None, re.split(r'[, ]', (value or ''))):
super(LDAPServerURIField, self).run_validators(url)
return value
class LDAPConnectionOptionsField(fields.DictField):
default_error_messages = {'invalid_options': _('Invalid connection option(s): {invalid_options}.')}
def to_representation(self, value):
value = value or {}
opt_names = ldap.OPT_NAMES_DICT
# Convert integer options to their named constants.
repr_value = {}
for opt, opt_value in value.items():
if opt in opt_names:
repr_value[opt_names[opt]] = opt_value
return repr_value
def to_internal_value(self, data):
data = super(LDAPConnectionOptionsField, self).to_internal_value(data)
valid_options = dict([(v, k) for k, v in ldap.OPT_NAMES_DICT.items()])
invalid_options = set(data.keys()) - set(valid_options.keys())
if invalid_options:
invalid_options = sorted(list(invalid_options))
options_display = json.dumps(invalid_options).lstrip('[').rstrip(']')
self.fail('invalid_options', invalid_options=options_display)
# Convert named options to their integer constants.
internal_data = {}
for opt_name, opt_value in data.items():
internal_data[valid_options[opt_name]] = opt_value
return internal_data
class LDAPDNField(fields.CharField):
def __init__(self, **kwargs):
super(LDAPDNField, self).__init__(**kwargs)
self.validators.append(validate_ldap_dn)
def run_validation(self, data=empty):
value = super(LDAPDNField, self).run_validation(data)
# django-auth-ldap expects DN fields (like AUTH_LDAP_REQUIRE_GROUP)
# to be either a valid string or ``None`` (not an empty string)
return None if value == '' else value
class LDAPDNListField(fields.StringListField):
def __init__(self, **kwargs):
super(LDAPDNListField, self).__init__(**kwargs)
self.validators.append(lambda dn: list(map(validate_ldap_dn, dn)))
def run_validation(self, data=empty):
if not isinstance(data, (list, tuple)):
data = [data]
return super(LDAPDNListField, self).run_validation(data)
class LDAPDNWithUserField(fields.CharField):
def __init__(self, **kwargs):
super(LDAPDNWithUserField, self).__init__(**kwargs)
self.validators.append(validate_ldap_dn_with_user)
def run_validation(self, data=empty):
value = super(LDAPDNWithUserField, self).run_validation(data)
# django-auth-ldap expects DN fields (like AUTH_LDAP_USER_DN_TEMPLATE)
# to be either a valid string or ``None`` (not an empty string)
return None if value == '' else value
class LDAPFilterField(fields.CharField):
def __init__(self, **kwargs):
super(LDAPFilterField, self).__init__(**kwargs)
self.validators.append(validate_ldap_filter)
class LDAPFilterWithUserField(fields.CharField):
def __init__(self, **kwargs):
super(LDAPFilterWithUserField, self).__init__(**kwargs)
self.validators.append(validate_ldap_filter_with_user)
class LDAPScopeField(fields.ChoiceField):
def __init__(self, choices=None, **kwargs):
choices = choices or [('SCOPE_BASE', _('Base')), ('SCOPE_ONELEVEL', _('One Level')), ('SCOPE_SUBTREE', _('Subtree'))]
super(LDAPScopeField, self).__init__(choices, **kwargs)
def to_representation(self, value):
for choice in self.choices.keys():
if value == getattr(ldap, choice):
return choice
return super(LDAPScopeField, self).to_representation(value)
def to_internal_value(self, data):
value = super(LDAPScopeField, self).to_internal_value(data)
return getattr(ldap, value)
class LDAPSearchField(fields.ListField):
default_error_messages = {
'invalid_length': _('Expected a list of three items but got {length} instead.'),
'type_error': _('Expected an instance of LDAPSearch but got {input_type} instead.'),
}
ldap_filter_field_class = LDAPFilterField
def to_representation(self, value):
if not value:
return []
if not isinstance(value, LDAPSearch):
self.fail('type_error', input_type=type(value))
return [
LDAPDNField().to_representation(value.base_dn),
LDAPScopeField().to_representation(value.scope),
self.ldap_filter_field_class().to_representation(value.filterstr),
]
def to_internal_value(self, data):
data = super(LDAPSearchField, self).to_internal_value(data)
if len(data) == 0:
return None
if len(data) != 3:
self.fail('invalid_length', length=len(data))
return LDAPSearch(
LDAPDNField().run_validation(data[0]), LDAPScopeField().run_validation(data[1]), self.ldap_filter_field_class().run_validation(data[2])
)
class LDAPSearchWithUserField(LDAPSearchField):
ldap_filter_field_class = LDAPFilterWithUserField
class LDAPSearchUnionField(fields.ListField):
default_error_messages = {'type_error': _('Expected an instance of LDAPSearch or LDAPSearchUnion but got {input_type} instead.')}
ldap_search_field_class = LDAPSearchWithUserField
def to_representation(self, value):
if not value:
return []
elif isinstance(value, LDAPSearchUnion):
return [self.ldap_search_field_class().to_representation(s) for s in value.searches]
elif isinstance(value, LDAPSearch):
return self.ldap_search_field_class().to_representation(value)
else:
self.fail('type_error', input_type=type(value))
def to_internal_value(self, data):
data = super(LDAPSearchUnionField, self).to_internal_value(data)
if len(data) == 0:
return None
if len(data) == 3 and isinstance(data[0], str):
return self.ldap_search_field_class().run_validation(data)
else:
search_args = []
for i in range(len(data)):
if not isinstance(data[i], list):
raise ValidationError('In order to ultilize LDAP Union, input element No. %d should be a search query array.' % (i + 1))
try:
search_args.append(self.ldap_search_field_class().run_validation(data[i]))
except Exception as e:
if hasattr(e, 'detail') and isinstance(e.detail, list):
e.detail.insert(0, "Error parsing LDAP Union element No. %d:" % (i + 1))
raise e
return LDAPSearchUnion(*search_args)
class LDAPUserAttrMapField(fields.DictField):
default_error_messages = {'invalid_attrs': _('Invalid user attribute(s): {invalid_attrs}.')}
valid_user_attrs = {'first_name', 'last_name', 'email'}
child = fields.CharField()
def to_internal_value(self, data):
data = super(LDAPUserAttrMapField, self).to_internal_value(data)
invalid_attrs = set(data.keys()) - self.valid_user_attrs
if invalid_attrs:
invalid_attrs = sorted(list(invalid_attrs))
attrs_display = json.dumps(invalid_attrs).lstrip('[').rstrip(']')
self.fail('invalid_attrs', invalid_attrs=attrs_display)
return data
class LDAPGroupTypeField(fields.ChoiceField, DependsOnMixin):
default_error_messages = {
'type_error': _('Expected an instance of LDAPGroupType but got {input_type} instead.'),
'missing_parameters': _('Missing required parameters in {dependency}.'),
'invalid_parameters': _('Invalid group_type parameters. Expected instance of dict but got {parameters_type} instead.'),
}
def __init__(self, choices=None, **kwargs):
group_types = get_subclasses(django_auth_ldap.config.LDAPGroupType)
choices = choices or [(x.__name__, x.__name__) for x in group_types]
super(LDAPGroupTypeField, self).__init__(choices, **kwargs)
def to_representation(self, value):
if not value:
return 'MemberDNGroupType'
if not isinstance(value, django_auth_ldap.config.LDAPGroupType):
self.fail('type_error', input_type=type(value))
return value.__class__.__name__
def to_internal_value(self, data):
data = super(LDAPGroupTypeField, self).to_internal_value(data)
if not data:
return None
cls = find_class_in_modules(data)
if not cls:
return None
# Per-group type parameter validation and handling here
# Backwords compatability. Before AUTH_LDAP_GROUP_TYPE_PARAMS existed
# MemberDNGroupType was the only group type, of the underlying lib, that
# took a parameter.
params = self.get_depends_on() or {}
params_sanitized = dict()
cls_args = inspect.getfullargspec(cls.__init__).args[1:]
if cls_args:
if not isinstance(params, dict):
self.fail('invalid_parameters', parameters_type=type(params))
for attr in cls_args:
if attr in params:
params_sanitized[attr] = params[attr]
try:
return cls(**params_sanitized)
except TypeError:
self.fail('missing_parameters', dependency=list(self.depends_on)[0])
class LDAPGroupTypeParamsField(fields.DictField, DependsOnMixin):
default_error_messages = {'invalid_keys': _('Invalid key(s): {invalid_keys}.')}
def to_internal_value(self, value):
value = super(LDAPGroupTypeParamsField, self).to_internal_value(value)
if not value:
return value
group_type_str = self.get_depends_on()
group_type_str = group_type_str or ''
group_type_cls = find_class_in_modules(group_type_str)
if not group_type_cls:
# Fail safe
return {}
invalid_keys = set(value.keys()) - set(inspect.getfullargspec(group_type_cls.__init__).args[1:])
if invalid_keys:
invalid_keys = sorted(list(invalid_keys))
keys_display = json.dumps(invalid_keys).lstrip('[').rstrip(']')
self.fail('invalid_keys', invalid_keys=keys_display)
return value
class LDAPUserFlagsField(fields.DictField):
default_error_messages = {'invalid_flag': _('Invalid user flag: "{invalid_flag}".')}
valid_user_flags = {'is_superuser', 'is_system_auditor'}
child = LDAPDNListField()
def to_internal_value(self, data):
data = super(LDAPUserFlagsField, self).to_internal_value(data)
invalid_flags = set(data.keys()) - self.valid_user_flags
if invalid_flags:
self.fail('invalid_flag', invalid_flag=list(invalid_flags)[0])
return data
class LDAPDNMapField(fields.StringListBooleanField):
child = LDAPDNField()
class LDAPSingleOrganizationMapField(HybridDictField):
admins = LDAPDNMapField(allow_null=True, required=False)
users = LDAPDNMapField(allow_null=True, required=False)
auditors = LDAPDNMapField(allow_null=True, required=False)
remove_admins = fields.BooleanField(required=False)
remove_users = fields.BooleanField(required=False)
remove_auditors = fields.BooleanField(required=False)
child = _Forbidden()
class LDAPOrganizationMapField(fields.DictField):
child = LDAPSingleOrganizationMapField()
class LDAPSingleTeamMapField(HybridDictField):
organization = fields.CharField()
users = LDAPDNMapField(allow_null=True, required=False)
remove = fields.BooleanField(required=False)
child = _Forbidden()
class LDAPTeamMapField(fields.DictField):
child = LDAPSingleTeamMapField()
class SocialMapStringRegexField(fields.CharField):
def to_representation(self, value):
if isinstance(value, type(re.compile(''))):

View File

@@ -1,73 +0,0 @@
# Copyright (c) 2018 Ansible by Red Hat
# All Rights Reserved.
# Python
import ldap
# Django
from django.utils.encoding import force_str
# 3rd party
from django_auth_ldap.config import LDAPGroupType
class PosixUIDGroupType(LDAPGroupType):
def __init__(self, name_attr='cn', ldap_group_user_attr='uid'):
self.ldap_group_user_attr = ldap_group_user_attr
super(PosixUIDGroupType, self).__init__(name_attr)
"""
An LDAPGroupType subclass that handles non-standard DS.
"""
def user_groups(self, ldap_user, group_search):
"""
Searches for any group that is either the user's primary or contains the
user as a member.
"""
groups = []
try:
user_uid = ldap_user.attrs[self.ldap_group_user_attr][0]
if 'gidNumber' in ldap_user.attrs:
user_gid = ldap_user.attrs['gidNumber'][0]
filterstr = u'(|(gidNumber=%s)(memberUid=%s))' % (
self.ldap.filter.escape_filter_chars(user_gid),
self.ldap.filter.escape_filter_chars(user_uid),
)
else:
filterstr = u'(memberUid=%s)' % (self.ldap.filter.escape_filter_chars(user_uid),)
search = group_search.search_with_additional_term_string(filterstr)
search.attrlist = [str(self.name_attr)]
groups = search.execute(ldap_user.connection)
except (KeyError, IndexError):
pass
return groups
def is_member(self, ldap_user, group_dn):
"""
Returns True if the group is the user's primary group or if the user is
listed in the group's memberUid attribute.
"""
is_member = False
try:
user_uid = ldap_user.attrs[self.ldap_group_user_attr][0]
try:
is_member = ldap_user.connection.compare_s(force_str(group_dn), 'memberUid', force_str(user_uid))
except (ldap.UNDEFINED_TYPE, ldap.NO_SUCH_ATTRIBUTE):
is_member = False
if not is_member:
try:
user_gid = ldap_user.attrs['gidNumber'][0]
is_member = ldap_user.connection.compare_s(force_str(group_dn), 'gidNumber', force_str(user_gid))
except (ldap.UNDEFINED_TYPE, ldap.NO_SUCH_ATTRIBUTE):
is_member = False
except (KeyError, IndexError):
is_member = False
return is_member

View File

@@ -1,115 +0,0 @@
import pytest
from awx.sso.backends import _update_m2m_from_groups
class MockLDAPGroups(object):
def is_member_of(self, group_dn):
return bool(group_dn)
class MockLDAPUser(object):
def _get_groups(self):
return MockLDAPGroups()
@pytest.mark.parametrize(
"setting, expected_result",
[
(True, True),
('something', True),
(False, False),
('', False),
],
)
def test_mock_objects(setting, expected_result):
ldap_user = MockLDAPUser()
assert ldap_user._get_groups().is_member_of(setting) == expected_result
@pytest.mark.parametrize(
"opts, remove, expected_result",
[
# In these case we will pass no opts so we should get None as a return in all cases
(
None,
False,
None,
),
(
None,
True,
None,
),
# Next lets test with empty opts ([]) This should return False if remove is True and None otherwise
(
[],
True,
False,
),
(
[],
False,
None,
),
# Next opts is True, this will always return True
(
True,
True,
True,
),
(
True,
False,
True,
),
# If we get only a non-string as an option we hit a continue and will either return None or False depending on the remove flag
(
[32],
False,
None,
),
(
[32],
True,
False,
),
# Finally we need to test whether or not a user should be allowed in or not.
# We use a mock class for ldap_user that simply returns true/false based on the otps
(
['true'],
False,
True,
),
# In this test we are going to pass a string to test the part of the code that coverts strings into array, this should give us True
(
'something',
True,
True,
),
(
[''],
False,
None,
),
(
False,
True,
False,
),
# Empty strings are considered opts == None and will result in None or False based on the remove flag
(
'',
True,
False,
),
(
'',
False,
None,
),
],
)
@pytest.mark.django_db
def test__update_m2m_from_groups(opts, remove, expected_result):
ldap_user = MockLDAPUser()
assert expected_result == _update_m2m_from_groups(ldap_user, opts, remove)

View File

@@ -293,18 +293,17 @@ class TestCommonFunctions:
assert o.galaxy_credentials.count() == 0
@pytest.mark.parametrize(
"enable_ldap, enable_social, enable_enterprise, expected_results",
"enable_social, enable_enterprise, expected_results",
[
(False, False, False, None),
(True, False, False, 'ldap'),
(True, True, False, 'social'),
(True, True, True, 'enterprise'),
(False, True, True, 'enterprise'),
(False, False, True, 'enterprise'),
(False, True, False, 'social'),
(False, False, None),
(True, False, 'social'),
(True, True, 'enterprise'),
(True, True, 'enterprise'),
(False, True, 'enterprise'),
(True, False, 'social'),
],
)
def test_get_external_account(self, enable_ldap, enable_social, enable_enterprise, expected_results):
def test_get_external_account(self, enable_social, enable_enterprise, expected_results):
try:
user = User.objects.get(username="external_tester")
except User.DoesNotExist:
@@ -312,8 +311,6 @@ class TestCommonFunctions:
user.set_unusable_password()
user.save()
if enable_ldap:
user.profile.ldap_dn = 'test.dn'
if enable_social:
from social_django.models import UserSocialAuth
@@ -337,8 +334,6 @@ class TestCommonFunctions:
[
# Set none of the social auth settings
('JUNK_SETTING', False),
# Set the hard coded settings
('AUTH_LDAP_SERVER_URI', True),
('SOCIAL_AUTH_SAML_ENABLED_IDPS', True),
('RADIUS_SERVER', True),
('TACACSPLUS_HOST', True),
@@ -366,9 +361,8 @@ class TestCommonFunctions:
"key_one, key_one_value, key_two, key_two_value, expected",
[
('JUNK_SETTING', True, 'JUNK2_SETTING', True, False),
('AUTH_LDAP_SERVER_URI', True, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True, True),
('JUNK_SETTING', True, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', True, True),
('AUTH_LDAP_SERVER_URI', False, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', False, False),
('JUNK_SETTING', True, 'SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', False, False),
],
)
def test_is_remote_auth_enabled_multiple_keys(self, key_one, key_one_value, key_two, key_two_value, expected):

View File

@@ -1,19 +0,0 @@
from django.test.utils import override_settings
import ldap
import pytest
from awx.sso.backends import LDAPSettings
@override_settings(AUTH_LDAP_CONNECTION_OPTIONS={ldap.OPT_NETWORK_TIMEOUT: 60})
@pytest.mark.django_db
def test_ldap_with_custom_timeout():
settings = LDAPSettings()
assert settings.CONNECTION_OPTIONS == {ldap.OPT_NETWORK_TIMEOUT: 60}
@override_settings(AUTH_LDAP_CONNECTION_OPTIONS={ldap.OPT_REFERRALS: 0})
@pytest.mark.django_db
def test_ldap_with_missing_timeout():
settings = LDAPSettings()
assert settings.CONNECTION_OPTIONS == {ldap.OPT_REFERRALS: 0, ldap.OPT_NETWORK_TIMEOUT: 30}

View File

@@ -1,9 +1,8 @@
import pytest
from unittest import mock
from rest_framework.exceptions import ValidationError
from awx.sso.fields import SAMLOrgAttrField, SAMLTeamAttrField, SAMLUserFlagsAttrField, LDAPGroupTypeParamsField, LDAPServerURIField
from awx.sso.fields import SAMLOrgAttrField, SAMLTeamAttrField, SAMLUserFlagsAttrField
class TestSAMLOrgAttrField:
@@ -192,44 +191,3 @@ class TestSAMLUserFlagsAttrField:
field.to_internal_value(data)
print(e.value.detail)
assert e.value.detail == expected
class TestLDAPGroupTypeParamsField:
@pytest.mark.parametrize(
"group_type, data, expected",
[
('LDAPGroupType', {'name_attr': 'user', 'bob': ['a', 'b'], 'scooter': 'hello'}, ['Invalid key(s): "bob", "scooter".']),
('MemberDNGroupType', {'name_attr': 'user', 'member_attr': 'west', 'bob': ['a', 'b'], 'scooter': 'hello'}, ['Invalid key(s): "bob", "scooter".']),
(
'PosixUIDGroupType',
{'name_attr': 'user', 'member_attr': 'west', 'ldap_group_user_attr': 'legacyThing', 'bob': ['a', 'b'], 'scooter': 'hello'},
['Invalid key(s): "bob", "member_attr", "scooter".'],
),
],
)
def test_internal_value_invalid(self, group_type, data, expected):
field = LDAPGroupTypeParamsField()
field.get_depends_on = mock.MagicMock(return_value=group_type)
with pytest.raises(ValidationError) as e:
field.to_internal_value(data)
assert e.value.detail == expected
class TestLDAPServerURIField:
@pytest.mark.parametrize(
"ldap_uri, exception, expected",
[
(r'ldap://servername.com:444', None, r'ldap://servername.com:444'),
(r'ldap://servername.so3:444', None, r'ldap://servername.so3:444'),
(r'ldaps://servername3.s300:344', None, r'ldaps://servername3.s300:344'),
(r'ldap://servername.-so3:444', ValidationError, None),
],
)
def test_run_validators_valid(self, ldap_uri, exception, expected):
field = LDAPServerURIField()
if exception is None:
assert field.run_validators(ldap_uri) == expected
else:
with pytest.raises(exception):
field.run_validators(ldap_uri)

View File

@@ -1,25 +0,0 @@
import ldap
from awx.sso.backends import LDAPSettings
from awx.sso.validators import validate_ldap_filter
from django.core.cache import cache
def test_ldap_default_settings(mocker):
from_db = mocker.Mock(**{'order_by.return_value': []})
mocker.patch('awx.conf.models.Setting.objects.filter', return_value=from_db)
settings = LDAPSettings()
assert settings.ORGANIZATION_MAP == {}
assert settings.TEAM_MAP == {}
def test_ldap_default_network_timeout(mocker):
cache.clear() # clearing cache avoids picking up stray default for OPT_REFERRALS
from_db = mocker.Mock(**{'order_by.return_value': []})
mocker.patch('awx.conf.models.Setting.objects.filter', return_value=from_db)
settings = LDAPSettings()
assert settings.CONNECTION_OPTIONS[ldap.OPT_NETWORK_TIMEOUT] == 30
def test_ldap_filter_validator():
validate_ldap_filter('(test-uid=%(user)s)', with_user=True)

View File

@@ -1,72 +1,12 @@
# Python
import re
# Python-LDAP
import ldap
# Django
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
__all__ = [
'validate_ldap_dn',
'validate_ldap_dn_with_user',
'validate_ldap_bind_dn',
'validate_ldap_filter',
'validate_ldap_filter_with_user',
'validate_tacacsplus_disallow_nonascii',
]
def validate_ldap_dn(value, with_user=False):
if with_user:
if '%(user)s' not in value:
raise ValidationError(_('DN must include "%%(user)s" placeholder for username: %s') % value)
dn_value = value.replace('%(user)s', 'USER')
else:
dn_value = value
try:
ldap.dn.str2dn(dn_value.encode('utf-8'))
except ldap.DECODING_ERROR:
raise ValidationError(_('Invalid DN: %s') % value)
def validate_ldap_dn_with_user(value):
validate_ldap_dn(value, with_user=True)
def validate_ldap_bind_dn(value):
if not re.match(r'^[A-Za-z][A-Za-z0-9._-]*?\\[A-Za-z0-9 ._-]+?$', value.strip()) and not re.match(
r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$', value.strip()
):
validate_ldap_dn(value)
def validate_ldap_filter(value, with_user=False):
value = value.strip()
if not value:
return
if with_user:
if '%(user)s' not in value:
raise ValidationError(_('DN must include "%%(user)s" placeholder for username: %s') % value)
dn_value = value.replace('%(user)s', 'USER')
else:
dn_value = value
if re.match(r'^\([A-Za-z0-9-]+?=[^()]+?\)$', dn_value):
return
elif re.match(r'^\([&|!]\(.*?\)\)$', dn_value):
try:
map(validate_ldap_filter, ['(%s)' % x for x in dn_value[3:-2].split(')(')])
return
except ValidationError:
pass
raise ValidationError(_('Invalid filter: %s') % value)
def validate_ldap_filter_with_user(value):
validate_ldap_filter(value, with_user=True)
def validate_tacacsplus_disallow_nonascii(value):
try:
value.encode('ascii')