mirror of
https://github.com/ansible/awx.git
synced 2026-01-13 11:00:03 -03:30
Disable LDAP support when not allowed by license.
This commit is contained in:
parent
a72dbe4428
commit
07205bcb88
@ -608,11 +608,12 @@ class UserSerializer(BaseSerializer):
|
||||
new_password = getattr(obj, '_new_password', None)
|
||||
# For now we're not raising an error, just not saving password for
|
||||
# users managed by LDAP who already have an unusable password set.
|
||||
try:
|
||||
if obj.pk and obj.profile.ldap_dn and not obj.has_usable_password():
|
||||
new_password = None
|
||||
except AttributeError:
|
||||
pass
|
||||
if getattr(settings, 'AUTH_LDAP_SERVER_URI', None) and feature_enabled('ldap'):
|
||||
try:
|
||||
if obj.pk and obj.profile.ldap_dn and not obj.has_usable_password():
|
||||
new_password = None
|
||||
except AttributeError:
|
||||
pass
|
||||
if new_password:
|
||||
obj.set_password(new_password)
|
||||
if not obj.password:
|
||||
@ -633,6 +634,8 @@ class UserSerializer(BaseSerializer):
|
||||
return res
|
||||
|
||||
def _validate_ldap_managed_field(self, attrs, source):
|
||||
if not getattr(settings, 'AUTH_LDAP_SERVER_URI', None) or not feature_enabled('ldap'):
|
||||
return attrs
|
||||
try:
|
||||
is_ldap_user = bool(self.object.profile.ldap_dn)
|
||||
except AttributeError:
|
||||
|
||||
@ -190,7 +190,7 @@ class ApiV1ConfigView(APIView):
|
||||
# If LDAP is enabled, user_ldap_fields will return a list of field
|
||||
# names that are managed by LDAP and should be read-only for users with
|
||||
# a non-empty ldap_dn attribute.
|
||||
if getattr(settings, 'AUTH_LDAP_SERVER_URI', None):
|
||||
if getattr(settings, 'AUTH_LDAP_SERVER_URI', None) and feature_enabled('ldap'):
|
||||
user_ldap_fields = ['username', 'password']
|
||||
user_ldap_fields.extend(getattr(settings, 'AUTH_LDAP_USER_ATTR_MAP', {}).keys())
|
||||
user_ldap_fields.extend(getattr(settings, 'AUTH_LDAP_USER_FLAGS_BY_GROUP', {}).keys())
|
||||
|
||||
@ -9,6 +9,9 @@ from django_auth_ldap.backend import LDAPSettings as BaseLDAPSettings
|
||||
from django_auth_ldap.backend import LDAPBackend as BaseLDAPBackend
|
||||
from django_auth_ldap.backend import populate_user
|
||||
|
||||
# Ansible Tower
|
||||
from awx.api.license import feature_enabled
|
||||
|
||||
class LDAPSettings(BaseLDAPSettings):
|
||||
|
||||
defaults = dict(BaseLDAPSettings.defaults.items() + {
|
||||
@ -34,12 +37,12 @@ class LDAPBackend(BaseLDAPBackend):
|
||||
settings = property(_get_settings, _set_settings)
|
||||
|
||||
def authenticate(self, username, password):
|
||||
if not self.settings.SERVER_URI:
|
||||
if not self.settings.SERVER_URI or not feature_enabled('ldap'):
|
||||
return None
|
||||
return super(LDAPBackend, self).authenticate(username, password)
|
||||
|
||||
def get_user(self, user_id):
|
||||
if not self.settings.SERVER_URI:
|
||||
if not self.settings.SERVER_URI or not feature_enabled('ldap'):
|
||||
return None
|
||||
return super(LDAPBackend, self).get_user(user_id)
|
||||
|
||||
|
||||
@ -860,6 +860,7 @@ class LdapTest(BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
super(LdapTest, self).setUp()
|
||||
self.create_test_license_file(features={'ldap': True})
|
||||
# Skip tests if basic LDAP test settings aren't defined.
|
||||
if not getattr(settings, 'TEST_AUTH_LDAP_SERVER_URI', None):
|
||||
self.skipTest('no test LDAP auth server defined')
|
||||
@ -933,6 +934,15 @@ class LdapTest(BaseTest):
|
||||
user = self.check_login()
|
||||
for attr in settings.AUTH_LDAP_USER_FLAGS_BY_GROUP.keys():
|
||||
self.assertTrue(getattr(user, attr))
|
||||
# Check that LDAP login fails when not enabled by license, but using a
|
||||
# local password will work in either case.
|
||||
user.set_password('local pass')
|
||||
user.save()
|
||||
self.check_login()
|
||||
self.check_login(password='local pass')
|
||||
self.create_test_license_file(features={'ldap': False})
|
||||
self.check_login(should_fail=True)
|
||||
self.check_login(password='local pass')
|
||||
|
||||
def test_ldap_organization_mapping(self):
|
||||
for name in ('USER_SEARCH', 'ALWAYS_UPDATE_USER', 'USER_ATTR_MAP',
|
||||
@ -1014,25 +1024,25 @@ class LdapTest(BaseTest):
|
||||
self.use_test_setting(name)
|
||||
user = self.check_login()
|
||||
self.setup_users()
|
||||
url = reverse('api:api_v1_config_view')
|
||||
config_url = reverse('api:api_v1_config_view')
|
||||
with self.current_user(self.super_django_user):
|
||||
response = self.get(url, expect=200)
|
||||
response = self.get(config_url, expect=200)
|
||||
user_ldap_fields = response.get('user_ldap_fields', [])
|
||||
self.assertTrue(user_ldap_fields)
|
||||
url = reverse('api:user_detail', args=(user.pk,))
|
||||
user_url = reverse('api:user_detail', args=(user.pk,))
|
||||
for user_field in user_ldap_fields:
|
||||
with self.current_user(self.super_django_user):
|
||||
data = self.get(url, expect=200)
|
||||
data = self.get(user_url, expect=200)
|
||||
if user_field == 'password':
|
||||
data[user_field] = 'my new password'
|
||||
with self.current_user(self.super_django_user):
|
||||
self.put(url, data, expect=200)
|
||||
self.put(user_url, data, expect=200)
|
||||
user = User.objects.get(pk=user.pk)
|
||||
self.assertFalse(user.has_usable_password())
|
||||
with self.current_user(self.super_django_user):
|
||||
self.patch(user_url, {'password': 'try again'}, expect=200)
|
||||
user = User.objects.get(pk=user.pk)
|
||||
self.assertFalse(user.has_usable_password())
|
||||
#with self.current_user(self.super_django_user):
|
||||
# self.patch(url, {'password': 'try again'}, expect=200)
|
||||
#user = User.objects.get(pk=user.pk)
|
||||
#self.assertFalse(user.has_usable_password())
|
||||
elif user_field in data:
|
||||
value = data[user_field]
|
||||
if isinstance(value, bool):
|
||||
@ -1041,7 +1051,40 @@ class LdapTest(BaseTest):
|
||||
value = unicode(value).upper()
|
||||
data[user_field] = value
|
||||
with self.current_user(self.super_django_user):
|
||||
self.put(url, data, expect=400)
|
||||
#patch_data = {user_field: data[user_field]}
|
||||
#with self.current_user(self.super_django_user):
|
||||
# self.patch(url, patch_data, expect=400)
|
||||
self.put(user_url, data, expect=400)
|
||||
patch_data = {user_field: data[user_field]}
|
||||
with self.current_user(self.super_django_user):
|
||||
self.patch(user_url, patch_data, expect=400)
|
||||
# Install a license with LDAP disabled; ldap fields should not be in
|
||||
# config and all user fields should be changeable.
|
||||
self.create_test_license_file(features={'ldap': False})
|
||||
with self.current_user(self.super_django_user):
|
||||
response = self.get(config_url, expect=200)
|
||||
self.assertFalse('user_ldap_fields' in response)
|
||||
for user_field in user_ldap_fields:
|
||||
with self.current_user(self.super_django_user):
|
||||
data = self.get(user_url, expect=200)
|
||||
if user_field == 'password':
|
||||
data[user_field] = 'my new password'
|
||||
with self.current_user(self.super_django_user):
|
||||
self.put(user_url, data, expect=200)
|
||||
user = User.objects.get(pk=user.pk)
|
||||
self.assertTrue(user.has_usable_password())
|
||||
self.assertTrue(user.check_password, 'my new password')
|
||||
with self.current_user(self.super_django_user):
|
||||
self.patch(user_url, {'password': 'try again'}, expect=200)
|
||||
user = User.objects.get(pk=user.pk)
|
||||
self.assertTrue(user.has_usable_password())
|
||||
self.assertTrue(user.check_password, 'try again')
|
||||
elif user_field in data:
|
||||
value = data[user_field]
|
||||
if isinstance(value, bool):
|
||||
value = not value
|
||||
else:
|
||||
value = unicode(value).upper()
|
||||
data[user_field] = value
|
||||
with self.current_user(self.super_django_user):
|
||||
self.put(user_url, data, expect=200)
|
||||
patch_data = {user_field: data[user_field]}
|
||||
with self.current_user(self.super_django_user):
|
||||
self.patch(user_url, patch_data, expect=200)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user