From 505ec560c8af10b3c2c526905d0904c3f406cfb6 Mon Sep 17 00:00:00 2001 From: John Westcott IV <32551173+john-westcott-iv@users.noreply.github.com> Date: Tue, 12 Aug 2025 07:44:51 -0400 Subject: [PATCH] feat: comprehensive refactor of SSO org/team mapping for Gateway authentication export (#7047) This commit completely refactors how SSO organization and team mappings are processed and exported for Gateway authentication, moving from a group-based approach to a more flexible attribute-based system. Key Changes: - Introduced new process_sso_user_list() function for centralized user processing - Enhanced boolean handling to support both native booleans and string representations - Added email detection and regex pattern support for flexible user matching - Refactored trigger generation from groups-based to attributes-based system Gateway Mapping Enhancements (awx/main/utils/gateway_mapping.py): - Added email regex detection for automatic email vs username classification - Added pattern_to_slash_format() for regex pattern conversion - Enhanced process_sso_user_list() with support for: - Boolean values: True/False and ["true"]/["false"] - String usernames and email addresses with automatic detection - Regex patterns with both username and email matching - Custom email_attr and username_attr parameters - Refactored team_map_to_gateway_format() to use new processing system - Refactored org_map_to_gateway_format() to use new processing system - Changed trigger structure from {"groups": {"has_or": [...]}} to attribute-based triggers - Improved naming convention to include trigger type in mapping names Comprehensive Test Coverage (awx/main/tests/unit/utils/test_auth_migration.py): - Added complete TestProcessSSOUserList class with 8 comprehensive test methods - Enhanced TestOrgMapToGatewayFormat with string boolean and new functionality tests - Enhanced TestTeamMapToGatewayFormat with string boolean and new functionality tests - Added tests for email detection, regex patterns, and custom attributes - Verified backward compatibility and integration functionality - All existing tests updated to work with new attribute-based trigger system Breaking Changes: - Trigger structure changed from group-based to attribute-based - Mapping names now include trigger descriptions for better clarity - Function signatures updated to include email_attr and username_attr parameters Co-Authored with Claude-4 via Cursor Co-authored-by: Peter Braun --- .../tests/unit/utils/test_auth_migration.py | 1216 ++++++++++------- awx/main/utils/gateway_mapping.py | 205 +-- 2 files changed, 836 insertions(+), 585 deletions(-) diff --git a/awx/main/tests/unit/utils/test_auth_migration.py b/awx/main/tests/unit/utils/test_auth_migration.py index 20d7340f68..b7f4fcd674 100644 --- a/awx/main/tests/unit/utils/test_auth_migration.py +++ b/awx/main/tests/unit/utils/test_auth_migration.py @@ -3,7 +3,8 @@ Unit tests for auth migration utilities. """ import pytest -from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format, role_map_to_gateway_format +import re +from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format, role_map_to_gateway_format, process_sso_user_list def get_org_mappers(org_map, start_order=1): @@ -24,6 +25,89 @@ def get_role_mappers(role_map, start_order=1): return result +class TestProcessSSOUserList: + """Tests for the process_sso_user_list function.""" + + def test_false_boolean(self): + """Test that False creates 'Never Allow' trigger.""" + result = process_sso_user_list(False) + + assert len(result) == 1 + assert result[0]["name"] == "Never Allow" + assert result[0]["trigger"] == {"never": {}} + + def test_true_boolean(self): + """Test that True creates 'Always Allow' trigger.""" + result = process_sso_user_list(True) + + assert len(result) == 1 + assert result[0]["name"] == "Always Allow" + assert result[0]["trigger"] == {"always": {}} + + def test_false_string_list(self): + """Test that ['false'] creates 'Never Allow' trigger.""" + result = process_sso_user_list(["false"]) + + assert len(result) == 1 + assert result[0]["name"] == "Never Allow" + assert result[0]["trigger"] == {"never": {}} + + def test_true_string_list(self): + """Test that ['true'] creates 'Always Allow' trigger.""" + result = process_sso_user_list(["true"]) + + assert len(result) == 1 + assert result[0]["name"] == "Always Allow" + assert result[0]["trigger"] == {"always": {}} + + def test_string_user_list(self): + """Test that regular string users are processed correctly.""" + result = process_sso_user_list(["testuser"]) + + assert len(result) == 1 + assert result[0]["name"] == "Username equals testuser" + assert result[0]["trigger"]["attributes"]["username"]["equals"] == "testuser" + + def test_email_user_list(self): + """Test that email addresses are processed correctly.""" + result = process_sso_user_list(["test@example.com"]) + + assert len(result) == 1 + assert result[0]["name"] == "Email Equals test@example.com" + assert result[0]["trigger"]["attributes"]["email"]["equals"] == "test@example.com" + + def test_mixed_string_list(self): + """Test that mixed list with 'true', 'false', and regular users works correctly.""" + result = process_sso_user_list(["true", "testuser", "false"]) + + # Should process each item separately, not treat the whole list as true/false + assert len(result) == 3 + + # Check each result + names = [r["name"] for r in result] + assert "Username equals true" in names + assert "Username equals testuser" in names + assert "Username equals false" in names + + def test_custom_email_username_attrs(self): + """Test that custom email and username attributes work correctly.""" + result = process_sso_user_list(["test@example.com"], email_attr='custom_email', username_attr='custom_username') + + assert len(result) == 1 + assert result[0]["trigger"]["attributes"]["custom_email"]["equals"] == "test@example.com" + + def test_regex_pattern(self): + """Test that regex patterns create both username and email matches.""" + pattern = re.compile(r"^admin.*@example\.com$") + result = process_sso_user_list([pattern]) + + assert len(result) == 2 + assert "Match Username" in result[0]["name"] + assert "Match Email" in result[1]["name"] + assert result[0]["trigger"]["attributes"]["username"]["matches"] == "/^admin.*@example\\.com$/" + assert result[1]["trigger"]["attributes"]["email"]["matches"] == "/^admin.*@example\\.com$/" + + class TestOrgMapToGatewayFormat: def test_none_input(self): @@ -38,248 +122,6 @@ class TestOrgMapToGatewayFormat: assert result == [] assert next_order == 1 - def test_single_org_with_admin_true(self): - """Test organization with admin access set to True.""" - org_map = {"myorg": {"admins": True}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Admins", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Organization Admin", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_org_with_admin_false(self): - """Test organization with admin access set to False.""" - org_map = {"myorg": {"admins": False}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Admins", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"never": {}}, - "role": "Organization Admin", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_org_with_admin_string(self): - """Test organization with admin access set to a specific group.""" - org_map = {"myorg": {"admins": "admin-group"}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Admins", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"groups": {"has_or": ["admin-group"]}}, - "role": "Organization Admin", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_org_with_admin_list(self): - """Test organization with admin access set to multiple groups.""" - org_map = {"myorg": {"admins": ["admin-group1", "admin-group2"]}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Admins", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"groups": {"has_or": ["admin-group1", "admin-group2"]}}, - "role": "Organization Admin", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_org_with_users_true(self): - """Test organization with user access set to True.""" - org_map = {"myorg": {"users": True}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Users", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Organization Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_org_with_both_admin_and_users(self): - """Test organization with both admin and user mappings.""" - org_map = {"myorg": {"admins": True, "users": ["user-group"]}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Admins", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Organization Admin", - "revoke": False, - "order": 1, - }, - { - "name": "myorg - Organization Users", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"groups": {"has_or": ["user-group"]}}, - "role": "Organization Member", - "revoke": False, - "order": 2, - }, - ] - - assert result == expected - - def test_single_org_with_remove_admins(self): - """Test organization with remove_admins flag.""" - org_map = {"myorg": {"admins": True, "remove_admins": True}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Admins", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Organization Admin", - "revoke": True, - "order": 1, - } - ] - - assert result == expected - - def test_single_org_with_remove_users(self): - """Test organization with remove_users flag.""" - org_map = {"myorg": {"users": True, "remove_users": True}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Users", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Organization Member", - "revoke": True, - "order": 1, - } - ] - - assert result == expected - - def test_multiple_organizations(self): - """Test multiple organizations with different configurations.""" - org_map = {"org1": {"admins": True}, "org2": {"users": ["group1", "group2"]}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "org1 - Organization Admins", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "org1", - "triggers": {"always": {}}, - "role": "Organization Admin", - "revoke": False, - "order": 1, - }, - { - "name": "org2 - Organization Users", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "org2", - "triggers": {"groups": {"has_or": ["group1", "group2"]}}, - "role": "Organization Member", - "revoke": False, - "order": 2, - }, - ] - - assert result == expected - - def test_org_with_none_values_skipped(self): - """Test that entries with None values are skipped.""" - org_map = {"myorg": {"admins": None, "users": True}} - - result, _ = org_map_to_gateway_format(org_map) - - expected = [ - { - "name": "myorg - Organization Users", - "authenticator": -1, - "map_type": "organization", - "team": None, - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Organization Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - def test_order_increments_correctly(self): """Test that order values increment correctly.""" org_map = {"myorg": {"admins": True, "users": True}} @@ -290,27 +132,233 @@ class TestOrgMapToGatewayFormat: assert result[0]["order"] == 1 assert result[1]["order"] == 2 - def test_triggers_format_validation(self): - """Test that trigger formats match Gateway specification.""" - org_map = {"myorg": {"admins": ["group1", "group2"]}} + def test_org_with_admin_true(self): + """Test organization with admin access set to True.""" + org_map = {"myorg": {"admins": True}} result, _ = org_map_to_gateway_format(org_map) - # Validate that triggers follow Gateway format - triggers = result[0]["triggers"] - assert "groups" in triggers - assert "has_or" in triggers["groups"] - assert isinstance(triggers["groups"]["has_or"], list) - assert triggers["groups"]["has_or"] == ["group1", "group2"] + assert len(result) == 1 + mapping = result[0] + assert mapping["name"] == "myorg - Admins Always Allow" + assert mapping["map_type"] == "organization" + assert mapping["organization"] == "myorg" + assert mapping["team"] is None + assert mapping["role"] == "Organization Admin" + assert mapping["revoke"] is False + assert mapping["order"] == 1 + assert mapping["triggers"] == {"always": {}} - def test_string_to_list_conversion(self): - """Test that string groups are converted to lists.""" - org_map = {"myorg": {"users": "single-group"}} + def test_org_with_admin_false(self): + """Test organization with admin access set to False.""" + org_map = {"myorg": {"admins": False}} result, _ = org_map_to_gateway_format(org_map) - # Should convert string to list for has_or - assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"] + assert len(result) == 1 + mapping = result[0] + assert mapping["name"] == "myorg - Admins Never Allow" + assert mapping["triggers"] == {"never": {}} + assert mapping["role"] == "Organization Admin" + + def test_org_with_admin_false_string(self): + """Test organization with admin access set to ['false'].""" + org_map = {"myorg": {"admins": ["false"]}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["name"] == "myorg - Admins Never Allow" + assert mapping["triggers"] == {"never": {}} + assert mapping["role"] == "Organization Admin" + + def test_org_with_users_true_string(self): + """Test organization with users access set to ['true'].""" + org_map = {"myorg": {"users": ["true"]}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["name"] == "myorg - Users Always Allow" + assert mapping["triggers"] == {"always": {}} + assert mapping["role"] == "Organization Member" + + def test_org_with_users_true(self): + """Test organization with users access set to True.""" + org_map = {"myorg": {"users": True}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["name"] == "myorg - Users Always Allow" + assert mapping["triggers"] == {"always": {}} + assert mapping["role"] == "Organization Member" + + def test_org_with_admin_string(self): + """Test organization with admin access set to a specific group.""" + org_map = {"myorg": {"admins": "admin-username"}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["name"] == "myorg - Admins Username equals admin-username" + assert mapping["triggers"] == {"attributes": {"join_condition": "or", "username": {"equals": "admin-username"}}} + assert mapping["role"] == "Organization Admin" + + def test_org_with_admin_list(self): + """Test organization with admin access set to multiple groups.""" + org_map = {"myorg": {"admins": ["admin-username1", "admin-username2"]}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 2 + assert result[0]["name"] == "myorg - Admins Username equals admin-username1" + assert result[1]["name"] == "myorg - Admins Username equals admin-username2" + assert result[0]["order"] == 1 + assert result[1]["order"] == 2 + + def test_org_with_email_detection(self): + """Test that email addresses are correctly identified and handled.""" + org_map = {"myorg": {"users": ["user@example.com", "admin@test.org", "not-an-email"]}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 3 + + # First mapping should be for email + email_mapping = result[0] + assert "user@example.com" in email_mapping["name"] + assert "Email Equals" in email_mapping["name"] + assert email_mapping["triggers"]["attributes"]["email"]["equals"] == "user@example.com" + + # Second mapping should be for email + email_mapping2 = result[1] + assert "admin@test.org" in email_mapping2["name"] + assert "Email Equals" in email_mapping2["name"] + assert email_mapping2["triggers"]["attributes"]["email"]["equals"] == "admin@test.org" + + # Third mapping should be for username (not email) + username_mapping = result[2] + assert "not-an-email" in username_mapping["name"] + assert "Username equals" in username_mapping["name"] + assert username_mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email" + + def test_org_with_remove_flags(self): + """Test organization with remove flags.""" + org_map = {"myorg": {"admins": True, "users": ["user-group"], "remove_admins": True, "remove_users": True}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 2 + assert result[0]["revoke"] is True # admin mapping should have revoke=True + assert result[1]["revoke"] is True # user mapping should have revoke=True + + def test_org_with_custom_email_username_attrs(self): + """Test org mapping with custom email and username attributes.""" + org_map = {"myorg": {"admins": ["test@example.com"]}} + + result, _ = org_map_to_gateway_format(org_map, email_attr='custom_email', username_attr='custom_username') + + assert len(result) == 1 + mapping = result[0] + assert mapping["triggers"]["attributes"]["custom_email"]["equals"] == "test@example.com" + + def test_org_with_regex_pattern_objects(self): + """Test org mapping with actual re.Pattern objects.""" + regex_str = "^admin.*@example\\.com$" + + org_map = {"myorg": {"users": [re.compile(regex_str)]}} + + result, _ = org_map_to_gateway_format(org_map) + + # Should create 2 mappings - one for username match, one for email match + assert len(result) == 2, f"Expected 2 items but got: {result}" + + username_mapping = result[0] + assert "Match Username" in username_mapping["name"] + assert username_mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/" + + email_mapping = result[1] + assert "Match Email" in email_mapping["name"] + assert email_mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/" + + def test_org_with_none_values_skipped(self): + """Test that entries with None values are skipped.""" + org_map = {"myorg": {"admins": None, "users": True}} + + result, _ = org_map_to_gateway_format(org_map) + + assert len(result) == 1 + assert result[0]["role"] == "Organization Member" # Only users mapping should be present + + def test_org_with_start_order_parameter(self): + """Test that start_order parameter works correctly.""" + org_map = {"org1": {"admins": True}, "org2": {"users": ["username1", "username2"]}} + + result, next_order = org_map_to_gateway_format(org_map, start_order=10) + + # Should have 3 mappings total + assert len(result) == 3 + assert result[0]["order"] == 10 + assert result[1]["order"] == 11 + assert result[2]["order"] == 12 + assert next_order == 13 + + def test_org_comprehensive_field_validation(self): + """Test comprehensive validation of all fields in org mappings.""" + org_map = {"test-org": {"admins": ["test-admin"], "remove_admins": False}} + + result, next_order = org_map_to_gateway_format(org_map, start_order=5) + + assert len(result) == 1 + mapping = result[0] + + # Validate all required fields and their types + assert isinstance(mapping["name"], str) + assert mapping["name"] == "test-org - Admins Username equals test-admin" + + assert mapping["map_type"] == "organization" + assert mapping["order"] == 5 + assert mapping["authenticator"] == -1 + + assert isinstance(mapping["triggers"], dict) + assert "attributes" in mapping["triggers"] + + assert mapping["organization"] == "test-org" + assert mapping["team"] is None + assert mapping["role"] == "Organization Admin" + assert mapping["revoke"] is False + + # Validate next_order is incremented correctly + assert next_order == 6 + + def test_org_next_order_calculation(self): + """Test that next_order is calculated correctly in various scenarios.""" + # Test with no orgs + result, next_order = org_map_to_gateway_format({}) + assert next_order == 1 + + # Test with orgs that have no admins/users (should be skipped) + org_map = {"skipped": {"admins": None, "users": None}} + result, next_order = org_map_to_gateway_format(org_map) + assert len(result) == 0 + assert next_order == 1 + + # Test with single org + org_map = {"single": {"admins": True}} + result, next_order = org_map_to_gateway_format(org_map) + assert len(result) == 1 + assert next_order == 2 + + # Test with multiple mappings from single org + org_map = {"multi": {"users": ["user1", "user2"]}} + result, next_order = org_map_to_gateway_format(org_map) + assert len(result) == 2 + assert next_order == 3 class TestTeamMapToGatewayFormat: @@ -328,193 +376,6 @@ class TestTeamMapToGatewayFormat: assert result == [] assert next_order == 1 - def test_single_team_with_users_true(self): - """Test team with users access set to True.""" - team_map = {"engineering-team": {"organization": "myorg", "users": True}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "myorg - engineering-team", - "authenticator": -1, - "map_type": "team", - "team": "engineering-team", - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Team Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_team_with_users_false(self): - """Test team with users access set to False.""" - team_map = {"dev-team": {"organization": "myorg", "users": False}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "myorg - dev-team", - "authenticator": -1, - "map_type": "team", - "team": "dev-team", - "organization": "myorg", - "triggers": {"never": {}}, - "role": "Team Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_team_with_users_string(self): - """Test team with users access set to a specific group.""" - team_map = {"qa-team": {"organization": "myorg", "users": "qa-group"}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "myorg - qa-team", - "authenticator": -1, - "map_type": "team", - "team": "qa-team", - "organization": "myorg", - "triggers": {"groups": {"has_or": ["qa-group"]}}, - "role": "Team Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_single_team_with_users_list(self): - """Test team with users access set to multiple groups.""" - team_map = {"ops-team": {"organization": "myorg", "users": ["ops-group1", "ops-group2"]}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "myorg - ops-team", - "authenticator": -1, - "map_type": "team", - "team": "ops-team", - "organization": "myorg", - "triggers": {"groups": {"has_or": ["ops-group1", "ops-group2"]}}, - "role": "Team Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_team_with_remove_flag(self): - """Test team with remove flag set to True.""" - team_map = {"legacy-team": {"organization": "myorg", "users": True, "remove": True}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "myorg - legacy-team", - "authenticator": -1, - "map_type": "team", - "team": "legacy-team", - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Team Member", - "revoke": True, - "order": 1, - } - ] - - assert result == expected - - def test_team_with_no_organization(self): - """Test team without organization specified.""" - team_map = {"orphan-team": {"users": True}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "Unknown - orphan-team", - "authenticator": -1, - "map_type": "team", - "team": "orphan-team", - "organization": "Unknown", - "triggers": {"always": {}}, - "role": "Team Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - - def test_multiple_teams(self): - """Test multiple teams with different configurations.""" - team_map = {"team1": {"organization": "org1", "users": True}, "team2": {"organization": "org2", "users": ["group1", "group2"]}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "org1 - team1", - "authenticator": -1, - "map_type": "team", - "team": "team1", - "organization": "org1", - "triggers": {"always": {}}, - "role": "Team Member", - "revoke": False, - "order": 1, - }, - { - "name": "org2 - team2", - "authenticator": -1, - "map_type": "team", - "team": "team2", - "organization": "org2", - "triggers": {"groups": {"has_or": ["group1", "group2"]}}, - "role": "Team Member", - "revoke": False, - "order": 2, - }, - ] - - assert result == expected - - def test_team_with_none_users_skipped(self): - """Test that teams with None users are skipped.""" - team_map = {"skipped-team": {"organization": "myorg", "users": None}, "valid-team": {"organization": "myorg", "users": True}} - - result, _ = team_map_to_gateway_format(team_map) - - expected = [ - { - "name": "myorg - valid-team", - "authenticator": -1, - "map_type": "team", - "team": "valid-team", - "organization": "myorg", - "triggers": {"always": {}}, - "role": "Team Member", - "revoke": False, - "order": 1, - } - ] - - assert result == expected - def test_order_increments_correctly(self): """Test that order values increment correctly for teams.""" team_map = {"team1": {"organization": "myorg", "users": True}, "team2": {"organization": "myorg", "users": True}} @@ -525,69 +386,436 @@ class TestTeamMapToGatewayFormat: assert result[0]["order"] == 1 assert result[1]["order"] == 2 - def test_string_to_list_conversion(self): - """Test that string groups are converted to lists.""" - team_map = {"myteam": {"organization": "myorg", "users": "single-group"}} + def test_team_with_email_detection(self): + """Test that email addresses are correctly identified and handled.""" + team_map = {"email-team": {"organization": "myorg", "users": ["user@example.com", "admin@test.org", "not-an-email"]}} result, _ = team_map_to_gateway_format(team_map) - # Should convert string to list for has_or - assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"] + # Should have 3 mappings - one for each user + assert len(result) == 3 - def test_team_triggers_format_validation(self): - """Test that team trigger formats match Gateway specification.""" - team_map = {"myteam": {"organization": "myorg", "users": ["group1", "group2"]}} + # First mapping should be for email (emails are detected and use email attribute) + email_mapping = result[0] + assert "user@example.com" in email_mapping["name"] + assert "Email Equals" in email_mapping["name"] + assert email_mapping["triggers"]["attributes"]["email"]["equals"] == "user@example.com" + + # Second mapping should be for email + email_mapping2 = result[1] + assert "admin@test.org" in email_mapping2["name"] + assert "Email Equals" in email_mapping2["name"] + assert email_mapping2["triggers"]["attributes"]["email"]["equals"] == "admin@test.org" + + # Third mapping should be for username (not email) + username_mapping = result[2] + assert "not-an-email" in username_mapping["name"] + assert "Username equals" in username_mapping["name"] + assert username_mapping["triggers"]["attributes"]["username"]["equals"] == "not-an-email" + + def test_team_with_custom_email_username_attrs(self): + """Test team mapping with custom email and username attributes.""" + team_map = {"custom-team": {"organization": "myorg", "users": ["test@example.com"]}} + + result, _ = team_map_to_gateway_format(team_map, email_attr='custom_email', username_attr='custom_username') + + assert len(result) == 1 + mapping = result[0] + assert mapping["triggers"]["attributes"]["custom_email"]["equals"] == "test@example.com" + assert "Email Equals" in mapping["name"] + + def test_team_with_regex_pattern_objects(self): + """Test team mapping with actual re.Pattern objects.""" + regex_str = "^admin.*@example\\.com$" + + team_map = {"regex-team": {"organization": "myorg", "users": [re.compile(regex_str)]}} result, _ = team_map_to_gateway_format(team_map) - # Validate that triggers follow Gateway format - triggers = result[0]["triggers"] - assert "groups" in triggers - assert "has_or" in triggers["groups"] - assert isinstance(triggers["groups"]["has_or"], list) - assert triggers["groups"]["has_or"] == ["group1", "group2"] + # Should create 2 mappings - one for username match, one for email match + assert len(result) == 2, f"Expected 2 items but got: {result}" - def test_team_with_regex_patterns(self): - """Test that teams with regex patterns in users are handled correctly.""" + username_mapping = result[0] + assert "Match Username" in username_mapping["name"] + assert username_mapping["triggers"]["attributes"]["username"]["matches"] == f"/{regex_str}/" + + email_mapping = result[1] + assert "Match Email" in email_mapping["name"] + assert email_mapping["triggers"]["attributes"]["email"]["matches"] == f"/{regex_str}/" + + def test_team_with_non_string_objects(self): + """Test team mapping with non-string objects that get converted.""" + + class CustomObject: + def __str__(self): + return "custom_object_string" + + custom_obj = CustomObject() + team_map = {"object-team": {"organization": "myorg", "users": [custom_obj, 12345]}} + + result, _ = team_map_to_gateway_format(team_map) + + # Should create 4 mappings - 2 for custom object (username + email), 2 for number + assert len(result) == 4 + + # Check custom object mappings - non-string objects create both username and email mappings + custom_mappings = [r for r in result if "custom_object_string" in r["name"]] + assert len(custom_mappings) == 2 + + # Check number mappings - numbers also create both username and email mappings + number_mappings = [r for r in result if "12345" in r["name"]] + assert len(number_mappings) == 2 + + def test_team_with_mixed_data_types(self): + """Test team mapping with mixed data types in users list.""" + regex_str = 'test.*' + + team_map = {"mixed-team": {"organization": "myorg", "users": ["string_user", "email@test.com", re.compile(regex_str), 999, True]}} + + result, _ = team_map_to_gateway_format(team_map) + + # Should handle all different types appropriately + assert len(result) > 0 + + # Verify that each type is handled + names = [mapping["name"] for mapping in result] + assert any("string_user" in name for name in names), f"Expected to find string_user in {(', '.join(names))}" + assert any("email@test.com" in name for name in names), f"Expected to find email@test.com in {(', '.join(names))}" + assert any("999" in name for name in names), f"Expected to find 999 in {(', '.join(names))}" + assert any("True" in name for name in names), f"Expected to find True in {(', '.join(names))}" + + def test_team_with_start_order_parameter(self): + """Test that start_order parameter works correctly.""" + team_map = {"team1": {"organization": "org1", "users": True}, "team2": {"organization": "org2", "users": ["username1", "username2"]}} + + result, next_order = team_map_to_gateway_format(team_map, start_order=10) + + # First mapping should start at order 10 + assert result[0]["order"] == 10 + # Should increment properly + orders = [mapping["order"] for mapping in result] + assert orders == sorted(orders) # Should be in ascending order + assert min(orders) == 10 + # next_order should be one more than the last used order + assert next_order == max(orders) + 1 + + def test_team_with_empty_strings(self): + """Test team mapping with empty strings.""" team_map = { - "My Team": {"organization": "Test Org", "users": ["/^[^@]+?@test\\.example\\.com$/"], "remove": True}, - "Other Team": {"organization": "Test Org 2", "users": ["/^[^@]+?@test\\.example\\.com$/"], "remove": False}, + "": {"organization": "myorg", "users": [""]}, # Empty team name and user + "normal-team": {"organization": "", "users": True}, # Empty organization } result, _ = team_map_to_gateway_format(team_map) - expected = [ - { - "name": "Test Org - My Team", - "authenticator": -1, - "map_type": "team", - "team": "My Team", - "organization": "Test Org", - "triggers": {"groups": {"has_or": ["/^[^@]+?@test\\.example\\.com$/"]}}, - "role": "Team Member", - "revoke": True, - "order": 1, - }, - { - "name": "Test Org 2 - Other Team", - "authenticator": -1, - "map_type": "team", - "team": "Other Team", - "organization": "Test Org 2", - "triggers": {"groups": {"has_or": ["/^[^@]+?@test\\.example\\.com$/"]}}, - "role": "Team Member", - "revoke": False, - "order": 2, - }, - ] + # Should handle empty strings gracefully + assert len(result) == 2 - assert result == expected + # Check empty team name mapping + empty_team_mapping = [m for m in result if m["team"] == ""][0] + assert "Username equals" in empty_team_mapping["name"] + assert empty_team_mapping["team"] == "" - # Validate that the result is JSON serializable - import json + # Check empty organization mapping + empty_org_mapping = [m for m in result if m["organization"] == ""][0] + assert empty_org_mapping["organization"] == "" + assert "Always Allow" in empty_org_mapping["name"] - json_str = json.dumps(result) - assert json_str is not None + def test_team_with_special_characters(self): + """Test team mapping with special characters in names.""" + team_map = { + "team-with-special!@#$%^&*()_+chars": {"organization": "org with spaces & symbols!", "users": ["user@domain.com", "user-with-special!chars"]} + } + + result, _ = team_map_to_gateway_format(team_map) + + assert len(result) == 2 + + # Verify special characters are preserved in names + for mapping in result: + assert "team-with-special!@#$%^&*()_+chars" in mapping["name"] + assert "org with spaces & symbols!" in mapping["name"] + assert mapping["team"] == "team-with-special!@#$%^&*()_+chars" + assert mapping["organization"] == "org with spaces & symbols!" + + def test_team_with_unicode_characters(self): + """Test team mapping with unicode characters.""" + team_map = { + "チーム": { # Japanese for "team" + "organization": "組織", # Japanese for "organization" + "users": ["ユーザー@example.com", "用户"], # Mixed Japanese/Chinese + } + } + + result, _ = team_map_to_gateway_format(team_map) + + assert len(result) == 2 + + # Verify unicode characters are handled correctly + for mapping in result: + assert "チーム" in mapping["name"] + assert "組織" in mapping["name"] + assert mapping["team"] == "チーム" + assert mapping["organization"] == "組織" + + def test_team_next_order_calculation(self): + """Test that next_order is calculated correctly in various scenarios.""" + # Test with no teams + result, next_order = team_map_to_gateway_format({}) + assert next_order == 1 + + # Test with teams that have no users (should be skipped) + team_map = {"skipped": {"organization": "org", "users": None}} + result, next_order = team_map_to_gateway_format(team_map) + assert len(result) == 0 + assert next_order == 1 + + # Test with single team + team_map = {"single": {"organization": "org", "users": True}} + result, next_order = team_map_to_gateway_format(team_map) + assert len(result) == 1 + assert next_order == 2 + + # Test with multiple mappings from single team + team_map = {"multi": {"organization": "org", "users": ["user1", "user2"]}} + result, next_order = team_map_to_gateway_format(team_map) + assert len(result) == 2 + assert next_order == 3 + + def test_team_large_dataset_performance(self): + """Test team mapping with a large number of teams and users.""" + # Create a large team map + team_map = {} + for i in range(100): + team_map[f"team_{i}"] = { + "organization": f"org_{i % 10}", # 10 different orgs + "users": [f"user_{j}@org_{i % 10}.com" for j in range(5)], # 5 users per team + } + + result, next_order = team_map_to_gateway_format(team_map) + + # Should create 500 mappings (100 teams * 5 users each) + assert len(result) == 500 + + # Verify orders are sequential + orders = [mapping["order"] for mapping in result] + assert orders == list(range(1, 501)) + assert next_order == 501 + + # Verify all teams are represented + teams = {mapping["team"] for mapping in result} + assert len(teams) == 100 + + def test_team_mapping_field_validation(self): + """Test that all required fields are present and have correct types.""" + team_map = {"validation-team": {"organization": "test-org", "users": ["test@example.com"], "remove": True}} + + result, _ = team_map_to_gateway_format(team_map) + + for mapping in result: + # Check required fields exist + required_fields = ["name", "map_type", "order", "authenticator", "triggers", "organization", "team", "role", "revoke"] + for field in required_fields: + assert field in mapping, f"Missing required field: {field}" + + # Check field types + assert isinstance(mapping["name"], str) + assert isinstance(mapping["map_type"], str) + assert isinstance(mapping["order"], int) + assert isinstance(mapping["authenticator"], int) + assert isinstance(mapping["triggers"], dict) + assert isinstance(mapping["organization"], str) + assert isinstance(mapping["team"], str) + assert isinstance(mapping["role"], str) + assert isinstance(mapping["revoke"], bool) + + # Check specific values + assert mapping["map_type"] == "team" + assert mapping["authenticator"] == -1 + assert mapping["role"] == "Team Member" + assert mapping["revoke"] == True # Because remove was set to True + + def test_team_trigger_structure_validation(self): + """Test that trigger structures are correctly formatted.""" + team_map = {"trigger-test": {"organization": "org", "users": ["test@example.com", "username"]}} + + result, _ = team_map_to_gateway_format(team_map) + + for mapping in result: + triggers = mapping["triggers"] + + if "always" in triggers: + assert triggers["always"] == {} + elif "never" in triggers: + assert triggers["never"] == {} + elif "attributes" in triggers: + attrs = triggers["attributes"] + assert "join_condition" in attrs + assert attrs["join_condition"] == "or" # Implementation uses 'or' + + # Should have either username or email attribute + assert ("username" in attrs) or ("email" in attrs) + + # The attribute should have either "equals" or "matches" + for attr_name in ["username", "email"]: + if attr_name in attrs: + attr_value = attrs[attr_name] + assert ("equals" in attr_value) or ("matches" in attr_value) + + def test_team_boolean_false_trigger(self): + """Test that False users value creates never trigger correctly.""" + team_map = {"never-team": {"organization": "org", "users": False}} + + result, _ = team_map_to_gateway_format(team_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["triggers"] == {"never": {}} + assert "Never Allow" in mapping["name"] + + def test_team_boolean_true_trigger(self): + """Test that True users value creates always trigger correctly.""" + team_map = {"always-team": {"organization": "org", "users": True}} + + result, _ = team_map_to_gateway_format(team_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["triggers"] == {"always": {}} + assert "Always Allow" in mapping["name"] + + def test_team_string_false_trigger(self): + """Test that ['false'] users value creates never trigger correctly.""" + team_map = {"never-team": {"organization": "org", "users": ["false"]}} + + result, _ = team_map_to_gateway_format(team_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["triggers"] == {"never": {}} + assert "Never Allow" in mapping["name"] + + def test_team_string_true_trigger(self): + """Test that ['true'] users value creates always trigger correctly.""" + team_map = {"always-team": {"organization": "org", "users": ["true"]}} + + result, _ = team_map_to_gateway_format(team_map) + + assert len(result) == 1 + mapping = result[0] + assert mapping["triggers"] == {"always": {}} + assert "Always Allow" in mapping["name"] + + def test_team_with_join_condition_or(self): + """Test that all attribute-based triggers use 'or' join condition.""" + team_map = {"test-team": {"organization": "org", "users": ["user1", "user2"]}} + + result, _ = team_map_to_gateway_format(team_map) + + for mapping in result: + if "attributes" in mapping["triggers"]: + assert mapping["triggers"]["attributes"]["join_condition"] == "or" + + def test_team_with_default_organization_fallback(self): + """Test that teams without organization get 'Unknown' as default.""" + team_map = {"orphan-team": {"users": ["user1"]}} + + result, _ = team_map_to_gateway_format(team_map) + + assert len(result) == 1 + assert result[0]["organization"] == "Unknown" + assert "Unknown - orphan-team" in result[0]["name"] + + def test_team_with_regex_string_patterns(self): + """Test team mapping with regex patterns as strings (not compiled patterns).""" + team_map = {"regex-team": {"organization": "myorg", "users": ["/^admin.*@example\\.com$/"]}} + + result, _ = team_map_to_gateway_format(team_map) + + # String patterns should be treated as regular strings, not regex + assert len(result) == 1 + mapping = result[0] + assert "Username equals" in mapping["name"] + assert mapping["triggers"]["attributes"]["username"]["equals"] == "/^admin.*@example\\.com$/" + + def test_team_comprehensive_field_validation(self): + """Test comprehensive validation of all fields in team mappings.""" + team_map = {"comprehensive-team": {"organization": "test-org", "users": ["test-user"], "remove": False}} + + result, next_order = team_map_to_gateway_format(team_map, start_order=5) + + assert len(result) == 1 + mapping = result[0] + + # Validate all required fields and their types + assert isinstance(mapping["name"], str) + assert mapping["name"] == "test-org - comprehensive-team Username equals test-user" + + assert mapping["map_type"] == "team" + assert mapping["order"] == 5 + assert mapping["authenticator"] == -1 + + assert isinstance(mapping["triggers"], dict) + assert "attributes" in mapping["triggers"] + + assert mapping["organization"] == "test-org" + assert mapping["team"] == "comprehensive-team" + assert mapping["role"] == "Team Member" + assert mapping["revoke"] == False + + # Validate next_order is incremented correctly + assert next_order == 6 + + def test_team_with_none_and_remove_flag(self): + """Test that teams with None users are skipped even with remove flag.""" + team_map = { + "skipped-team": {"organization": "org", "users": None, "remove": True}, + "valid-team": {"organization": "org", "users": True, "remove": True}, + } + + result, _ = team_map_to_gateway_format(team_map) + + # Should only have one result (the valid team) + assert len(result) == 1 + assert result[0]["team"] == "valid-team" + assert result[0]["revoke"] == True + + def test_team_error_handling_edge_cases(self): + """Test various edge cases for error handling.""" + # Test with completely empty team config + team_map = {"empty-team": {}} + + try: + _, _ = team_map_to_gateway_format(team_map) + # Should not crash, but might skip the team due to missing 'users' key + except KeyError: + # This is expected if 'users' key is required + pass + + def test_team_ordering_with_mixed_types(self): + """Test that ordering works correctly with mixed user types.""" + team_map = { + "team1": {"organization": "org1", "users": True}, # 1 mapping + "team2": {"organization": "org2", "users": ["user1", "user2"]}, # 2 mappings + "team3": {"organization": "org3", "users": False}, # 1 mapping + } + + result, next_order = team_map_to_gateway_format(team_map, start_order=10) + + # Should have 4 total mappings + assert len(result) == 4 + + # Orders should be sequential starting from 10 + orders = [mapping["order"] for mapping in result] + assert orders == [10, 11, 12, 13] + assert next_order == 14 + + # Verify teams are represented correctly + teams = [mapping["team"] for mapping in result] + assert "team1" in teams + assert "team2" in teams + assert "team3" in teams + assert teams.count("team2") == 2 # team2 should appear twice # Parametrized tests for edge cases @@ -614,7 +842,7 @@ def test_org_map_result_lengths(org_map, expected_length): "org_map", [ {"org1": {"admins": True}}, - {"org1": {"users": ["group1"]}}, + {"org1": {"users": ["username1"]}}, {"org1": {"admins": False}}, ], ) @@ -654,7 +882,7 @@ def test_gateway_format_compliance(org_map): ({}, 0), ({"team1": {"organization": "org1", "users": None}}, 0), # Team with None users should be skipped ({"team1": {"organization": "org1", "users": True}}, 1), - ({"team1": {"organization": "org1", "users": ["group1"]}}, 1), + ({"team1": {"organization": "org1", "users": ["username1"]}}, 1), ({"team1": {"organization": "org1", "users": True}, "team2": {"organization": "org2", "users": False}}, 2), ], ) @@ -669,7 +897,7 @@ def test_team_map_result_lengths(team_map, expected_length): "team_map", [ {"team1": {"organization": "org1", "users": True}}, - {"team1": {"organization": "org1", "users": ["group1"]}}, + {"team1": {"organization": "org1", "users": ["username1"]}}, {"team1": {"organization": "org1", "users": False}}, ], ) diff --git a/awx/main/utils/gateway_mapping.py b/awx/main/utils/gateway_mapping.py index 8cc8f6f332..c559a431dd 100644 --- a/awx/main/utils/gateway_mapping.py +++ b/awx/main/utils/gateway_mapping.py @@ -5,13 +5,93 @@ This module contains functions to convert AWX authentication mappings (organization and team mappings) to AAP Gateway format. """ +import re +from typing import Union -def team_map_to_gateway_format(team_map, start_order=1): +email_regex = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") + + +def pattern_to_slash_format(pattern): + """Convert a re.Pattern object to /pattern/flags format.""" + if not isinstance(pattern, re.Pattern): + return str(pattern) + + flags_str = "" + if pattern.flags & re.IGNORECASE: + flags_str += "i" + if pattern.flags & re.MULTILINE: + flags_str += "m" + if pattern.flags & re.DOTALL: + flags_str += "s" + if pattern.flags & re.VERBOSE: + flags_str += "x" + + return f"/{pattern.pattern}/{flags_str}" + + +def process_sso_user_list( + users: Union[str, list[str], bool], email_attr: str = 'email', username_attr: str = 'username' +) -> list[dict[str : Union[str, dict[str : dict[str:str]]]]]: + if type(users) is str: + users = [users] + + triggers = [] + if users in [False, ["false"]]: + triggers.append( + { + "name": "Never Allow", + "trigger": {"never": {}}, + } + ) + elif users in [True, ["true"]]: + triggers.append({"name": "Always Allow", "trigger": {"always": {}}}) + else: + for user_or_email in users: + if isinstance(user_or_email, re.Pattern): + user_or_email = pattern_to_slash_format(user_or_email) + # If we got a regex it could be either a username or an email object + triggers.append( + {"name": f"Match Username {user_or_email}", "trigger": {"attributes": {"join_condition": "or", username_attr: {"matches": user_or_email}}}} + ) + triggers.append( + {"name": f"Match Email {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"matches": user_or_email}}}} + ) + elif isinstance(user_or_email, str): + # If we got a string its a direct match for either a username or an email + if email_regex.match(user_or_email): + triggers.append( + {"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": user_or_email}}}} + ) + else: + triggers.append( + { + "name": f"Username equals {user_or_email}", + "trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": user_or_email}}}, + } + ) + else: + # Convert other objects to string representation and assume it could be either an email or a username + # The other option we could take here would be to just error out + triggers.append( + { + "name": f"Username Equals {user_or_email}", + "trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": str(user_or_email)}}}, + } + ) + triggers.append( + {"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": str(user_or_email)}}}} + ) + return triggers + + +def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username'): """Convert AWX team mapping to Gateway authenticator format. Args: team_map: The SOCIAL_AUTH_*_TEAM_MAP setting value start_order: Starting order value for the mappers + email_attr: The attribute representing the email + username_attr: The attribute representing the username Returns: tuple: (List of Gateway-compatible team mappers, next_order) @@ -28,65 +108,39 @@ def team_map_to_gateway_format(team_map, start_order=1): if team['users'] is None: continue - if team['users'] is False: - triggers = {"never": {}} - elif team['users'] is True: - triggers = {"always": {}} - else: - import re - - # Handle the case where the value itself is a regex pattern - if isinstance(team['users'], re.Pattern): - # Convert single regex pattern to string in a list - triggers = {"groups": {"has_or": [str(team['users'])]}} - else: - # Handle list or string values - if type(team['users']) is str: - team['users'] = [team['users']] - - # Convert any non-string items to strings (e.g., regex patterns) - users_list = [] - for user in team['users']: - if isinstance(user, str): - users_list.append(user) - elif isinstance(user, re.Pattern): - # Convert regex patterns to string representation - users_list.append(str(user.pattern)) - else: - # Convert other objects to string representation - users_list.append(str(user)) - - triggers = {"groups": {"has_or": users_list}} - + # Get the organization name organization_name = team.get('organization', 'Unknown') + # Check for remove flag revoke = team.get('remove', False) - result.append( - { - "name": f"{organization_name} - {team_name}", - "map_type": "team", - "order": order, - "authenticator": -1, # Will be updated when creating the mapper - "triggers": triggers, - "organization": organization_name, - "team": team_name, - "role": "Team Member", # Gateway team member role - "revoke": revoke, - } - ) - - order += 1 + for trigger in process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr): + result.append( + { + "name": f"{organization_name} - {team_name} {trigger['name']}", + "map_type": "team", + "order": order, + "authenticator": -1, # Will be updated when creating the mapper + "triggers": trigger['trigger'], + "organization": organization_name, + "team": team_name, + "role": "Team Member", # Gateway team member role + "revoke": revoke, + } + ) + order += 1 return result, order -def org_map_to_gateway_format(org_map, start_order=1): +def org_map_to_gateway_format(org_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username'): """Convert AWX organization mapping to Gateway authenticator format. Args: org_map: The SOCIAL_AUTH_*_ORGANIZATION_MAP setting value start_order: Starting order value for the mappers + email_attr: The attribute representing the email + username_attr: The attribute representing the username Returns: tuple: (List of Gateway-compatible organization mappers, next_order) @@ -100,66 +154,35 @@ def org_map_to_gateway_format(org_map, start_order=1): for organization_name in org_map.keys(): organization = org_map[organization_name] for user_type in ['admins', 'users']: - if user_type in organization: + if organization.get(user_type, None) is None: # TODO: Confirm that if we have None with remove we still won't remove - if organization[user_type] is None: - continue + continue - if organization[user_type] is False: - triggers = {"never": {}} - elif organization[user_type] is True: - triggers = {"always": {}} - else: - import re + # Get the permission type + permission_type = user_type.title() - # Handle the case where the value itself is a regex pattern - if isinstance(organization[user_type], re.Pattern): - # Convert single regex pattern to string in a list - triggers = {"groups": {"has_or": [str(organization[user_type])]}} - else: - # Handle list or string values - if type(organization[user_type]) is str: - organization[user_type] = [organization[user_type]] + # Map AWX admin/users to appropriate Gateway organization roles + role = "Organization Admin" if user_type == "admins" else "Organization Member" - # Convert any non-string items to strings (e.g., regex patterns) - users_list = [] - for user in organization[user_type]: - if isinstance(user, str): - users_list.append(user) - elif isinstance(user, re.Pattern): - # Convert regex patterns to string representation - users_list.append(str(user.pattern)) - else: - # Convert other objects to string representation - users_list.append(str(user)) - - triggers = {"groups": {"has_or": users_list}} - - team_name = f"Organization {user_type.title()}" - # Map AWX admin/users to appropriate Gateway organization roles - role = "Organization Admin" if user_type == "admins" else "Organization Member" - - # Check for remove flags - revoke = False - if user_type == "admins" and organization.get("remove_admins"): - revoke = True - elif user_type == "users" and organization.get("remove_users"): - revoke = True + # Check for remove flags + revoke = False + if organization.get(f"remove_{user_type}"): + revoke = True + for trigger in process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr): result.append( { - "name": f"{organization_name} - {team_name}", + "name": f"{organization_name} - {permission_type} {trigger['name']}", "map_type": "organization", "order": order, "authenticator": -1, # Will be updated when creating the mapper - "triggers": triggers, + "triggers": trigger['trigger'], "organization": organization_name, "team": None, # Organization-level mapping, not team-specific "role": role, "revoke": revoke, } ) - order += 1 return result, order