Allow multiple values in SOCIAL_AUTH_SAML_USER_FLAGS_BY_ATTR.is_*_[value|role] settings (#12558)

This commit is contained in:
John Westcott IV 2022-08-05 10:39:50 -04:00 committed by GitHub
parent 19c24cba10
commit 782667a34e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 30 deletions

View File

@ -1535,12 +1535,12 @@ register(
category_slug='saml',
placeholder=[
('is_superuser_attr', 'saml_attr'),
('is_superuser_value', 'value'),
('is_superuser_role', 'saml_role'),
('is_superuser_value', ['value']),
('is_superuser_role', ['saml_role']),
('remove_superusers', True),
('is_system_auditor_attr', 'saml_attr'),
('is_system_auditor_value', 'value'),
('is_system_auditor_role', 'saml_role'),
('is_system_auditor_value', ['value']),
('is_system_auditor_role', ['saml_role']),
('remove_system_auditors', True),
],
)

View File

@ -741,12 +741,12 @@ class SAMLTeamAttrField(HybridDictField):
class SAMLUserFlagsAttrField(HybridDictField):
is_superuser_attr = fields.CharField(required=False, allow_null=True)
is_superuser_value = fields.CharField(required=False, allow_null=True)
is_superuser_role = fields.CharField(required=False, allow_null=True)
is_superuser_value = fields.StringListField(required=False, allow_null=True)
is_superuser_role = fields.StringListField(required=False, allow_null=True)
remove_superusers = fields.BooleanField(required=False, allow_null=True)
is_system_auditor_attr = fields.CharField(required=False, allow_null=True)
is_system_auditor_value = fields.CharField(required=False, allow_null=True)
is_system_auditor_role = fields.CharField(required=False, allow_null=True)
is_system_auditor_value = fields.StringListField(required=False, allow_null=True)
is_system_auditor_role = fields.StringListField(required=False, allow_null=True)
remove_system_auditors = fields.BooleanField(required=False, allow_null=True)
child = _Forbidden()

View File

@ -0,0 +1,58 @@
from django.db import migrations, connection
import json
_values_to_change = ['is_superuser_value', 'is_superuser_role', 'is_system_auditor_value', 'is_system_auditor_role']
def _get_setting():
with connection.cursor() as cursor:
cursor.execute(f'SELECT value FROM conf_setting WHERE key= %s', ['SOCIAL_AUTH_SAML_USER_FLAGS_BY_ATTR'])
row = cursor.fetchone()
if row == None:
return {}
existing_setting = row[0]
try:
existing_json = json.loads(existing_setting)
except json.decoder.JSONDecodeError as e:
print("Failed to decode existing json setting:")
print(existing_setting)
raise e
return existing_json
def _set_setting(value):
with connection.cursor() as cursor:
cursor.execute(f'UPDATE conf_setting SET value = %s WHERE key = %s', [json.dumps(value), 'SOCIAL_AUTH_SAML_USER_FLAGS_BY_ATTR'])
def forwards(app, schema_editor):
# The Operation should use schema_editor to apply any changes it
# wants to make to the database.
existing_json = _get_setting()
for key in _values_to_change:
if existing_json.get(key, None) and isinstance(existing_json.get(key), str):
existing_json[key] = [existing_json.get(key)]
_set_setting(existing_json)
def backwards(app, schema_editor):
existing_json = _get_setting()
for key in _values_to_change:
if existing_json.get(key, None) and not isinstance(existing_json.get(key), str):
try:
existing_json[key] = existing_json.get(key).pop()
except IndexError:
existing_json[key] = ""
_set_setting(existing_json)
class Migration(migrations.Migration):
dependencies = [
('sso', '0002_expand_provider_options'),
]
operations = [
migrations.RunPython(forwards, backwards),
]

View File

@ -250,7 +250,25 @@ def update_user_teams_by_saml_attr(backend, details, user=None, *args, **kwargs)
[t.member_role.members.remove(user) for t in Team.objects.filter(Q(member_role__members=user) & ~Q(id__in=team_ids))]
def _get_matches(list1, list2):
# Because we are just doing an intersection here we don't really care which list is in which parameter
# A SAML provider could return either a string or a list of items so we need to coerce the SAML value into a list (if needed)
if not isinstance(list1, (list, tuple)):
list1 = [list1]
# In addition, we used to allow strings in the SAML config instead of Lists. The migration should take case of that but just in case, we will convert our list too
if not isinstance(list2, (list, tuple)):
list2 = [list2]
return set(list1).intersection(set(list2))
def _check_flag(user, flag, attributes, user_flags_settings):
'''
Helper function to set the is_superuser is_system_auditor flags for the SAML adapter
Returns the new flag and whether or not it changed the flag
'''
new_flag = False
is_role_key = "is_%s_role" % (flag)
is_attr_key = "is_%s_attr" % (flag)
@ -258,37 +276,35 @@ def _check_flag(user, flag, attributes, user_flags_settings):
remove_setting = "remove_%ss" % (flag)
# Check to see if we are respecting a role and, if so, does our user have that role?
role_setting = user_flags_settings.get(is_role_key, None)
if role_setting:
required_roles = user_flags_settings.get(is_role_key, None)
if required_roles:
matching_roles = _get_matches(required_roles, attributes.get('Role', []))
# We do a 2 layer check here so that we don't spit out the else message if there is no role defined
if role_setting in attributes.get('Role', []):
logger.debug("User %s has %s role %s" % (user.username, flag, role_setting))
if matching_roles:
logger.debug("User %s has %s role(s) %s" % (user.username, flag, ', '.join(matching_roles)))
new_flag = True
else:
logger.debug("User %s is missing the %s role %s" % (user.username, flag, role_setting))
logger.debug("User %s is missing the %s role(s) %s" % (user.username, flag, ', '.join(required_roles)))
# Next, check to see if we are respecting an attribute; this will take priority over the role if its defined
attr_setting = user_flags_settings.get(is_attr_key, None)
if attr_setting and attributes.get(attr_setting, None):
# Do we have a required value for the attribute
if user_flags_settings.get(is_value_key, None):
required_value = user_flags_settings.get(is_value_key, None)
if required_value:
# If so, check and see if the value of the attr matches the required value
attribute_value = attributes.get(attr_setting, None)
attribute_matches = False
if isinstance(attribute_value, (list, tuple)):
if user_flags_settings.get(is_value_key) in attribute_value:
attribute_matches = True
elif attribute_value == user_flags_settings.get(is_value_key):
attribute_matches = True
saml_user_attribute_value = attributes.get(attr_setting, None)
matching_values = _get_matches(required_value, saml_user_attribute_value)
if attribute_matches:
logger.debug("Giving %s %s from attribute %s with matching value" % (user.username, flag, attr_setting))
if matching_values:
logger.debug("Giving %s %s from attribute %s with matching values %s" % (user.username, flag, attr_setting, ', '.join(matching_values)))
new_flag = True
# if they don't match make sure that new_flag is false
else:
logger.debug(
"For %s on %s attr %s (%s) did not match expected value '%s'"
% (flag, user.username, attr_setting, attribute_value, user_flags_settings.get(is_value_key))
"Refusing %s for %s because attr %s (%s) did not match value(s) %s"
% (flag, user.username, attr_setting, ", ".join(saml_user_attribute_value), ', '.join(required_value))
)
new_flag = False
# If there was no required value then we can just allow them in because of the attribute

View File

@ -446,6 +446,10 @@ class TestSAMLUserFlags:
(False, False),
False,
),
# NOTE: The first handful of tests test role/value as string instead of lists.
# This was from the initial implementation of these fields but the code should be able to handle this
# There are a couple tests at the end of this which will validate arrays in these values.
#
# In this case we will give the user a group to make them an admin
(
{'is_superuser_role': 'test-role-1'},
@ -518,6 +522,30 @@ class TestSAMLUserFlags:
(True, False),
True,
),
# Positive test for multiple values for is_superuser_value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'else', 'junk']},
(True, True),
False,
),
# Negative test for multiple values for is_superuser_value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'junk']},
(False, True),
True,
),
# Positive test for multiple values of is_superuser_role
(
{'is_superuser_role': ['junk', 'junk2', 'something', 'junk']},
(True, True),
False,
),
# Negative test for multiple values of is_superuser_role
(
{'is_superuser_role': ['junk', 'junk2', 'junk']},
(False, True),
True,
),
],
)
def test__check_flag(self, user_flags_settings, expected, is_superuser):

View File

@ -121,12 +121,12 @@ class TestSAMLUserFlagsAttrField:
[
{},
{'is_superuser_attr': 'something'},
{'is_superuser_value': 'value'},
{'is_superuser_role': 'my_peeps'},
{'is_superuser_value': ['value']},
{'is_superuser_role': ['my_peeps']},
{'remove_superusers': False},
{'is_system_auditor_attr': 'something_else'},
{'is_system_auditor_value': 'value2'},
{'is_system_auditor_role': 'other_peeps'},
{'is_system_auditor_value': ['value2']},
{'is_system_auditor_role': ['other_peeps']},
{'remove_system_auditors': False},
],
)
@ -147,7 +147,13 @@ class TestSAMLUserFlagsAttrField:
'is_system_auditor_value': 'value2',
'is_system_auditor_role': 'other_peeps',
},
{'junk': ['Invalid field.']},
{
'junk': ['Invalid field.'],
'is_superuser_role': ['Expected a list of items but got type "str".'],
'is_superuser_value': ['Expected a list of items but got type "str".'],
'is_system_auditor_role': ['Expected a list of items but got type "str".'],
'is_system_auditor_value': ['Expected a list of items but got type "str".'],
},
),
(
{