fix: do not create multiple mappers for lists of emails or usernames (#7063)

* fix: do not create multiple mappers for lists of emails or usernames

* fix: create multiple matchers, don't rely on matches_or

* fix tests

* truncate mapper names to a max of 128 chars

* better naming scheme for matchers
This commit is contained in:
Peter Braun 2025-08-20 21:10:25 +02:00 committed by thedoubl3j
parent a7eb1ef763
commit af2efec2b4
No known key found for this signature in database
GPG Key ID: E84C42ACF75B0768
2 changed files with 264 additions and 205 deletions

View File

@ -32,91 +32,112 @@ def get_role_mappers(role_map, start_order=1):
class TestProcessSSOUserList:
"""Tests for the process_sso_user_list function."""
"""Tests for the process_sso_user_list function (consolidated version)."""
def test_false_boolean(self):
"""Test that False creates 'Never Allow' trigger."""
result = process_sso_user_list(False)
assert len(result) == 1
assert result[0]["name"] == "Never Allow"
assert result[0]["trigger"] == {"never": {}}
assert result["name"] == "Never Allow"
assert result["trigger"] == {"never": {}}
def test_true_boolean(self):
"""Test that True creates 'Always Allow' trigger."""
result = process_sso_user_list(True)
assert len(result) == 1
assert result[0]["name"] == "Always Allow"
assert result[0]["trigger"] == {"always": {}}
assert result["name"] == "Always Allow"
assert result["trigger"] == {"always": {}}
def test_false_string_list(self):
"""Test that ['false'] creates 'Never Allow' trigger."""
result = process_sso_user_list(["false"])
assert len(result) == 1
assert result[0]["name"] == "Never Allow"
assert result[0]["trigger"] == {"never": {}}
assert result["name"] == "Never Allow"
assert result["trigger"] == {"never": {}}
def test_true_string_list(self):
"""Test that ['true'] creates 'Always Allow' trigger."""
result = process_sso_user_list(["true"])
assert len(result) == 1
assert result[0]["name"] == "Always Allow"
assert result[0]["trigger"] == {"always": {}}
assert result["name"] == "Always Allow"
assert result["trigger"] == {"always": {}}
def test_string_user_list(self):
"""Test that regular string users are processed correctly."""
result = process_sso_user_list(["testuser"])
assert len(result) == 1
assert result[0]["name"] == "Username equals testuser"
assert result[0]["trigger"]["attributes"]["username"]["equals"] == "testuser"
assert result["name"] == "U:1"
assert result["trigger"]["attributes"]["username"]["equals"] == "testuser"
def test_email_user_list(self):
"""Test that email addresses are processed correctly."""
result = process_sso_user_list(["test@example.com"])
assert len(result) == 1
assert result[0]["name"] == "Email Equals test@example.com"
assert result[0]["trigger"]["attributes"]["email"]["equals"] == "test@example.com"
assert result["name"] == "E:1"
assert result["trigger"]["attributes"]["email"]["equals"] == "test@example.com"
def test_mixed_string_list(self):
"""Test that mixed list with 'true', 'false', and regular users works correctly."""
result = process_sso_user_list(["true", "testuser", "false"])
# Should process each item separately, not treat the whole list as true/false
assert len(result) == 3
# Check each result
names = [r["name"] for r in result]
assert "Username equals true" in names
assert "Username equals testuser" in names
assert "Username equals false" in names
# Should consolidate all usernames and show count
assert result["name"] == "U:3"
assert result["trigger"]["attributes"]["username"]["in"] == ["true", "testuser", "false"]
def test_custom_email_username_attrs(self):
"""Test that custom email and username attributes work correctly."""
result = process_sso_user_list(["test@example.com"], email_attr='custom_email', username_attr='custom_username')
assert len(result) == 1
assert result[0]["trigger"]["attributes"]["custom_email"]["equals"] == "test@example.com"
assert result["trigger"]["attributes"]["custom_email"]["equals"] == "test@example.com"
def test_regex_pattern(self):
"""Test that regex patterns create both username and email matches."""
pattern = re.compile(r"^admin.*@example\.com$")
result = process_sso_user_list([pattern])
assert len(result) == 2
assert "Match Username" in result[0]["name"]
assert "Match Email" in result[1]["name"]
assert result[0]["trigger"]["attributes"]["username"]["matches"] == "/^admin.*@example\\.com$/"
assert result[1]["trigger"]["attributes"]["email"]["matches"] == "/^admin.*@example\\.com$/"
assert result["name"] == "UP:1 EP:1"
assert result["trigger"]["attributes"]["username"]["matches"] == "/^admin.*@example\\.com$/"
assert result["trigger"]["attributes"]["email"]["matches"] == "/^admin.*@example\\.com$/"
def test_multiple_emails(self):
"""Test that multiple emails use count-based names."""
emails = [f"user{i}@example.com" for i in range(10)]
result = process_sso_user_list(emails)
assert result["name"] == "E:10"
assert result["trigger"]["attributes"]["email"]["in"] == emails
def test_multiple_usernames(self):
"""Test that multiple usernames use count-based names."""
usernames = [f"user{i}" for i in range(8)]
result = process_sso_user_list(usernames)
assert result["name"] == "U:8"
assert result["trigger"]["attributes"]["username"]["in"] == usernames
def test_mixed_emails_and_usernames(self):
"""Test mixed emails and usernames use count-based names."""
emails = ["user1@example.com", "user2@example.com"]
usernames = ["admin1", "admin2", "admin3"]
users = emails + usernames
result = process_sso_user_list(users)
assert result["name"] == "E:2 U:3"
assert result["trigger"]["attributes"]["email"]["in"] == emails
assert result["trigger"]["attributes"]["username"]["in"] == usernames
def test_multiple_regex_patterns(self):
"""Test that multiple regex patterns use count-based names."""
patterns = [re.compile(f"pattern{i}") for i in range(5)]
result = process_sso_user_list(patterns)
assert result["name"] == "UP:5 EP:5"
def test_empty_list(self):
"""Test that empty list creates no triggers."""
"""Test that empty list creates default trigger."""
result = process_sso_user_list([])
assert len(result) == 0
assert result["name"] == "Mixed Rules"
assert result["trigger"]["attributes"]["join_condition"] == "or"
class TestProcessLdapUserList:
@ -305,7 +326,7 @@ class TestOrgMapToGatewayFormat:
assert len(result) == 1
mapping = result[0]
assert mapping["name"] == "myorg - Admins Username equals admin-username"
assert mapping["name"] == "myorg - Admins U:1"
assert mapping["triggers"] == {"attributes": {"join_condition": "or", "username": {"equals": "admin-username"}}}
assert mapping["role"] == "Organization Admin"
@ -315,11 +336,11 @@ class TestOrgMapToGatewayFormat:
result, _ = org_map_to_gateway_format(org_map)
assert len(result) == 2
assert result[0]["name"] == "myorg - Admins Username equals admin-username1"
assert result[1]["name"] == "myorg - Admins Username equals admin-username2"
assert result[0]["order"] == 1
assert result[1]["order"] == 2
assert len(result) == 1
mapping = result[0]
assert mapping["name"] == "myorg - Admins U:2"
assert mapping["triggers"]["attributes"]["username"]["in"] == ["admin-username1", "admin-username2"]
assert mapping["order"] == 1
def test_org_with_email_detection(self):
"""Test that email addresses are correctly identified and handled."""
@ -327,25 +348,16 @@ class TestOrgMapToGatewayFormat:
result, _ = org_map_to_gateway_format(org_map)
assert len(result) == 3
assert len(result) == 1
mapping = result[0]
# First mapping should be for email
email_mapping = result[0]
assert "user@example.com" in email_mapping["name"]
assert "Email Equals" in email_mapping["name"]
assert email_mapping["triggers"]["attributes"]["email"]["equals"] == "user@example.com"
# Should consolidate emails and usernames in one mapper
assert mapping["name"] == "myorg - Users E:2 U:1"
# Second mapping should be for email
email_mapping2 = result[1]
assert "admin@test.org" in email_mapping2["name"]
assert "Email Equals" in email_mapping2["name"]
assert email_mapping2["triggers"]["attributes"]["email"]["equals"] == "admin@test.org"
# Third mapping should be for username (not email)
username_mapping = result[2]
assert "not-an-email" in username_mapping["name"]
assert "Username equals" in username_mapping["name"]
assert username_mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email"
# Should have both email and username attributes
assert mapping["triggers"]["attributes"]["email"]["in"] == ["user@example.com", "admin@test.org"]
assert mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email"
assert mapping["triggers"]["attributes"]["join_condition"] == "or"
def test_org_with_remove_flags(self):
"""Test organization with remove flags."""
@ -375,16 +387,13 @@ class TestOrgMapToGatewayFormat:
result, _ = org_map_to_gateway_format(org_map)
# Should create 2 mappings - one for username match, one for email match
assert len(result) == 2, f"Expected 2 items but got: {result}"
# Should create 1 consolidated mapping with both username and email matches
assert len(result) == 1, f"Expected 1 item but got: {result}"
username_mapping = result[0]
assert "Match Username" in username_mapping["name"]
assert username_mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/"
email_mapping = result[1]
assert "Match Email" in email_mapping["name"]
assert email_mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/"
mapping = result[0]
assert mapping["name"] == "myorg - Users UP:1 EP:1"
assert mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/"
assert mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/"
def test_org_with_none_values_skipped(self):
"""Test that entries with None values are skipped."""
@ -401,12 +410,11 @@ class TestOrgMapToGatewayFormat:
result, next_order = org_map_to_gateway_format(org_map, start_order=10)
# Should have 3 mappings total
assert len(result) == 3
# Should have 2 mappings total (1 for org1, 1 for org2)
assert len(result) == 2
assert result[0]["order"] == 10
assert result[1]["order"] == 11
assert result[2]["order"] == 12
assert next_order == 13
assert next_order == 12
def test_org_comprehensive_field_validation(self):
"""Test comprehensive validation of all fields in org mappings."""
@ -419,7 +427,7 @@ class TestOrgMapToGatewayFormat:
# Validate all required fields and their types
assert isinstance(mapping["name"], str)
assert mapping["name"] == "test-org - Admins Username equals test-admin"
assert mapping["name"] == "test-org - Admins U:1"
assert mapping["map_type"] == "organization"
assert mapping["order"] == 5
@ -454,11 +462,11 @@ class TestOrgMapToGatewayFormat:
assert len(result) == 1
assert next_order == 2
# Test with multiple mappings from single org
# Test with multiple mappings from single org - now consolidated into one
org_map = {"multi": {"users": ["user1", "user2"]}}
result, next_order = org_map_to_gateway_format(org_map)
assert len(result) == 2
assert next_order == 3
assert len(result) == 1
assert next_order == 2
def test_org_with_auth_type_sso(self):
"""Test org mapping with auth_type='sso' (default behavior)."""
@ -468,7 +476,7 @@ class TestOrgMapToGatewayFormat:
assert len(result) == 1
mapping = result[0]
assert "Username equals testuser" in mapping["name"]
assert mapping["name"] == "myorg - Users U:1"
assert mapping["triggers"]["attributes"]["username"]["equals"] == "testuser"
def test_org_with_auth_type_ldap(self):
@ -530,26 +538,17 @@ class TestTeamMapToGatewayFormat:
result, _ = team_map_to_gateway_format(team_map)
# Should have 3 mappings - one for each user
assert len(result) == 3
# Should have 1 consolidated mapping
assert len(result) == 1
mapping = result[0]
# First mapping should be for email (emails are detected and use email attribute)
email_mapping = result[0]
assert "user@example.com" in email_mapping["name"]
assert "Email Equals" in email_mapping["name"]
assert email_mapping["triggers"]["attributes"]["email"]["equals"] == "user@example.com"
# Should consolidate emails and usernames in one mapper
assert mapping["name"] == "myorg - email-team E:2 U:1"
# Second mapping should be for email
email_mapping2 = result[1]
assert "admin@test.org" in email_mapping2["name"]
assert "Email Equals" in email_mapping2["name"]
assert email_mapping2["triggers"]["attributes"]["email"]["equals"] == "admin@test.org"
# Third mapping should be for username (not email)
username_mapping = result[2]
assert "not-an-email" in username_mapping["name"]
assert "Username equals" in username_mapping["name"]
assert username_mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email"
# Should have both email and username attributes
assert mapping["triggers"]["attributes"]["email"]["in"] == ["user@example.com", "admin@test.org"]
assert mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email"
assert mapping["triggers"]["attributes"]["join_condition"] == "or"
def test_team_with_custom_email_username_attrs(self):
"""Test team mapping with custom email and username attributes."""
@ -560,7 +559,7 @@ class TestTeamMapToGatewayFormat:
assert len(result) == 1
mapping = result[0]
assert mapping["triggers"]["attributes"]["custom_email"]["equals"] == "test@example.com"
assert "Email Equals" in mapping["name"]
assert mapping["name"] == "myorg - custom-team E:1"
def test_team_with_regex_pattern_objects(self):
"""Test team mapping with actual re.Pattern objects."""
@ -570,16 +569,13 @@ class TestTeamMapToGatewayFormat:
result, _ = team_map_to_gateway_format(team_map)
# Should create 2 mappings - one for username match, one for email match
assert len(result) == 2, f"Expected 2 items but got: {result}"
# Should create 1 consolidated mapping with both username and email matches
assert len(result) == 1, f"Expected 1 item but got: {result}"
username_mapping = result[0]
assert "Match Username" in username_mapping["name"]
assert username_mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/"
email_mapping = result[1]
assert "Match Email" in email_mapping["name"]
assert email_mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/"
mapping = result[0]
assert mapping["name"] == "myorg - regex-team UP:1 EP:1"
assert mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/"
assert mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/"
def test_team_with_non_string_objects(self):
"""Test team mapping with non-string objects that get converted."""
@ -593,16 +589,13 @@ class TestTeamMapToGatewayFormat:
result, _ = team_map_to_gateway_format(team_map)
# Should create 4 mappings - 2 for custom object (username + email), 2 for number
assert len(result) == 4
# Should create 1 consolidated mapping with both username and email attributes
assert len(result) == 1
# Check custom object mappings - non-string objects create both username and email mappings
custom_mappings = [r for r in result if "custom_object_string" in r["name"]]
assert len(custom_mappings) == 2
# Check number mappings - numbers also create both username and email mappings
number_mappings = [r for r in result if "12345" in r["name"]]
assert len(number_mappings) == 2
mapping = result[0]
# Both objects should be treated as usernames and emails
assert mapping["triggers"]["attributes"]["username"]["in"] == ["custom_object_string", "12345"]
assert mapping["triggers"]["attributes"]["email"]["in"] == ["custom_object_string", "12345"]
def test_team_with_mixed_data_types(self):
"""Test team mapping with mixed data types in users list."""
@ -612,15 +605,17 @@ class TestTeamMapToGatewayFormat:
result, _ = team_map_to_gateway_format(team_map)
# Should handle all different types appropriately
assert len(result) > 0
# Should create 1 consolidated mapping with all types handled
assert len(result) == 1
# Verify that each type is handled
names = [mapping["name"] for mapping in result]
assert any("string_user" in name for name in names), f"Expected to find string_user in {(', '.join(names))}"
assert any("email@test.com" in name for name in names), f"Expected to find email@test.com in {(', '.join(names))}"
assert any("999" in name for name in names), f"Expected to find 999 in {(', '.join(names))}"
assert any("True" in name for name in names), f"Expected to find True in {(', '.join(names))}"
mapping = result[0]
# All types should be consolidated into one mapper name
assert mapping["name"] == "myorg - mixed-team E:3 U:3 UP:1 EP:1"
# Verify trigger structure contains all the data types
triggers = mapping["triggers"]["attributes"]
assert "email" in triggers
assert "username" in triggers
def test_team_with_start_order_parameter(self):
"""Test that start_order parameter works correctly."""
@ -651,7 +646,7 @@ class TestTeamMapToGatewayFormat:
# Check empty team name mapping
empty_team_mapping = [m for m in result if m["team"] == ""][0]
assert "Username equals" in empty_team_mapping["name"]
assert " - " in empty_team_mapping["name"] and "U:1" in empty_team_mapping["name"]
assert empty_team_mapping["team"] == ""
# Check empty organization mapping
@ -667,14 +662,14 @@ class TestTeamMapToGatewayFormat:
result, _ = team_map_to_gateway_format(team_map)
assert len(result) == 2
assert len(result) == 1
# Verify special characters are preserved in names
for mapping in result:
assert "team-with-special!@#$%^&*()_+chars" in mapping["name"]
assert "org with spaces & symbols!" in mapping["name"]
assert mapping["team"] == "team-with-special!@#$%^&*()_+chars"
assert mapping["organization"] == "org with spaces & symbols!"
mapping = result[0]
assert "team-with-special!@#$%^&*()_+chars" in mapping["name"]
assert "org with spaces & symbols!" in mapping["name"]
assert mapping["team"] == "team-with-special!@#$%^&*()_+chars"
assert mapping["organization"] == "org with spaces & symbols!"
def test_team_with_unicode_characters(self):
"""Test team mapping with unicode characters."""
@ -687,14 +682,14 @@ class TestTeamMapToGatewayFormat:
result, _ = team_map_to_gateway_format(team_map)
assert len(result) == 2
assert len(result) == 1
# Verify unicode characters are handled correctly
for mapping in result:
assert "チーム" in mapping["name"]
assert "組織" in mapping["name"]
assert mapping["team"] == "チーム"
assert mapping["organization"] == "組織"
mapping = result[0]
assert "チーム" in mapping["name"]
assert "組織" in mapping["name"]
assert mapping["team"] == "チーム"
assert mapping["organization"] == "組織"
def test_team_next_order_calculation(self):
"""Test that next_order is calculated correctly in various scenarios."""
@ -714,11 +709,11 @@ class TestTeamMapToGatewayFormat:
assert len(result) == 1
assert next_order == 2
# Test with multiple mappings from single team
# Test with multiple mappings from single team - now consolidated into one
team_map = {"multi": {"organization": "org", "users": ["user1", "user2"]}}
result, next_order = team_map_to_gateway_format(team_map)
assert len(result) == 2
assert next_order == 3
assert len(result) == 1
assert next_order == 2
def test_team_large_dataset_performance(self):
"""Test team mapping with a large number of teams and users."""
@ -732,13 +727,13 @@ class TestTeamMapToGatewayFormat:
result, next_order = team_map_to_gateway_format(team_map)
# Should create 500 mappings (100 teams * 5 users each)
assert len(result) == 500
# Should create 100 mappings (1 per team, with consolidated users)
assert len(result) == 100
# Verify orders are sequential
orders = [mapping["order"] for mapping in result]
assert orders == list(range(1, 501))
assert next_order == 501
assert orders == list(range(1, 101))
assert next_order == 101
# Verify all teams are represented
teams = {mapping["team"] for mapping in result}
@ -794,11 +789,11 @@ class TestTeamMapToGatewayFormat:
# Should have either username or email attribute
assert ("username" in attrs) or ("email" in attrs)
# The attribute should have either "equals" or "matches"
# The attribute should have either "equals", "matches", or "has_or"
for attr_name in ["username", "email"]:
if attr_name in attrs:
attr_value = attrs[attr_name]
assert ("equals" in attr_value) or ("matches" in attr_value)
assert ("equals" in attr_value) or ("matches" in attr_value) or ("has_or" in attr_value)
def test_team_boolean_false_trigger(self):
"""Test that False users value creates never trigger correctly."""
@ -873,7 +868,7 @@ class TestTeamMapToGatewayFormat:
# String patterns should be treated as regular strings, not regex
assert len(result) == 1
mapping = result[0]
assert "Username equals" in mapping["name"]
assert mapping["name"] == "myorg - regex-team U:1"
assert mapping["triggers"]["attributes"]["username"]["equals"] == "/^admin.*@example\\.com$/"
def test_team_comprehensive_field_validation(self):
@ -887,7 +882,7 @@ class TestTeamMapToGatewayFormat:
# Validate all required fields and their types
assert isinstance(mapping["name"], str)
assert mapping["name"] == "test-org - comprehensive-team Username equals test-user"
assert mapping["name"] == "test-org - comprehensive-team U:1"
assert mapping["map_type"] == "team"
assert mapping["order"] == 5
@ -934,26 +929,26 @@ class TestTeamMapToGatewayFormat:
"""Test that ordering works correctly with mixed user types."""
team_map = {
"team1": {"organization": "org1", "users": True}, # 1 mapping
"team2": {"organization": "org2", "users": ["user1", "user2"]}, # 2 mappings
"team2": {"organization": "org2", "users": ["user1", "user2"]}, # 1 mapping (consolidated)
"team3": {"organization": "org3", "users": False}, # 1 mapping
}
result, next_order = team_map_to_gateway_format(team_map, start_order=10)
# Should have 4 total mappings
assert len(result) == 4
# Should have 3 total mappings (consolidated behavior)
assert len(result) == 3
# Orders should be sequential starting from 10
orders = [mapping["order"] for mapping in result]
assert orders == [10, 11, 12, 13]
assert next_order == 14
assert orders == [10, 11, 12]
assert next_order == 13
# Verify teams are represented correctly
teams = [mapping["team"] for mapping in result]
assert "team1" in teams
assert "team2" in teams
assert "team3" in teams
assert teams.count("team2") == 2 # team2 should appear twice
assert teams.count("team2") == 1 # team2 should appear once (consolidated)
def test_team_with_auth_type_sso(self):
"""Test team mapping with auth_type='sso' (default behavior)."""
@ -963,7 +958,7 @@ class TestTeamMapToGatewayFormat:
assert len(result) == 1
mapping = result[0]
assert "Username equals testuser" in mapping["name"]
assert mapping["name"] == "testorg - testteam U:1"
assert mapping["triggers"]["attributes"]["username"]["equals"] == "testuser"
def test_team_with_auth_type_ldap(self):

View File

@ -11,6 +11,21 @@ from typing import cast, Any, Literal, Pattern, Union
email_regex = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
def truncate_name(name: str, max_length: int = 128) -> str:
"""Truncate a name to the specified maximum length."""
if len(name) <= max_length:
return name
return name[:max_length]
def build_truncated_name(org_name: str, entity_name: str, trigger_name: str, max_component_length: int = 40) -> str:
"""Build a name by truncating each component individually and joining with ' - '."""
truncated_org = truncate_name(org_name, max_component_length)
truncated_entity = truncate_name(entity_name, max_component_length)
truncated_trigger = truncate_name(trigger_name, max_component_length)
return f"{truncated_org} - {truncated_entity} {truncated_trigger}"
def pattern_to_slash_format(pattern: Any) -> str:
"""Convert a re.Pattern object to /pattern/flags format."""
if not isinstance(pattern, re.Pattern):
@ -60,63 +75,84 @@ def process_ldap_user_list(
def process_sso_user_list(
users: Union[str, bool, Pattern[str], list[Union[str, bool, Pattern[str]]]],
email_attr: str = 'email',
username_attr: str = 'username',
) -> list[dict[str, Any]]:
users: Union[str, bool, Pattern[str], list[Union[str, bool, Pattern[str]]]], email_attr: str = 'email', username_attr: str = 'username'
) -> dict[str, Union[str, dict[str, dict[str, Union[str, list[str]]]]]]:
"""Process SSO user list and return a single consolidated trigger instead of multiple separate ones."""
if not isinstance(users, list):
users = [users]
# Type cast to help mypy understand the type after conversion
user_list: list[Union[str, bool, Pattern[str]]] = cast(list[Union[str, bool, Pattern[str]]], users)
triggers = []
if user_list == ["false"] or user_list == [False]:
triggers.append(
{
"name": "Never Allow",
"trigger": {"never": {}},
}
)
return {"name": "Never Allow", "trigger": {"never": {}}}
elif user_list == ["true"] or user_list == [True]:
triggers.append({"name": "Always Allow", "trigger": {"always": {}}})
return {"name": "Always Allow", "trigger": {"always": {}}}
else:
# Group users by type
emails = []
usernames = []
regexes_username = []
regexes_email = []
for user_or_email in user_list:
if isinstance(user_or_email, re.Pattern):
user_or_email = pattern_to_slash_format(user_or_email)
# If we got a regex it could be either a username or an email object
triggers.append(
{"name": f"Match Username {user_or_email}", "trigger": {"attributes": {"join_condition": "or", username_attr: {"matches": user_or_email}}}}
)
triggers.append(
{"name": f"Match Email {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"matches": user_or_email}}}}
)
pattern_str = pattern_to_slash_format(user_or_email)
regexes_username.append(pattern_str)
regexes_email.append(pattern_str)
elif isinstance(user_or_email, str):
# If we got a string its a direct match for either a username or an email
if email_regex.match(user_or_email):
triggers.append(
{"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": user_or_email}}}}
)
emails.append(user_or_email)
else:
triggers.append(
{
"name": f"Username equals {user_or_email}",
"trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": user_or_email}}},
}
)
usernames.append(user_or_email)
else:
# Convert other objects to string representation and assume it could be either an email or a username
# The other option we could take here would be to just error out
triggers.append(
{
"name": f"Username Equals {user_or_email}",
"trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": str(user_or_email)}}},
}
)
triggers.append(
{"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": str(user_or_email)}}}}
)
return triggers
# Convert other objects to string and treat as both
str_val = str(user_or_email)
usernames.append(str_val)
emails.append(str_val)
# Build consolidated trigger
attributes = {"join_condition": "or"}
if emails:
if len(emails) == 1:
attributes[email_attr] = {"equals": emails[0]}
else:
attributes[email_attr] = {"in": emails}
if usernames:
if len(usernames) == 1:
attributes[username_attr] = {"equals": usernames[0]}
else:
attributes[username_attr] = {"in": usernames}
# For regex patterns, we need to create separate matches conditions since there's no matches_or
for i, pattern in enumerate(regexes_username):
pattern_key = f"{username_attr}_pattern_{i}" if len(regexes_username) > 1 else username_attr
if pattern_key not in attributes:
attributes[pattern_key] = {}
attributes[pattern_key]["matches"] = pattern
for i, pattern in enumerate(regexes_email):
pattern_key = f"{email_attr}_pattern_{i}" if len(regexes_email) > 1 else email_attr
if pattern_key not in attributes:
attributes[pattern_key] = {}
attributes[pattern_key]["matches"] = pattern
# Create a deterministic, concise name based on trigger types and counts
name_parts = []
if emails:
name_parts.append(f"E:{len(emails)}")
if usernames:
name_parts.append(f"U:{len(usernames)}")
if regexes_username:
name_parts.append(f"UP:{len(regexes_username)}")
if regexes_email:
name_parts.append(f"EP:{len(regexes_email)}")
name = " ".join(name_parts) if name_parts else "Mixed Rules"
return {"name": name, "trigger": {"attributes": attributes}}
def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username', auth_type: Literal['sso', 'ldap'] = 'sso'):
@ -149,15 +185,29 @@ def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email
# Check for remove flag
revoke = team.get('remove', False)
if auth_type == 'sso':
triggers = process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr)
else:
if auth_type == 'ldap':
triggers = process_ldap_user_list(team['users'])
for trigger in triggers:
result.append(
{
"name": build_truncated_name(organization_name, team_name, trigger['name']),
"map_type": "team",
"order": order,
"authenticator": -1, # Will be updated when creating the mapper
"triggers": trigger['trigger'],
"organization": organization_name,
"team": team_name,
"role": "Team Member", # Gateway team member role
"revoke": revoke,
}
)
order += 1
for trigger in triggers:
if auth_type == 'sso':
trigger = process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr)
result.append(
{
"name": f"{organization_name} - {team_name} {trigger['name']}",
"name": build_truncated_name(organization_name, team_name, trigger['name']),
"map_type": "team",
"order": order,
"authenticator": -1, # Will be updated when creating the mapper
@ -209,15 +259,29 @@ def org_map_to_gateway_format(org_map, start_order=1, email_attr: str = 'email',
if organization.get(f"remove_{user_type}"):
revoke = True
if auth_type == 'sso':
triggers = process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr)
else:
if auth_type == 'ldap':
triggers = process_ldap_user_list(organization[user_type])
for trigger in triggers:
result.append(
{
"name": build_truncated_name(organization_name, permission_type, trigger['name']),
"map_type": "organization",
"order": order,
"authenticator": -1, # Will be updated when creating the mapper
"triggers": trigger['trigger'],
"organization": organization_name,
"team": None, # Organization-level mapping, not team-specific
"role": role,
"revoke": revoke,
}
)
order += 1
for trigger in triggers:
if auth_type == 'sso':
trigger = process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr)
result.append(
{
"name": f"{organization_name} - {permission_type} {trigger['name']}",
"name": build_truncated_name(organization_name, permission_type, trigger['name']),
"map_type": "organization",
"order": order,
"authenticator": -1, # Will be updated when creating the mapper