From af2efec2b4b33c10bd0ca855c422c5475ac12a72 Mon Sep 17 00:00:00 2001 From: Peter Braun Date: Wed, 20 Aug 2025 21:10:25 +0200 Subject: [PATCH] fix: do not create multiple mappers for lists of emails or usernames (#7063) * fix: do not create multiple mappers for lists of emails or usernames * fix: create multiple matchers, don't rely on matches_or * fix tests * truncate mapper names to a max of 128 chars * better naming scheme for matchers --- .../tests/unit/utils/test_auth_migration.py | 301 +++++++++--------- awx/main/utils/gateway_mapping.py | 168 +++++++--- 2 files changed, 264 insertions(+), 205 deletions(-) diff --git a/awx/main/tests/unit/utils/test_auth_migration.py b/awx/main/tests/unit/utils/test_auth_migration.py index 2c3a3c6204..e00bdb839b 100644 --- a/awx/main/tests/unit/utils/test_auth_migration.py +++ b/awx/main/tests/unit/utils/test_auth_migration.py @@ -32,91 +32,112 @@ def get_role_mappers(role_map, start_order=1): class TestProcessSSOUserList: - """Tests for the process_sso_user_list function.""" + """Tests for the process_sso_user_list function (consolidated version).""" def test_false_boolean(self): """Test that False creates 'Never Allow' trigger.""" result = process_sso_user_list(False) - assert len(result) == 1 - assert result[0]["name"] == "Never Allow" - assert result[0]["trigger"] == {"never": {}} + assert result["name"] == "Never Allow" + assert result["trigger"] == {"never": {}} def test_true_boolean(self): """Test that True creates 'Always Allow' trigger.""" result = process_sso_user_list(True) - assert len(result) == 1 - assert result[0]["name"] == "Always Allow" - assert result[0]["trigger"] == {"always": {}} + assert result["name"] == "Always Allow" + assert result["trigger"] == {"always": {}} def test_false_string_list(self): """Test that ['false'] creates 'Never Allow' trigger.""" result = process_sso_user_list(["false"]) - assert len(result) == 1 - assert result[0]["name"] == "Never Allow" - assert result[0]["trigger"] == {"never": {}} + assert result["name"] == "Never Allow" + assert result["trigger"] == {"never": {}} def test_true_string_list(self): """Test that ['true'] creates 'Always Allow' trigger.""" result = process_sso_user_list(["true"]) - assert len(result) == 1 - assert result[0]["name"] == "Always Allow" - assert result[0]["trigger"] == {"always": {}} + assert result["name"] == "Always Allow" + assert result["trigger"] == {"always": {}} def test_string_user_list(self): """Test that regular string users are processed correctly.""" result = process_sso_user_list(["testuser"]) - assert len(result) == 1 - assert result[0]["name"] == "Username equals testuser" - assert result[0]["trigger"]["attributes"]["username"]["equals"] == "testuser" + assert result["name"] == "U:1" + assert result["trigger"]["attributes"]["username"]["equals"] == "testuser" def test_email_user_list(self): """Test that email addresses are processed correctly.""" result = process_sso_user_list(["test@example.com"]) - assert len(result) == 1 - assert result[0]["name"] == "Email Equals test@example.com" - assert result[0]["trigger"]["attributes"]["email"]["equals"] == "test@example.com" + assert result["name"] == "E:1" + assert result["trigger"]["attributes"]["email"]["equals"] == "test@example.com" def test_mixed_string_list(self): """Test that mixed list with 'true', 'false', and regular users works correctly.""" result = process_sso_user_list(["true", "testuser", "false"]) - # Should process each item separately, not treat the whole list as true/false - assert len(result) == 3 - - # Check each result - names = [r["name"] for r in result] - assert "Username equals true" in names - assert "Username equals testuser" in names - assert "Username equals false" in names + # Should consolidate all usernames and show count + assert result["name"] == "U:3" + assert result["trigger"]["attributes"]["username"]["in"] == ["true", "testuser", "false"] def test_custom_email_username_attrs(self): """Test that custom email and username attributes work correctly.""" result = process_sso_user_list(["test@example.com"], email_attr='custom_email', username_attr='custom_username') - assert len(result) == 1 - assert result[0]["trigger"]["attributes"]["custom_email"]["equals"] == "test@example.com" + assert result["trigger"]["attributes"]["custom_email"]["equals"] == "test@example.com" def test_regex_pattern(self): """Test that regex patterns create both username and email matches.""" pattern = re.compile(r"^admin.*@example\.com$") result = process_sso_user_list([pattern]) - assert len(result) == 2 - assert "Match Username" in result[0]["name"] - assert "Match Email" in result[1]["name"] - assert result[0]["trigger"]["attributes"]["username"]["matches"] == "/^admin.*@example\\.com$/" - assert result[1]["trigger"]["attributes"]["email"]["matches"] == "/^admin.*@example\\.com$/" + assert result["name"] == "UP:1 EP:1" + assert result["trigger"]["attributes"]["username"]["matches"] == "/^admin.*@example\\.com$/" + assert result["trigger"]["attributes"]["email"]["matches"] == "/^admin.*@example\\.com$/" + + def test_multiple_emails(self): + """Test that multiple emails use count-based names.""" + emails = [f"user{i}@example.com" for i in range(10)] + result = process_sso_user_list(emails) + + assert result["name"] == "E:10" + assert result["trigger"]["attributes"]["email"]["in"] == emails + + def test_multiple_usernames(self): + """Test that multiple usernames use count-based names.""" + usernames = [f"user{i}" for i in range(8)] + result = process_sso_user_list(usernames) + + assert result["name"] == "U:8" + assert result["trigger"]["attributes"]["username"]["in"] == usernames + + def test_mixed_emails_and_usernames(self): + """Test mixed emails and usernames use count-based names.""" + emails = ["user1@example.com", "user2@example.com"] + usernames = ["admin1", "admin2", "admin3"] + users = emails + usernames + result = process_sso_user_list(users) + + assert result["name"] == "E:2 U:3" + assert result["trigger"]["attributes"]["email"]["in"] == emails + assert result["trigger"]["attributes"]["username"]["in"] == usernames + + def test_multiple_regex_patterns(self): + """Test that multiple regex patterns use count-based names.""" + patterns = [re.compile(f"pattern{i}") for i in range(5)] + result = process_sso_user_list(patterns) + + assert result["name"] == "UP:5 EP:5" def test_empty_list(self): - """Test that empty list creates no triggers.""" + """Test that empty list creates default trigger.""" result = process_sso_user_list([]) - assert len(result) == 0 + assert result["name"] == "Mixed Rules" + assert result["trigger"]["attributes"]["join_condition"] == "or" class TestProcessLdapUserList: @@ -305,7 +326,7 @@ class TestOrgMapToGatewayFormat: assert len(result) == 1 mapping = result[0] - assert mapping["name"] == "myorg - Admins Username equals admin-username" + assert mapping["name"] == "myorg - Admins U:1" assert mapping["triggers"] == {"attributes": {"join_condition": "or", "username": {"equals": "admin-username"}}} assert mapping["role"] == "Organization Admin" @@ -315,11 +336,11 @@ class TestOrgMapToGatewayFormat: result, _ = org_map_to_gateway_format(org_map) - assert len(result) == 2 - assert result[0]["name"] == "myorg - Admins Username equals admin-username1" - assert result[1]["name"] == "myorg - Admins Username equals admin-username2" - assert result[0]["order"] == 1 - assert result[1]["order"] == 2 + assert len(result) == 1 + mapping = result[0] + assert mapping["name"] == "myorg - Admins U:2" + assert mapping["triggers"]["attributes"]["username"]["in"] == ["admin-username1", "admin-username2"] + assert mapping["order"] == 1 def test_org_with_email_detection(self): """Test that email addresses are correctly identified and handled.""" @@ -327,25 +348,16 @@ class TestOrgMapToGatewayFormat: result, _ = org_map_to_gateway_format(org_map) - assert len(result) == 3 + assert len(result) == 1 + mapping = result[0] - # First mapping should be for email - email_mapping = result[0] - assert "user@example.com" in email_mapping["name"] - assert "Email Equals" in email_mapping["name"] - assert email_mapping["triggers"]["attributes"]["email"]["equals"] == "user@example.com" + # Should consolidate emails and usernames in one mapper + assert mapping["name"] == "myorg - Users E:2 U:1" - # Second mapping should be for email - email_mapping2 = result[1] - assert "admin@test.org" in email_mapping2["name"] - assert "Email Equals" in email_mapping2["name"] - assert email_mapping2["triggers"]["attributes"]["email"]["equals"] == "admin@test.org" - - # Third mapping should be for username (not email) - username_mapping = result[2] - assert "not-an-email" in username_mapping["name"] - assert "Username equals" in username_mapping["name"] - assert username_mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email" + # Should have both email and username attributes + assert mapping["triggers"]["attributes"]["email"]["in"] == ["user@example.com", "admin@test.org"] + assert mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email" + assert mapping["triggers"]["attributes"]["join_condition"] == "or" def test_org_with_remove_flags(self): """Test organization with remove flags.""" @@ -375,16 +387,13 @@ class TestOrgMapToGatewayFormat: result, _ = org_map_to_gateway_format(org_map) - # Should create 2 mappings - one for username match, one for email match - assert len(result) == 2, f"Expected 2 items but got: {result}" + # Should create 1 consolidated mapping with both username and email matches + assert len(result) == 1, f"Expected 1 item but got: {result}" - username_mapping = result[0] - assert "Match Username" in username_mapping["name"] - assert username_mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/" - - email_mapping = result[1] - assert "Match Email" in email_mapping["name"] - assert email_mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/" + mapping = result[0] + assert mapping["name"] == "myorg - Users UP:1 EP:1" + assert mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/" + assert mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/" def test_org_with_none_values_skipped(self): """Test that entries with None values are skipped.""" @@ -401,12 +410,11 @@ class TestOrgMapToGatewayFormat: result, next_order = org_map_to_gateway_format(org_map, start_order=10) - # Should have 3 mappings total - assert len(result) == 3 + # Should have 2 mappings total (1 for org1, 1 for org2) + assert len(result) == 2 assert result[0]["order"] == 10 assert result[1]["order"] == 11 - assert result[2]["order"] == 12 - assert next_order == 13 + assert next_order == 12 def test_org_comprehensive_field_validation(self): """Test comprehensive validation of all fields in org mappings.""" @@ -419,7 +427,7 @@ class TestOrgMapToGatewayFormat: # Validate all required fields and their types assert isinstance(mapping["name"], str) - assert mapping["name"] == "test-org - Admins Username equals test-admin" + assert mapping["name"] == "test-org - Admins U:1" assert mapping["map_type"] == "organization" assert mapping["order"] == 5 @@ -454,11 +462,11 @@ class TestOrgMapToGatewayFormat: assert len(result) == 1 assert next_order == 2 - # Test with multiple mappings from single org + # Test with multiple mappings from single org - now consolidated into one org_map = {"multi": {"users": ["user1", "user2"]}} result, next_order = org_map_to_gateway_format(org_map) - assert len(result) == 2 - assert next_order == 3 + assert len(result) == 1 + assert next_order == 2 def test_org_with_auth_type_sso(self): """Test org mapping with auth_type='sso' (default behavior).""" @@ -468,7 +476,7 @@ class TestOrgMapToGatewayFormat: assert len(result) == 1 mapping = result[0] - assert "Username equals testuser" in mapping["name"] + assert mapping["name"] == "myorg - Users U:1" assert mapping["triggers"]["attributes"]["username"]["equals"] == "testuser" def test_org_with_auth_type_ldap(self): @@ -530,26 +538,17 @@ class TestTeamMapToGatewayFormat: result, _ = team_map_to_gateway_format(team_map) - # Should have 3 mappings - one for each user - assert len(result) == 3 + # Should have 1 consolidated mapping + assert len(result) == 1 + mapping = result[0] - # First mapping should be for email (emails are detected and use email attribute) - email_mapping = result[0] - assert "user@example.com" in email_mapping["name"] - assert "Email Equals" in email_mapping["name"] - assert email_mapping["triggers"]["attributes"]["email"]["equals"] == "user@example.com" + # Should consolidate emails and usernames in one mapper + assert mapping["name"] == "myorg - email-team E:2 U:1" - # Second mapping should be for email - email_mapping2 = result[1] - assert "admin@test.org" in email_mapping2["name"] - assert "Email Equals" in email_mapping2["name"] - assert email_mapping2["triggers"]["attributes"]["email"]["equals"] == "admin@test.org" - - # Third mapping should be for username (not email) - username_mapping = result[2] - assert "not-an-email" in username_mapping["name"] - assert "Username equals" in username_mapping["name"] - assert username_mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email" + # Should have both email and username attributes + assert mapping["triggers"]["attributes"]["email"]["in"] == ["user@example.com", "admin@test.org"] + assert mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email" + assert mapping["triggers"]["attributes"]["join_condition"] == "or" def test_team_with_custom_email_username_attrs(self): """Test team mapping with custom email and username attributes.""" @@ -560,7 +559,7 @@ class TestTeamMapToGatewayFormat: assert len(result) == 1 mapping = result[0] assert mapping["triggers"]["attributes"]["custom_email"]["equals"] == "test@example.com" - assert "Email Equals" in mapping["name"] + assert mapping["name"] == "myorg - custom-team E:1" def test_team_with_regex_pattern_objects(self): """Test team mapping with actual re.Pattern objects.""" @@ -570,16 +569,13 @@ class TestTeamMapToGatewayFormat: result, _ = team_map_to_gateway_format(team_map) - # Should create 2 mappings - one for username match, one for email match - assert len(result) == 2, f"Expected 2 items but got: {result}" + # Should create 1 consolidated mapping with both username and email matches + assert len(result) == 1, f"Expected 1 item but got: {result}" - username_mapping = result[0] - assert "Match Username" in username_mapping["name"] - assert username_mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/" - - email_mapping = result[1] - assert "Match Email" in email_mapping["name"] - assert email_mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/" + mapping = result[0] + assert mapping["name"] == "myorg - regex-team UP:1 EP:1" + assert mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/" + assert mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/" def test_team_with_non_string_objects(self): """Test team mapping with non-string objects that get converted.""" @@ -593,16 +589,13 @@ class TestTeamMapToGatewayFormat: result, _ = team_map_to_gateway_format(team_map) - # Should create 4 mappings - 2 for custom object (username + email), 2 for number - assert len(result) == 4 + # Should create 1 consolidated mapping with both username and email attributes + assert len(result) == 1 - # Check custom object mappings - non-string objects create both username and email mappings - custom_mappings = [r for r in result if "custom_object_string" in r["name"]] - assert len(custom_mappings) == 2 - - # Check number mappings - numbers also create both username and email mappings - number_mappings = [r for r in result if "12345" in r["name"]] - assert len(number_mappings) == 2 + mapping = result[0] + # Both objects should be treated as usernames and emails + assert mapping["triggers"]["attributes"]["username"]["in"] == ["custom_object_string", "12345"] + assert mapping["triggers"]["attributes"]["email"]["in"] == ["custom_object_string", "12345"] def test_team_with_mixed_data_types(self): """Test team mapping with mixed data types in users list.""" @@ -612,15 +605,17 @@ class TestTeamMapToGatewayFormat: result, _ = team_map_to_gateway_format(team_map) - # Should handle all different types appropriately - assert len(result) > 0 + # Should create 1 consolidated mapping with all types handled + assert len(result) == 1 - # Verify that each type is handled - names = [mapping["name"] for mapping in result] - assert any("string_user" in name for name in names), f"Expected to find string_user in {(', '.join(names))}" - assert any("email@test.com" in name for name in names), f"Expected to find email@test.com in {(', '.join(names))}" - assert any("999" in name for name in names), f"Expected to find 999 in {(', '.join(names))}" - assert any("True" in name for name in names), f"Expected to find True in {(', '.join(names))}" + mapping = result[0] + # All types should be consolidated into one mapper name + assert mapping["name"] == "myorg - mixed-team E:3 U:3 UP:1 EP:1" + + # Verify trigger structure contains all the data types + triggers = mapping["triggers"]["attributes"] + assert "email" in triggers + assert "username" in triggers def test_team_with_start_order_parameter(self): """Test that start_order parameter works correctly.""" @@ -651,7 +646,7 @@ class TestTeamMapToGatewayFormat: # Check empty team name mapping empty_team_mapping = [m for m in result if m["team"] == ""][0] - assert "Username equals" in empty_team_mapping["name"] + assert " - " in empty_team_mapping["name"] and "U:1" in empty_team_mapping["name"] assert empty_team_mapping["team"] == "" # Check empty organization mapping @@ -667,14 +662,14 @@ class TestTeamMapToGatewayFormat: result, _ = team_map_to_gateway_format(team_map) - assert len(result) == 2 + assert len(result) == 1 # Verify special characters are preserved in names - for mapping in result: - assert "team-with-special!@#$%^&*()_+chars" in mapping["name"] - assert "org with spaces & symbols!" in mapping["name"] - assert mapping["team"] == "team-with-special!@#$%^&*()_+chars" - assert mapping["organization"] == "org with spaces & symbols!" + mapping = result[0] + assert "team-with-special!@#$%^&*()_+chars" in mapping["name"] + assert "org with spaces & symbols!" in mapping["name"] + assert mapping["team"] == "team-with-special!@#$%^&*()_+chars" + assert mapping["organization"] == "org with spaces & symbols!" def test_team_with_unicode_characters(self): """Test team mapping with unicode characters.""" @@ -687,14 +682,14 @@ class TestTeamMapToGatewayFormat: result, _ = team_map_to_gateway_format(team_map) - assert len(result) == 2 + assert len(result) == 1 # Verify unicode characters are handled correctly - for mapping in result: - assert "チーム" in mapping["name"] - assert "組織" in mapping["name"] - assert mapping["team"] == "チーム" - assert mapping["organization"] == "組織" + mapping = result[0] + assert "チーム" in mapping["name"] + assert "組織" in mapping["name"] + assert mapping["team"] == "チーム" + assert mapping["organization"] == "組織" def test_team_next_order_calculation(self): """Test that next_order is calculated correctly in various scenarios.""" @@ -714,11 +709,11 @@ class TestTeamMapToGatewayFormat: assert len(result) == 1 assert next_order == 2 - # Test with multiple mappings from single team + # Test with multiple mappings from single team - now consolidated into one team_map = {"multi": {"organization": "org", "users": ["user1", "user2"]}} result, next_order = team_map_to_gateway_format(team_map) - assert len(result) == 2 - assert next_order == 3 + assert len(result) == 1 + assert next_order == 2 def test_team_large_dataset_performance(self): """Test team mapping with a large number of teams and users.""" @@ -732,13 +727,13 @@ class TestTeamMapToGatewayFormat: result, next_order = team_map_to_gateway_format(team_map) - # Should create 500 mappings (100 teams * 5 users each) - assert len(result) == 500 + # Should create 100 mappings (1 per team, with consolidated users) + assert len(result) == 100 # Verify orders are sequential orders = [mapping["order"] for mapping in result] - assert orders == list(range(1, 501)) - assert next_order == 501 + assert orders == list(range(1, 101)) + assert next_order == 101 # Verify all teams are represented teams = {mapping["team"] for mapping in result} @@ -794,11 +789,11 @@ class TestTeamMapToGatewayFormat: # Should have either username or email attribute assert ("username" in attrs) or ("email" in attrs) - # The attribute should have either "equals" or "matches" + # The attribute should have either "equals", "matches", or "has_or" for attr_name in ["username", "email"]: if attr_name in attrs: attr_value = attrs[attr_name] - assert ("equals" in attr_value) or ("matches" in attr_value) + assert ("equals" in attr_value) or ("matches" in attr_value) or ("has_or" in attr_value) def test_team_boolean_false_trigger(self): """Test that False users value creates never trigger correctly.""" @@ -873,7 +868,7 @@ class TestTeamMapToGatewayFormat: # String patterns should be treated as regular strings, not regex assert len(result) == 1 mapping = result[0] - assert "Username equals" in mapping["name"] + assert mapping["name"] == "myorg - regex-team U:1" assert mapping["triggers"]["attributes"]["username"]["equals"] == "/^admin.*@example\\.com$/" def test_team_comprehensive_field_validation(self): @@ -887,7 +882,7 @@ class TestTeamMapToGatewayFormat: # Validate all required fields and their types assert isinstance(mapping["name"], str) - assert mapping["name"] == "test-org - comprehensive-team Username equals test-user" + assert mapping["name"] == "test-org - comprehensive-team U:1" assert mapping["map_type"] == "team" assert mapping["order"] == 5 @@ -934,26 +929,26 @@ class TestTeamMapToGatewayFormat: """Test that ordering works correctly with mixed user types.""" team_map = { "team1": {"organization": "org1", "users": True}, # 1 mapping - "team2": {"organization": "org2", "users": ["user1", "user2"]}, # 2 mappings + "team2": {"organization": "org2", "users": ["user1", "user2"]}, # 1 mapping (consolidated) "team3": {"organization": "org3", "users": False}, # 1 mapping } result, next_order = team_map_to_gateway_format(team_map, start_order=10) - # Should have 4 total mappings - assert len(result) == 4 + # Should have 3 total mappings (consolidated behavior) + assert len(result) == 3 # Orders should be sequential starting from 10 orders = [mapping["order"] for mapping in result] - assert orders == [10, 11, 12, 13] - assert next_order == 14 + assert orders == [10, 11, 12] + assert next_order == 13 # Verify teams are represented correctly teams = [mapping["team"] for mapping in result] assert "team1" in teams assert "team2" in teams assert "team3" in teams - assert teams.count("team2") == 2 # team2 should appear twice + assert teams.count("team2") == 1 # team2 should appear once (consolidated) def test_team_with_auth_type_sso(self): """Test team mapping with auth_type='sso' (default behavior).""" @@ -963,7 +958,7 @@ class TestTeamMapToGatewayFormat: assert len(result) == 1 mapping = result[0] - assert "Username equals testuser" in mapping["name"] + assert mapping["name"] == "testorg - testteam U:1" assert mapping["triggers"]["attributes"]["username"]["equals"] == "testuser" def test_team_with_auth_type_ldap(self): diff --git a/awx/main/utils/gateway_mapping.py b/awx/main/utils/gateway_mapping.py index 2c3c9e05ac..0d6bd2bd4e 100644 --- a/awx/main/utils/gateway_mapping.py +++ b/awx/main/utils/gateway_mapping.py @@ -11,6 +11,21 @@ from typing import cast, Any, Literal, Pattern, Union email_regex = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") +def truncate_name(name: str, max_length: int = 128) -> str: + """Truncate a name to the specified maximum length.""" + if len(name) <= max_length: + return name + return name[:max_length] + + +def build_truncated_name(org_name: str, entity_name: str, trigger_name: str, max_component_length: int = 40) -> str: + """Build a name by truncating each component individually and joining with ' - '.""" + truncated_org = truncate_name(org_name, max_component_length) + truncated_entity = truncate_name(entity_name, max_component_length) + truncated_trigger = truncate_name(trigger_name, max_component_length) + return f"{truncated_org} - {truncated_entity} {truncated_trigger}" + + def pattern_to_slash_format(pattern: Any) -> str: """Convert a re.Pattern object to /pattern/flags format.""" if not isinstance(pattern, re.Pattern): @@ -60,63 +75,84 @@ def process_ldap_user_list( def process_sso_user_list( - users: Union[str, bool, Pattern[str], list[Union[str, bool, Pattern[str]]]], - email_attr: str = 'email', - username_attr: str = 'username', -) -> list[dict[str, Any]]: + users: Union[str, bool, Pattern[str], list[Union[str, bool, Pattern[str]]]], email_attr: str = 'email', username_attr: str = 'username' +) -> dict[str, Union[str, dict[str, dict[str, Union[str, list[str]]]]]]: + """Process SSO user list and return a single consolidated trigger instead of multiple separate ones.""" if not isinstance(users, list): users = [users] # Type cast to help mypy understand the type after conversion user_list: list[Union[str, bool, Pattern[str]]] = cast(list[Union[str, bool, Pattern[str]]], users) - triggers = [] if user_list == ["false"] or user_list == [False]: - triggers.append( - { - "name": "Never Allow", - "trigger": {"never": {}}, - } - ) + return {"name": "Never Allow", "trigger": {"never": {}}} elif user_list == ["true"] or user_list == [True]: - triggers.append({"name": "Always Allow", "trigger": {"always": {}}}) + return {"name": "Always Allow", "trigger": {"always": {}}} else: + # Group users by type + emails = [] + usernames = [] + regexes_username = [] + regexes_email = [] + for user_or_email in user_list: if isinstance(user_or_email, re.Pattern): - user_or_email = pattern_to_slash_format(user_or_email) - # If we got a regex it could be either a username or an email object - triggers.append( - {"name": f"Match Username {user_or_email}", "trigger": {"attributes": {"join_condition": "or", username_attr: {"matches": user_or_email}}}} - ) - triggers.append( - {"name": f"Match Email {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"matches": user_or_email}}}} - ) + pattern_str = pattern_to_slash_format(user_or_email) + regexes_username.append(pattern_str) + regexes_email.append(pattern_str) elif isinstance(user_or_email, str): - # If we got a string its a direct match for either a username or an email if email_regex.match(user_or_email): - triggers.append( - {"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": user_or_email}}}} - ) + emails.append(user_or_email) else: - triggers.append( - { - "name": f"Username equals {user_or_email}", - "trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": user_or_email}}}, - } - ) + usernames.append(user_or_email) else: - # Convert other objects to string representation and assume it could be either an email or a username - # The other option we could take here would be to just error out - triggers.append( - { - "name": f"Username Equals {user_or_email}", - "trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": str(user_or_email)}}}, - } - ) - triggers.append( - {"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": str(user_or_email)}}}} - ) - return triggers + # Convert other objects to string and treat as both + str_val = str(user_or_email) + usernames.append(str_val) + emails.append(str_val) + + # Build consolidated trigger + attributes = {"join_condition": "or"} + + if emails: + if len(emails) == 1: + attributes[email_attr] = {"equals": emails[0]} + else: + attributes[email_attr] = {"in": emails} + + if usernames: + if len(usernames) == 1: + attributes[username_attr] = {"equals": usernames[0]} + else: + attributes[username_attr] = {"in": usernames} + + # For regex patterns, we need to create separate matches conditions since there's no matches_or + for i, pattern in enumerate(regexes_username): + pattern_key = f"{username_attr}_pattern_{i}" if len(regexes_username) > 1 else username_attr + if pattern_key not in attributes: + attributes[pattern_key] = {} + attributes[pattern_key]["matches"] = pattern + + for i, pattern in enumerate(regexes_email): + pattern_key = f"{email_attr}_pattern_{i}" if len(regexes_email) > 1 else email_attr + if pattern_key not in attributes: + attributes[pattern_key] = {} + attributes[pattern_key]["matches"] = pattern + + # Create a deterministic, concise name based on trigger types and counts + name_parts = [] + if emails: + name_parts.append(f"E:{len(emails)}") + if usernames: + name_parts.append(f"U:{len(usernames)}") + if regexes_username: + name_parts.append(f"UP:{len(regexes_username)}") + if regexes_email: + name_parts.append(f"EP:{len(regexes_email)}") + + name = " ".join(name_parts) if name_parts else "Mixed Rules" + + return {"name": name, "trigger": {"attributes": attributes}} def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username', auth_type: Literal['sso', 'ldap'] = 'sso'): @@ -149,15 +185,29 @@ def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email # Check for remove flag revoke = team.get('remove', False) - if auth_type == 'sso': - triggers = process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr) - else: + if auth_type == 'ldap': triggers = process_ldap_user_list(team['users']) + for trigger in triggers: + result.append( + { + "name": build_truncated_name(organization_name, team_name, trigger['name']), + "map_type": "team", + "order": order, + "authenticator": -1, # Will be updated when creating the mapper + "triggers": trigger['trigger'], + "organization": organization_name, + "team": team_name, + "role": "Team Member", # Gateway team member role + "revoke": revoke, + } + ) + order += 1 - for trigger in triggers: + if auth_type == 'sso': + trigger = process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr) result.append( { - "name": f"{organization_name} - {team_name} {trigger['name']}", + "name": build_truncated_name(organization_name, team_name, trigger['name']), "map_type": "team", "order": order, "authenticator": -1, # Will be updated when creating the mapper @@ -209,15 +259,29 @@ def org_map_to_gateway_format(org_map, start_order=1, email_attr: str = 'email', if organization.get(f"remove_{user_type}"): revoke = True - if auth_type == 'sso': - triggers = process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr) - else: + if auth_type == 'ldap': triggers = process_ldap_user_list(organization[user_type]) + for trigger in triggers: + result.append( + { + "name": build_truncated_name(organization_name, permission_type, trigger['name']), + "map_type": "organization", + "order": order, + "authenticator": -1, # Will be updated when creating the mapper + "triggers": trigger['trigger'], + "organization": organization_name, + "team": None, # Organization-level mapping, not team-specific + "role": role, + "revoke": revoke, + } + ) + order += 1 - for trigger in triggers: + if auth_type == 'sso': + trigger = process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr) result.append( { - "name": f"{organization_name} - {permission_type} {trigger['name']}", + "name": build_truncated_name(organization_name, permission_type, trigger['name']), "map_type": "organization", "order": order, "authenticator": -1, # Will be updated when creating the mapper