diff --git a/awx/main/management/commands/import_auth_config_to_gateway.py b/awx/main/management/commands/import_auth_config_to_gateway.py index edc51fc546..a03097245f 100644 --- a/awx/main/management/commands/import_auth_config_to_gateway.py +++ b/awx/main/management/commands/import_auth_config_to_gateway.py @@ -4,6 +4,7 @@ import os from django.core.management.base import BaseCommand from awx.sso.utils.azure_ad_migrator import AzureADMigrator from awx.sso.utils.github_migrator import GitHubMigrator +from awx.sso.utils.ldap_migrator import LDAPMigrator from awx.sso.utils.oidc_migrator import OIDCMigrator from awx.sso.utils.saml_migrator import SAMLMigrator from awx.main.utils.gateway_client import GatewayClient, GatewayAPIError @@ -16,6 +17,7 @@ class Command(BaseCommand): parser.add_argument('--skip-oidc', action='store_true', help='Skip importing GitHub and generic OIDC authenticators') parser.add_argument('--skip-ldap', action='store_true', help='Skip importing LDAP authenticators') parser.add_argument('--skip-ad', action='store_true', help='Skip importing Azure AD authenticator') + parser.add_argument('--skip-saml', action='store_true', help='Skip importing SAML authenticator') def handle(self, *args, **options): # Read Gateway connection parameters from environment variables @@ -25,8 +27,9 @@ class Command(BaseCommand): gateway_skip_verify = os.getenv('GATEWAY_SKIP_VERIFY', '').lower() in ('true', '1', 'yes', 'on') skip_oidc = options['skip_oidc'] - # skip_ldap = options['skip_ldap'] + skip_ldap = options['skip_ldap'] skip_ad = options['skip_ad'] + skip_saml = options['skip_saml'] # If the management command isn't called with all parameters needed to talk to Gateway, consider # it a dry-run and exit cleanly @@ -56,12 +59,16 @@ class Command(BaseCommand): if not skip_oidc: migrators.append(GitHubMigrator(gateway_client, self)) migrators.append(OIDCMigrator(gateway_client, self)) + + if not skip_saml: migrators.append(SAMLMigrator(gateway_client, self)) - # if not skip_ldap: - # migrators.append(LDAPMigrator(gateway_client, self)) + if not skip_ad: migrators.append(AzureADMigrator(gateway_client, self)) + if not skip_ldap: + migrators.append(LDAPMigrator(gateway_client, self)) + # Run migrations total_results = { 'created': 0, diff --git a/awx/main/tests/unit/utils/test_auth_migration.py b/awx/main/tests/unit/utils/test_auth_migration.py index ab7cd7f77f..20d7340f68 100644 --- a/awx/main/tests/unit/utils/test_auth_migration.py +++ b/awx/main/tests/unit/utils/test_auth_migration.py @@ -3,7 +3,7 @@ Unit tests for auth migration utilities. """ import pytest -from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format +from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format, role_map_to_gateway_format def get_org_mappers(org_map, start_order=1): @@ -18,6 +18,12 @@ def get_team_mappers(team_map, start_order=1): return result +def get_role_mappers(role_map, start_order=1): + """Helper function to get just the mappers from role_map_to_gateway_format.""" + result, _ = role_map_to_gateway_format(role_map, start_order) + return result + + class TestOrgMapToGatewayFormat: def test_none_input(self): diff --git a/awx/main/tests/unit/utils/test_base_migrator.py b/awx/main/tests/unit/utils/test_base_migrator.py index f12cd5ccda..471314caba 100644 --- a/awx/main/tests/unit/utils/test_base_migrator.py +++ b/awx/main/tests/unit/utils/test_base_migrator.py @@ -18,22 +18,66 @@ class TestBaseAuthenticatorMigrator: def test_generate_authenticator_slug(self): """Test slug generation is deterministic.""" - slug1 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client123') - slug2 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client123') + slug1 = self.migrator._generate_authenticator_slug('github', 'github-org') + slug2 = self.migrator._generate_authenticator_slug('github', 'github-org') assert slug1 == slug2 - assert slug1.startswith('awx-github-') - assert len(slug1.split('-')[-1]) == 8 # Hash should be 8 characters + assert slug1 == 'aap-github-github-org' def test_generate_authenticator_slug_different_inputs(self): """Test that different inputs generate different slugs.""" - slug1 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client123') - slug2 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client456') - slug3 = self.migrator._generate_authenticator_slug('ldap', 'ldap', 'ldap://server') + slug1 = self.migrator._generate_authenticator_slug('github', 'github-org') + slug2 = self.migrator._generate_authenticator_slug('github', 'github-team') + slug3 = self.migrator._generate_authenticator_slug('ldap', 'ldap') assert slug1 != slug2 assert slug1 != slug3 assert slug2 != slug3 + assert slug1 == 'aap-github-github-org' + assert slug2 == 'aap-github-github-team' + assert slug3 == 'aap-ldap-ldap' + + def test_generate_authenticator_slug_ldap_variants(self): + """Test LDAP authenticator slug generation for all supported variants.""" + # Test all LDAP authenticator naming variants + ldap_base = self.migrator._generate_authenticator_slug('ldap', 'ldap') + ldap1 = self.migrator._generate_authenticator_slug('ldap', 'ldap1') + ldap2 = self.migrator._generate_authenticator_slug('ldap', 'ldap2') + ldap3 = self.migrator._generate_authenticator_slug('ldap', 'ldap3') + ldap4 = self.migrator._generate_authenticator_slug('ldap', 'ldap4') + ldap5 = self.migrator._generate_authenticator_slug('ldap', 'ldap5') + + # Verify correct slug format + assert ldap_base == 'aap-ldap-ldap' + assert ldap1 == 'aap-ldap-ldap1' + assert ldap2 == 'aap-ldap-ldap2' + assert ldap3 == 'aap-ldap-ldap3' + assert ldap4 == 'aap-ldap-ldap4' + assert ldap5 == 'aap-ldap-ldap5' + + # Verify all slugs are unique + all_slugs = [ldap_base, ldap1, ldap2, ldap3, ldap4, ldap5] + assert len(all_slugs) == len(set(all_slugs)) + + def test_generate_authenticator_slug_github_variants(self): + """Test GitHub authenticator slug generation for all supported variants.""" + # Test all GitHub authenticator naming variants + github_base = self.migrator._generate_authenticator_slug('github', 'github') + github_org = self.migrator._generate_authenticator_slug('github', 'github-org') + github_team = self.migrator._generate_authenticator_slug('github', 'github-team') + github_enterprise_org = self.migrator._generate_authenticator_slug('github', 'github-enterprise-org') + github_enterprise_team = self.migrator._generate_authenticator_slug('github', 'github-enterprise-team') + + # Verify correct slug format + assert github_base == 'aap-github-github' + assert github_org == 'aap-github-github-org' + assert github_team == 'aap-github-github-team' + assert github_enterprise_org == 'aap-github-github-enterprise-org' + assert github_enterprise_team == 'aap-github-github-enterprise-team' + + # Verify all slugs are unique + all_slugs = [github_base, github_org, github_team, github_enterprise_org, github_enterprise_team] + assert len(all_slugs) == len(set(all_slugs)) def test_get_mapper_ignore_keys_default(self): """Test default mapper ignore keys.""" @@ -224,34 +268,39 @@ class TestMapperComparison: def test_mappers_match_structurally_identical(self): """Test that identical mappers match structurally.""" - mapper1 = {'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'} + mapper1 = {'name': 'myorg - engineering', 'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'} mapper2 = mapper1.copy() assert self.migrator._mappers_match_structurally(mapper1, mapper2) is True def test_mappers_match_structurally_different_fields(self): - """Test that mappers don't match structurally when key fields differ.""" - base_mapper = {'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'} + """Test that mappers match structurally when only name is the same.""" + base_mapper = {'name': 'myorg - engineering', 'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'} - # Test different organization + # Test different organization but same name - should still match mapper2 = base_mapper.copy() mapper2['organization'] = 'otherorg' - assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False + assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True - # Test different team + # Test different team but same name - should still match mapper2 = base_mapper.copy() mapper2['team'] = 'qa' - assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False + assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True - # Test different map_type + # Test different map_type but same name - should still match mapper2 = base_mapper.copy() mapper2['map_type'] = 'organization' - assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False + assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True - # Test different role + # Test different role but same name - should still match mapper2 = base_mapper.copy() mapper2['role'] = 'Organization Admin' + assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True + + # Test different name - should not match + mapper2 = base_mapper.copy() + mapper2['name'] = 'otherorg - qa' assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False def test_mapper_configs_match_identical(self): @@ -593,36 +642,45 @@ def test_authenticator_configs_match_edge_cases(existing_auth, new_config, ignor @pytest.mark.parametrize( - "mapper1,mapper2,ignore_keys,expected", + "mapper1,mapper2,expected", [ - # Test with None team values (org mappers) + # Test with same name ( - {'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'}, - {'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'}, - [], + {'name': 'myorg - Organization Admins', 'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'}, + {'name': 'myorg - Organization Admins', 'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'}, True, ), - # Test with ignore keys (for structural matching, ignore_keys shouldn't matter) + # Test with same name but different other fields ( - {'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member', 'id': 123}, - {'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member', 'id': 456}, - ['id'], + {'name': 'myorg - eng', 'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member', 'id': 123}, + {'name': 'myorg - eng', 'organization': 'otherorg', 'team': 'qa', 'map_type': 'organization', 'role': 'Organization Admin', 'id': 456}, True, ), - # Test structural mismatch + # Test with different names + ( + {'name': 'myorg - eng', 'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'}, + {'name': 'myorg - qa', 'organization': 'myorg', 'team': 'qa', 'map_type': 'team', 'role': 'Team Member'}, + False, + ), + # Test with missing name ( {'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'}, - {'organization': 'myorg', 'team': 'qa', 'map_type': 'team', 'role': 'Team Member'}, - [], + {'name': 'myorg - eng', 'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'}, False, ), + # Test with both missing name + ( + {'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'}, + {'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'}, + True, + ), ], ) -def test_mappers_match_structurally_edge_cases(mapper1, mapper2, ignore_keys, expected): - """Test edge cases for mapper structural matching.""" +def test_mappers_match_structurally_edge_cases(mapper1, mapper2, expected): + """Test edge cases for mapper structural matching based on name.""" gateway_client = Mock() command = Mock() migrator = BaseAuthenticatorMigrator(gateway_client, command) - result = migrator._mappers_match_structurally(mapper1, mapper2, ignore_keys) + result = migrator._mappers_match_structurally(mapper1, mapper2) assert result == expected diff --git a/awx/main/tests/unit/utils/test_role_mapping.py b/awx/main/tests/unit/utils/test_role_mapping.py new file mode 100644 index 0000000000..72c0de84bb --- /dev/null +++ b/awx/main/tests/unit/utils/test_role_mapping.py @@ -0,0 +1,614 @@ +""" +Unit tests for role mapping utilities. +""" + +import pytest +from awx.main.utils.gateway_mapping import role_map_to_gateway_format +from awx.sso.utils.ldap_migrator import LDAPMigrator + + +def get_role_mappers(role_map, start_order=1): + """Helper function to get just the mappers from role_map_to_gateway_format.""" + result, _ = role_map_to_gateway_format(role_map, start_order) + return result + + +def ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=1): + """Helper function to test LDAP group allow mapping via LDAPMigrator.""" + migrator = LDAPMigrator() + return migrator._ldap_group_allow_to_gateway_format(result, ldap_group, deny, start_order) + + +class TestRoleMapToGatewayFormat: + """Tests for role_map_to_gateway_format function.""" + + def test_none_input(self): + """Test that None input returns empty list.""" + result, next_order = role_map_to_gateway_format(None) + assert result == [] + assert next_order == 1 # Default start_order + + def test_empty_dict(self): + """Test that empty dict returns empty list.""" + result, next_order = role_map_to_gateway_format({}) + assert result == [] + assert next_order == 1 + + def test_is_superuser_single_group(self): + """Test is_superuser with single group.""" + role_map = {"is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"} + + result, _ = role_map_to_gateway_format(role_map) + + expected = [ + { + "name": "is_superuser - role", + "authenticator": -1, + "revoke": True, + "map_type": "is_superuser", + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"], + } + }, + "order": 1, + } + ] + + assert result == expected + + def test_is_superuser_multiple_groups(self): + """Test is_superuser with multiple groups.""" + role_map = {"is_superuser": ["cn=super_users,dc=example,dc=com", "cn=admins,dc=example,dc=com"]} + + result, _ = role_map_to_gateway_format(role_map) + + expected = [ + { + "name": "is_superuser - role", + "authenticator": -1, + "revoke": True, + "map_type": "is_superuser", + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": ["cn=super_users,dc=example,dc=com", "cn=admins,dc=example,dc=com"], + } + }, + "order": 1, + } + ] + + assert result == expected + + def test_is_system_auditor_single_group(self): + """Test is_system_auditor with single group.""" + role_map = {"is_system_auditor": "cn=auditors,dc=example,dc=com"} + + result, _ = role_map_to_gateway_format(role_map) + + expected = [ + { + "name": "is_system_auditor - role", + "authenticator": -1, + "revoke": True, + "map_type": "role", + "role": "Platform Auditor", + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": ["cn=auditors,dc=example,dc=com"], + } + }, + "order": 1, + } + ] + + assert result == expected + + def test_is_system_auditor_multiple_groups(self): + """Test is_system_auditor with multiple groups.""" + role_map = {"is_system_auditor": ["cn=auditors,dc=example,dc=com", "cn=viewers,dc=example,dc=com"]} + + result, _ = role_map_to_gateway_format(role_map) + + expected = [ + { + "name": "is_system_auditor - role", + "authenticator": -1, + "revoke": True, + "map_type": "role", + "role": "Platform Auditor", + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": ["cn=auditors,dc=example,dc=com", "cn=viewers,dc=example,dc=com"], + } + }, + "order": 1, + } + ] + + assert result == expected + + def test_multiple_roles(self): + """Test multiple role mappings.""" + role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"} + + result, _ = role_map_to_gateway_format(role_map) + + expected = [ + { + "name": "is_superuser - role", + "authenticator": -1, + "revoke": True, + "map_type": "is_superuser", + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": ["cn=super_users,dc=example,dc=com"], + } + }, + "order": 1, + }, + { + "name": "is_system_auditor - role", + "authenticator": -1, + "revoke": True, + "map_type": "role", + "role": "Platform Auditor", + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": ["cn=auditors,dc=example,dc=com"], + } + }, + "order": 2, + }, + ] + + assert result == expected + + def test_unsupported_role_flag(self): + """Test that unsupported role flags are ignored.""" + role_map = { + "is_superuser": "cn=super_users,dc=example,dc=com", + "is_staff": "cn=staff,dc=example,dc=com", # Unsupported flag + "is_system_auditor": "cn=auditors,dc=example,dc=com", + } + + result, _ = role_map_to_gateway_format(role_map) + + # Should only have 2 mappers (is_superuser and is_system_auditor) + assert len(result) == 2 + assert result[0]["map_type"] == "is_superuser" + assert result[1]["map_type"] == "role" + assert result[1]["role"] == "Platform Auditor" + + def test_order_increments_correctly(self): + """Test that order values increment correctly.""" + role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"} + + result, _ = role_map_to_gateway_format(role_map) + + assert len(result) == 2 + assert result[0]["order"] == 1 + assert result[1]["order"] == 2 + + def test_start_order_parameter(self): + """Test that start_order parameter is respected.""" + role_map = {"is_superuser": "cn=super_users,dc=example,dc=com"} + + result, next_order = role_map_to_gateway_format(role_map, start_order=5) + + assert result[0]["order"] == 5 + assert next_order == 6 + + def test_string_to_list_conversion(self): + """Test that string groups are converted to lists.""" + role_map = {"is_superuser": "single-group"} + + result, _ = role_map_to_gateway_format(role_map) + + # Should convert string to list for has_or + assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"] + + def test_triggers_format_validation(self): + """Test that trigger formats match Gateway specification.""" + role_map = {"is_superuser": ["group1", "group2"]} + + result, _ = role_map_to_gateway_format(role_map) + + # Validate that triggers follow Gateway format + triggers = result[0]["triggers"] + assert "groups" in triggers + assert "has_or" in triggers["groups"] + assert isinstance(triggers["groups"]["has_or"], list) + assert triggers["groups"]["has_or"] == ["group1", "group2"] + + def test_ldap_dn_format(self): + """Test with realistic LDAP DN format.""" + role_map = { + "is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com", + "is_system_auditor": "cn=awx_auditors,OU=administration groups,DC=contoso,DC=com", + } + + result, _ = role_map_to_gateway_format(role_map) + + assert len(result) == 2 + assert result[0]["triggers"]["groups"]["has_or"] == ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"] + assert result[1]["triggers"]["groups"]["has_or"] == ["cn=awx_auditors,OU=administration groups,DC=contoso,DC=com"] + + def test_gateway_format_compliance(self): + """Test that all results comply with Gateway role mapping format.""" + role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"} + + result, _ = role_map_to_gateway_format(role_map) + + for mapping in result: + # Required fields per Gateway spec + assert "name" in mapping + assert "authenticator" in mapping + assert "map_type" in mapping + assert "organization" in mapping + assert "team" in mapping + assert "triggers" in mapping + assert "order" in mapping + assert "revoke" in mapping + + # Field types + assert isinstance(mapping["name"], str) + assert isinstance(mapping["authenticator"], int) + assert mapping["map_type"] in ["is_superuser", "role"] + assert mapping["organization"] is None + assert mapping["team"] is None + assert isinstance(mapping["triggers"], dict) + assert isinstance(mapping["order"], int) + assert isinstance(mapping["revoke"], bool) + + # Specific field validations based on map_type + if mapping["map_type"] == "is_superuser": + assert "role" not in mapping + elif mapping["map_type"] == "role": + assert "role" in mapping + assert isinstance(mapping["role"], str) + assert mapping["role"] == "Platform Auditor" + + +# Parametrized tests for role mappings +@pytest.mark.parametrize( + "role_map,expected_length", + [ + (None, 0), + ({}, 0), + ({"is_superuser": "group1"}, 1), + ({"is_system_auditor": "group1"}, 1), + ({"is_superuser": "group1", "is_system_auditor": "group2"}, 2), + ({"is_staff": "group1"}, 0), # Unsupported flag + ({"is_superuser": "group1", "is_staff": "group2", "is_system_auditor": "group3"}, 2), # Mixed supported/unsupported + ], +) +def test_role_map_result_lengths(role_map, expected_length): + """Test that role_map_to_gateway_format returns expected number of mappings.""" + result, _ = role_map_to_gateway_format(role_map) + assert len(result) == expected_length + + +# Edge case tests +def test_empty_groups_handling(): + """Test handling of empty group lists.""" + role_map = {"is_superuser": []} + + result, _ = role_map_to_gateway_format(role_map) + + assert len(result) == 1 + assert result[0]["triggers"]["groups"]["has_or"] == [] + + +def test_mixed_group_types(): + """Test handling of mixed group types (string and list).""" + role_map = {"is_superuser": "single-group", "is_system_auditor": ["group1", "group2"]} + + result, _ = role_map_to_gateway_format(role_map) + + assert len(result) == 2 + assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"] + assert result[1]["triggers"]["groups"]["has_or"] == ["group1", "group2"] + + +def test_realistic_ldap_user_flags_by_group(): + """Test with realistic LDAP USER_FLAGS_BY_GROUP data.""" + role_map = {"is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"} + + result, _ = role_map_to_gateway_format(role_map) + + # This is exactly the use case from the user's example + assert len(result) == 1 + assert result[0]["map_type"] == "is_superuser" + assert result[0]["triggers"]["groups"]["has_or"] == ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"] + assert result[0]["revoke"] is True + assert result[0]["team"] is None + assert result[0]["organization"] is None + + +class TestLdapGroupAllowToGatewayFormat: + """Tests for ldap_group_allow_to_gateway_format function.""" + + def test_none_input_with_empty_result(self): + """Test that None input with empty result returns unchanged result.""" + result = [] + output_result, next_order = ldap_group_allow_to_gateway_format(result, None, deny=False) + + assert output_result == [] + assert next_order == 1 # Default start_order + + def test_none_input_with_existing_result(self): + """Test that None input with existing mappers returns unchanged result.""" + result = [{"existing": "mapper"}] + output_result, next_order = ldap_group_allow_to_gateway_format(result, None, deny=False, start_order=5) + + assert output_result == [{"existing": "mapper"}] + assert next_order == 5 # start_order unchanged + + def test_require_group_mapping(self): + """Test LDAP REQUIRE_GROUP mapping (deny=False).""" + result = [] + ldap_group = "cn=allowed_users,dc=example,dc=com" + + output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=1) + + expected = [ + { + "name": "LDAP-RequireGroup", + "authenticator": -1, + "map_type": "allow", + "revoke": False, + "triggers": {"groups": {"has_and": ["cn=allowed_users,dc=example,dc=com"]}}, + "order": 1, + } + ] + + assert output_result == expected + assert next_order == 2 + + def test_deny_group_mapping(self): + """Test LDAP DENY_GROUP mapping (deny=True).""" + result = [] + ldap_group = "cn=blocked_users,dc=example,dc=com" + + output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=True, start_order=1) + + expected = [ + { + "name": "LDAP-DenyGroup", + "authenticator": -1, + "map_type": "allow", + "revoke": True, + "triggers": {"groups": {"has_or": ["cn=blocked_users,dc=example,dc=com"]}}, + "order": 1, + } + ] + + assert output_result == expected + assert next_order == 2 + + def test_appending_to_existing_result(self): + """Test appending to existing result list.""" + existing_mapper = { + "name": "existing-mapper", + "authenticator": -1, + "map_type": "role", + "order": 1, + } + result = [existing_mapper] + ldap_group = "cn=new_group,dc=example,dc=com" + + output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=2) + + assert len(output_result) == 2 + assert output_result[0] == existing_mapper # Original mapper unchanged + assert output_result[1]["name"] == "LDAP-RequireGroup" + assert output_result[1]["order"] == 2 + assert next_order == 3 + + def test_custom_start_order(self): + """Test that custom start_order is respected.""" + result = [] + ldap_group = "cn=test_group,dc=example,dc=com" + + output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=10) + + assert output_result[0]["order"] == 10 + assert next_order == 11 + + def test_require_vs_deny_trigger_differences(self): + """Test the difference between require and deny group triggers.""" + ldap_group = "cn=test_group,dc=example,dc=com" + + # Test require group (deny=False) + require_result, _ = ldap_group_allow_to_gateway_format([], ldap_group, deny=False) + + # Test deny group (deny=True) + deny_result, _ = ldap_group_allow_to_gateway_format([], ldap_group, deny=True) + + # Require group should use has_and + assert require_result[0]["triggers"]["groups"]["has_and"] == ["cn=test_group,dc=example,dc=com"] + assert require_result[0]["revoke"] is False + assert require_result[0]["name"] == "LDAP-RequireGroup" + + # Deny group should use has_or + assert deny_result[0]["triggers"]["groups"]["has_or"] == ["cn=test_group,dc=example,dc=com"] + assert deny_result[0]["revoke"] is True + assert deny_result[0]["name"] == "LDAP-DenyGroup" + + def test_realistic_ldap_dn_format(self): + """Test with realistic LDAP DN format.""" + result = [] + + # Test with require group + require_group = "cn=awx_users,OU=application groups,DC=contoso,DC=com" + output_result, next_order = ldap_group_allow_to_gateway_format(result, require_group, deny=False, start_order=1) + + assert len(output_result) == 1 + assert output_result[0]["triggers"]["groups"]["has_and"] == ["cn=awx_users,OU=application groups,DC=contoso,DC=com"] + assert output_result[0]["name"] == "LDAP-RequireGroup" + assert next_order == 2 + + def test_multiple_sequential_calls(self): + """Test multiple sequential calls to build complex allow mappers.""" + result = [] + + # Add deny group first + result, next_order = ldap_group_allow_to_gateway_format(result, "cn=blocked,dc=example,dc=com", deny=True, start_order=1) + + # Add require group second + result, next_order = ldap_group_allow_to_gateway_format(result, "cn=allowed,dc=example,dc=com", deny=False, start_order=next_order) + + assert len(result) == 2 + + # First mapper should be deny group + assert result[0]["name"] == "LDAP-DenyGroup" + assert result[0]["revoke"] is True + assert result[0]["triggers"]["groups"]["has_or"] == ["cn=blocked,dc=example,dc=com"] + assert result[0]["order"] == 1 + + # Second mapper should be require group + assert result[1]["name"] == "LDAP-RequireGroup" + assert result[1]["revoke"] is False + assert result[1]["triggers"]["groups"]["has_and"] == ["cn=allowed,dc=example,dc=com"] + assert result[1]["order"] == 2 + + assert next_order == 3 + + def test_gateway_format_compliance(self): + """Test that all results comply with Gateway allow mapping format.""" + result = [] + + # Test both deny and require groups + result, _ = ldap_group_allow_to_gateway_format(result, "cn=denied,dc=example,dc=com", deny=True, start_order=1) + result, _ = ldap_group_allow_to_gateway_format(result, "cn=required,dc=example,dc=com", deny=False, start_order=2) + + for mapping in result: + # Required fields per Gateway spec + assert "name" in mapping + assert "authenticator" in mapping + assert "map_type" in mapping + assert "triggers" in mapping + assert "order" in mapping + assert "revoke" in mapping + + # Field types + assert isinstance(mapping["name"], str) + assert isinstance(mapping["authenticator"], int) + assert mapping["map_type"] == "allow" + assert isinstance(mapping["triggers"], dict) + assert isinstance(mapping["order"], int) + assert isinstance(mapping["revoke"], bool) + + # Trigger format validation + assert "groups" in mapping["triggers"] + groups_trigger = mapping["triggers"]["groups"] + + # Should have either has_and or has_or, but not both + has_and = "has_and" in groups_trigger + has_or = "has_or" in groups_trigger + assert has_and != has_or # XOR - exactly one should be true + + if has_and: + assert isinstance(groups_trigger["has_and"], list) + assert len(groups_trigger["has_and"]) == 1 + if has_or: + assert isinstance(groups_trigger["has_or"], list) + assert len(groups_trigger["has_or"]) == 1 + + def test_original_result_not_modified_when_none(self): + """Test that original result list is not modified when ldap_group is None.""" + original_result = [{"original": "mapper"}] + result_copy = original_result.copy() + + output_result, _ = ldap_group_allow_to_gateway_format(original_result, None, deny=False) + + # Original list should be unchanged + assert original_result == result_copy + # Output should be the same reference + assert output_result is original_result + + def test_empty_string_group(self): + """Test handling of empty string group.""" + result = [] + + output_result, next_order = ldap_group_allow_to_gateway_format(result, "", deny=False, start_order=1) + + # Should still create a mapper even with empty string + assert len(output_result) == 1 + assert output_result[0]["triggers"]["groups"]["has_and"] == [""] + assert next_order == 2 + + +# Parametrized tests for ldap_group_allow_to_gateway_format +@pytest.mark.parametrize( + "ldap_group,deny,expected_name,expected_revoke,expected_trigger_type", + [ + ("cn=test,dc=example,dc=com", True, "LDAP-DenyGroup", True, "has_or"), + ("cn=test,dc=example,dc=com", False, "LDAP-RequireGroup", False, "has_and"), + ("cn=users,ou=groups,dc=company,dc=com", True, "LDAP-DenyGroup", True, "has_or"), + ("cn=users,ou=groups,dc=company,dc=com", False, "LDAP-RequireGroup", False, "has_and"), + ], +) +def test_ldap_group_parametrized(ldap_group, deny, expected_name, expected_revoke, expected_trigger_type): + """Parametrized test for various LDAP group configurations.""" + result = [] + + output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=deny, start_order=1) + + assert len(output_result) == 1 + mapper = output_result[0] + + assert mapper["name"] == expected_name + assert mapper["revoke"] == expected_revoke + assert expected_trigger_type in mapper["triggers"]["groups"] + assert mapper["triggers"]["groups"][expected_trigger_type] == [ldap_group] + assert next_order == 2 + + +def test_realistic_awx_ldap_migration_scenario(): + """Test realistic scenario from AWX LDAP migration.""" + result = [] + + # Simulate AWX LDAP configuration with both REQUIRE_GROUP and DENY_GROUP + deny_group = "cn=blocked_users,OU=blocked groups,DC=contoso,DC=com" + require_group = "cn=awx_users,OU=application groups,DC=contoso,DC=com" + + # Add deny group first (as in the migrator) + result, next_order = ldap_group_allow_to_gateway_format(result, deny_group, deny=True, start_order=1) + + # Add require group second + result, next_order = ldap_group_allow_to_gateway_format(result, require_group, deny=False, start_order=next_order) + + # Should have 2 allow mappers + assert len(result) == 2 + + # Verify deny group mapper + deny_mapper = result[0] + assert deny_mapper["name"] == "LDAP-DenyGroup" + assert deny_mapper["map_type"] == "allow" + assert deny_mapper["revoke"] is True + assert deny_mapper["triggers"]["groups"]["has_or"] == [deny_group] + assert deny_mapper["order"] == 1 + + # Verify require group mapper + require_mapper = result[1] + assert require_mapper["name"] == "LDAP-RequireGroup" + assert require_mapper["map_type"] == "allow" + assert require_mapper["revoke"] is False + assert require_mapper["triggers"]["groups"]["has_and"] == [require_group] + assert require_mapper["order"] == 2 + + assert next_order == 3 diff --git a/awx/main/utils/gateway_mapping.py b/awx/main/utils/gateway_mapping.py index 0f56ba6e0c..8cc8f6f332 100644 --- a/awx/main/utils/gateway_mapping.py +++ b/awx/main/utils/gateway_mapping.py @@ -163,3 +163,67 @@ def org_map_to_gateway_format(org_map, start_order=1): order += 1 return result, order + + +def role_map_to_gateway_format(role_map, start_order=1): + """Convert AWX role mapping to Gateway authenticator format. + + Args: + role_map: An LDAP or SAML role mapping + start_order: Starting order value for the mappers + + Returns: + tuple: (List of Gateway-compatible organization mappers, next_order) + """ + if role_map is None: + return [], start_order + + result = [] + order = start_order + + for flag in role_map: + groups = role_map[flag] + if type(groups) is str: + groups = [groups] + + if flag == 'is_superuser': + # Gateway has a special map_type for superusers + result.append( + { + "name": f"{flag} - role", + "authenticator": -1, + "revoke": True, + "map_type": flag, + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": groups, + } + }, + "order": order, + } + ) + elif flag == 'is_system_auditor': + # roles other than superuser must be represented as a generic role mapper + result.append( + { + "name": f"{flag} - role", + "authenticator": -1, + "revoke": True, + "map_type": "role", + "role": "Platform Auditor", + "team": None, + "organization": None, + "triggers": { + "groups": { + "has_or": groups, + } + }, + "order": order, + } + ) + + order += 1 + + return result, order diff --git a/awx/sso/utils/azure_ad_migrator.py b/awx/sso/utils/azure_ad_migrator.py index 2f0ca81dd9..b17cd3d8b7 100644 --- a/awx/sso/utils/azure_ad_migrator.py +++ b/awx/sso/utils/azure_ad_migrator.py @@ -46,7 +46,7 @@ class AzureADMigrator(BaseAuthenticatorMigrator): # Generate authenticator name and slug authenticator_name = "Controller Azure AD" - authenticator_slug = self._generate_authenticator_slug("azure_ad", category, key_value) + authenticator_slug = self._generate_authenticator_slug("azure_ad", category) return [ { diff --git a/awx/sso/utils/base_migrator.py b/awx/sso/utils/base_migrator.py index 49cbd98598..270af65bd2 100644 --- a/awx/sso/utils/base_migrator.py +++ b/awx/sso/utils/base_migrator.py @@ -5,8 +5,6 @@ This module defines the contract that all specific authenticator migrators must """ from awx.main.utils.gateway_client import GatewayAPIError -import re -import hashlib class BaseAuthenticatorMigrator: @@ -98,15 +96,9 @@ class BaseAuthenticatorMigrator: """ raise NotImplementedError("Subclasses must implement get_authenticator_type()") - def _generate_authenticator_slug(self, auth_type, category, identifier): + def _generate_authenticator_slug(self, auth_type, category): """Generate a deterministic slug for an authenticator.""" - base_string = f"awx-{auth_type}-{category}-{identifier}" - cleaned = re.sub(r'[^a-zA-Z0-9]+', '-', base_string.lower()) - cleaned = re.sub(r'^-+|-+$', '', cleaned) - cleaned = re.sub(r'-+', '-', cleaned) - slug_hash = hashlib.md5(cleaned.encode('utf-8')).hexdigest()[:8] - final_slug = f"awx-{auth_type}-{slug_hash}" - return final_slug + return f"aap-{auth_type}-{category}" def submit_authenticator(self, gateway_config, ignore_keys=[], config={}): """ @@ -273,7 +265,7 @@ class BaseAuthenticatorMigrator: # Try to find a matching existing mapper for existing_mapper in existing_mappers: - if self._mappers_match_structurally(existing_mapper, new_mapper, ignore_keys): + if self._mappers_match_structurally(existing_mapper, new_mapper): matched_existing = existing_mapper break @@ -288,7 +280,7 @@ class BaseAuthenticatorMigrator: return mappers_to_update, mappers_to_create - def _mappers_match_structurally(self, existing_mapper, new_mapper, ignore_keys=None): + def _mappers_match_structurally(self, existing_mapper, new_mapper): """ Check if two mappers match structurally (same organization, team, map_type, role). This identifies if they represent the same logical mapping. @@ -296,16 +288,13 @@ class BaseAuthenticatorMigrator: Args: existing_mapper: Existing mapper configuration from Gateway new_mapper: New mapper configuration - ignore_keys: List of keys to ignore during comparison Returns: bool: True if mappers represent the same logical mapping """ - if ignore_keys is None: - ignore_keys = [] # Compare key structural fields that identify the same logical mapper - structural_fields = ['organization', 'team', 'map_type', 'role'] + structural_fields = ['name'] for field in structural_fields: if existing_mapper.get(field) != new_mapper.get(field): @@ -357,7 +346,9 @@ class BaseAuthenticatorMigrator: category = config['category'] org_mappers = config.get('org_mappers', []) team_mappers = config.get('team_mappers', []) - all_new_mappers = org_mappers + team_mappers + role_mappers = config.get('role_mappers', []) + allow_mappers = config.get('allow_mappers', []) + all_new_mappers = org_mappers + team_mappers + role_mappers + allow_mappers if len(all_new_mappers) == 0: self._write_output(f'No mappers to process for {category} authenticator') @@ -366,6 +357,8 @@ class BaseAuthenticatorMigrator: self._write_output(f'\n--- Processing mappers for {category} authenticator (ID: {authenticator_id}) ---') self._write_output(f'Organization mappers: {len(org_mappers)}') self._write_output(f'Team mappers: {len(team_mappers)}') + self._write_output(f'Role mappers: {len(role_mappers)}') + self._write_output(f'Allow mappers: {len(allow_mappers)}') # Get existing mappers from Gateway try: @@ -380,6 +373,9 @@ class BaseAuthenticatorMigrator: # Compare existing vs new mappers mappers_to_update, mappers_to_create = self._compare_mapper_lists(existing_mappers, all_new_mappers, ignore_keys) + self._write_output(f'Mappers to create: {len(mappers_to_create)}') + self._write_output(f'Mappers to update: {len(mappers_to_update)}') + created_count = 0 updated_count = 0 failed_count = 0 diff --git a/awx/sso/utils/github_migrator.py b/awx/sso/utils/github_migrator.py index fd2aef429a..708bb510a5 100644 --- a/awx/sso/utils/github_migrator.py +++ b/awx/sso/utils/github_migrator.py @@ -114,8 +114,8 @@ class GitHubMigrator(BaseAuthenticatorMigrator): return False # Generate authenticator name and slug - authenticator_name = f"AWX-{category.replace('-', '_').title()}" - authenticator_slug = self._generate_authenticator_slug('github', category, key_value) + authenticator_name = category + authenticator_slug = self._generate_authenticator_slug('github', category) # Map AWX category to Gateway authenticator type type_mapping = { diff --git a/awx/sso/utils/ldap_migrator.py b/awx/sso/utils/ldap_migrator.py index 47c301cdb8..3976393189 100644 --- a/awx/sso/utils/ldap_migrator.py +++ b/awx/sso/utils/ldap_migrator.py @@ -4,7 +4,10 @@ LDAP authenticator migrator. This module handles the migration of LDAP authenticators from AWX to Gateway. """ +from django.conf import settings +from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format, role_map_to_gateway_format from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator +import ldap class LDAPMigrator(BaseAuthenticatorMigrator): @@ -24,39 +27,341 @@ class LDAPMigrator(BaseAuthenticatorMigrator): Returns: list: List of configured LDAP authentication providers with their settings """ - # TODO: Implement LDAP configuration retrieval # AWX supports up to 6 LDAP configurations: AUTH_LDAP (default) and AUTH_LDAP_1 through AUTH_LDAP_5 - # LDAP settings typically include: - # - AUTH_LDAP_SERVER_URI - # - AUTH_LDAP_BIND_DN - # - AUTH_LDAP_BIND_PASSWORD - # - AUTH_LDAP_START_TLS - # - AUTH_LDAP_CONNECTION_OPTIONS - # - AUTH_LDAP_USER_SEARCH - # - AUTH_LDAP_USER_DN_TEMPLATE - # - AUTH_LDAP_USER_ATTR_MAP - # - AUTH_LDAP_GROUP_SEARCH - # - AUTH_LDAP_GROUP_TYPE - # - AUTH_LDAP_GROUP_TYPE_PARAMS - # - AUTH_LDAP_REQUIRE_GROUP - # - AUTH_LDAP_DENY_GROUP - # - AUTH_LDAP_USER_FLAGS_BY_GROUP - # - AUTH_LDAP_ORGANIZATION_MAP - # - AUTH_LDAP_TEAM_MAP + ldap_instances = [None, 1, 2, 3, 4, 5] # None represents the default AUTH_LDAP_ configuration found_configs = [] + + for instance in ldap_instances: + # Build the prefix for this LDAP instance + prefix = f"AUTH_LDAP_{instance}_" if instance is not None else "AUTH_LDAP_" + category = f"ldap{instance}" if instance is not None else "ldap" + + try: + # Get all LDAP settings for this instance + config_data = self._get_ldap_instance_config(prefix) + except Exception as e: + raise Exception(f'Could not retrieve {category} settings: {str(e)}') + + # Skip if SERVER_URI is not configured (required for LDAP to function) + if not config_data.get('SERVER_URI'): + continue + + # Convert organization, team, and role mappings to Gateway format + org_map_value = config_data.get('ORGANIZATION_MAP', {}) + team_map_value = config_data.get('TEAM_MAP', {}) + role_map_value = config_data.get('USER_FLAGS_BY_GROUP', {}) + require_group_value = config_data.get('REQUIRE_GROUP', {}) + deny_group_value = config_data.get('DENY_GROUP', {}) + + allow_mappers = [] + + # Start with order 1 and maintain sequence across org, team, and role mappers + allow_mappers, next_order = self._ldap_group_allow_to_gateway_format(allow_mappers, deny_group_value, deny=True, start_order=1) + allow_mappers, next_order = self._ldap_group_allow_to_gateway_format(allow_mappers, require_group_value, deny=False, start_order=next_order) + + org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=next_order) + team_mappers, next_order = team_map_to_gateway_format(team_map_value, start_order=next_order) + role_mappers, _ = role_map_to_gateway_format(role_map_value, start_order=next_order) + + found_configs.append( + { + 'category': category, + 'settings': config_data, + 'org_mappers': org_mappers, + 'team_mappers': team_mappers, + 'role_mappers': role_mappers, + 'allow_mappers': allow_mappers, + } + ) + return found_configs + def _get_ldap_instance_config(self, prefix): + """ + Get all LDAP configuration settings for a specific instance. + + Args: + prefix: The setting prefix (e.g., 'AUTH_LDAP_' or 'AUTH_LDAP_1_') + + Returns: + dict: Dictionary of LDAP configuration settings + """ + # Define all LDAP setting keys + ldap_keys = [ + 'SERVER_URI', # Required: LDAP server URI(s) + 'BIND_DN', # Optional: Bind DN for authentication + 'BIND_PASSWORD', # Optional: Bind password + 'START_TLS', # Optional: Enable TLS + 'CONNECTION_OPTIONS', # Optional: LDAP connection options + 'USER_SEARCH', # Optional: User search configuration + 'USER_DN_TEMPLATE', # Optional: User DN template + 'USER_ATTR_MAP', # Optional: User attribute mapping + 'GROUP_SEARCH', # Optional: Group search configuration + 'GROUP_TYPE', # Optional: Group type class + 'GROUP_TYPE_PARAMS', # Optional: Group type parameters + 'REQUIRE_GROUP', # Optional: Required group DN + 'DENY_GROUP', # Optional: Denied group DN + 'USER_FLAGS_BY_GROUP', # Optional: User flags mapping + 'ORGANIZATION_MAP', # Optional: Organization mapping + 'TEAM_MAP', # Optional: Team mapping + ] + + config_data = {} + + for key in ldap_keys: + setting_name = f"{prefix}{key}" + value = getattr(settings, setting_name, None) + + # Handle special field types that need conversion + if key == 'GROUP_TYPE' and value: + # Convert GROUP_TYPE class to string representation + config_data[key] = type(value).__name__ + elif key == 'SERVER_URI' and value: + # Convert SERVER_URI to list format if it's a comma-separated string + config_data[key] = [uri.strip() for uri in value.split(',')] + elif key in ['USER_SEARCH', 'GROUP_SEARCH'] and value: + # Convert LDAPSearch objects to list format [base_dn, scope, filter] + if hasattr(value, 'base_dn') and hasattr(value, 'filterstr'): + # Get the actual scope instead of hardcoding SCOPE_SUBTREE + scope = getattr(value, 'scope', ldap.SCOPE_SUBTREE) # 2 is SCOPE_SUBTREE default + scope_name = {ldap.SCOPE_BASE: 'SCOPE_BASE', ldap.SCOPE_ONELEVEL: 'SCOPE_ONELEVEL', ldap.SCOPE_SUBTREE: 'SCOPE_SUBTREE'}.get( + scope, 'SCOPE_SUBTREE' + ) + config_data[key] = [value.base_dn, scope_name, value.filterstr] + else: + config_data[key] = value + elif key in ['USER_ATTR_MAP', 'GROUP_TYPE_PARAMS', 'USER_FLAGS_BY_GROUP', 'ORGANIZATION_MAP', 'TEAM_MAP']: + # Ensure dict fields are properly handled + config_data[key] = value if value is not None else {} + elif key == 'CONNECTION_OPTIONS' and value: + # CONNECTION_OPTIONS is a dict of LDAP options + config_data[key] = value if value is not None else {} + else: + # Store the value as-is for other fields + config_data[key] = value + + return config_data + def create_gateway_authenticator(self, config): """Create an LDAP authenticator in Gateway.""" - # TODO: Implement LDAP authenticator creation - # When implementing, use this pattern for slug generation: - # server_uri = settings.get('AUTH_LDAP_SERVER_URI', 'ldap') - # authenticator_slug = self._generate_authenticator_slug('ldap', category, server_uri) - # LDAP requires: - # - Server URI and connection settings - # - Bind DN and password for authentication - # - User and group search configurations - # - Attribute mapping for user fields - # - Group type and parameters - self._write_output('LDAP authenticator creation not yet implemented', 'warning') - return False + category = config['category'] + settings = config['settings'] + + # Extract the first server URI for slug generation + authenticator_slug = self._generate_authenticator_slug('ldap', category) + + # Build the gateway payload + gateway_config = { + 'name': category, + 'slug': authenticator_slug, + 'type': 'ansible_base.authentication.authenticator_plugins.ldap', + 'create_objects': True, + 'remove_users': False, + 'enabled': True, + 'configuration': self._build_ldap_configuration(settings), + } + + self._write_output(f'Creating LDAP authenticator: {gateway_config["name"]}') + + # LDAP authenticators have auto-generated fields that should be ignored during comparison + # BIND_PASSWORD - encrypted value, can't be compared + ignore_keys = ['BIND_PASSWORD'] + + # Submit the authenticator using the base class method + return self.submit_authenticator(gateway_config, config=config, ignore_keys=ignore_keys) + + def _build_ldap_configuration(self, settings): + """Build the LDAP configuration section for Gateway.""" + config = {} + + # Server URI is required + if settings.get('SERVER_URI'): + config['SERVER_URI'] = settings['SERVER_URI'] + + # Authentication settings + if settings.get('BIND_DN'): + config['BIND_DN'] = settings['BIND_DN'] + if settings.get('BIND_PASSWORD'): + config['BIND_PASSWORD'] = settings['BIND_PASSWORD'] + + # TLS settings + if settings.get('START_TLS') is not None: + config['START_TLS'] = settings['START_TLS'] + + # User search configuration + if settings.get('USER_SEARCH'): + config['USER_SEARCH'] = settings['USER_SEARCH'] + + # User attribute mapping + if settings.get('USER_ATTR_MAP'): + config['USER_ATTR_MAP'] = settings['USER_ATTR_MAP'] + + # Group search configuration + if settings.get('GROUP_SEARCH'): + config['GROUP_SEARCH'] = settings['GROUP_SEARCH'] + + # Group type and parameters + if settings.get('GROUP_TYPE'): + config['GROUP_TYPE'] = settings['GROUP_TYPE'] + if settings.get('GROUP_TYPE_PARAMS'): + config['GROUP_TYPE_PARAMS'] = settings['GROUP_TYPE_PARAMS'] + + # Connection options - convert numeric LDAP constants to string keys + if settings.get('CONNECTION_OPTIONS'): + config['CONNECTION_OPTIONS'] = self._convert_ldap_connection_options(settings['CONNECTION_OPTIONS']) + + # User DN template + if settings.get('USER_DN_TEMPLATE'): + config['USER_DN_TEMPLATE'] = settings['USER_DN_TEMPLATE'] + + # REQUIRE_GROUP and DENY_GROUP are handled as allow mappers, not included in config + # USER_FLAGS_BY_GROUP is handled as role mappers, not included in config + + return config + + def _convert_ldap_connection_options(self, connection_options): + """ + Convert numeric LDAP connection option constants to their string representations. + Uses the actual constants from the python-ldap library. + + Args: + connection_options: Dictionary with numeric LDAP option keys + + Returns: + dict: Dictionary with string LDAP option keys + """ + # Comprehensive mapping using LDAP constants as keys + ldap_option_map = { + # Basic LDAP options + ldap.OPT_API_INFO: 'OPT_API_INFO', + ldap.OPT_DEREF: 'OPT_DEREF', + ldap.OPT_SIZELIMIT: 'OPT_SIZELIMIT', + ldap.OPT_TIMELIMIT: 'OPT_TIMELIMIT', + ldap.OPT_REFERRALS: 'OPT_REFERRALS', + ldap.OPT_RESULT_CODE: 'OPT_RESULT_CODE', + ldap.OPT_ERROR_NUMBER: 'OPT_ERROR_NUMBER', + ldap.OPT_RESTART: 'OPT_RESTART', + ldap.OPT_PROTOCOL_VERSION: 'OPT_PROTOCOL_VERSION', + ldap.OPT_SERVER_CONTROLS: 'OPT_SERVER_CONTROLS', + ldap.OPT_CLIENT_CONTROLS: 'OPT_CLIENT_CONTROLS', + ldap.OPT_API_FEATURE_INFO: 'OPT_API_FEATURE_INFO', + ldap.OPT_HOST_NAME: 'OPT_HOST_NAME', + ldap.OPT_DESC: 'OPT_DESC', + ldap.OPT_DIAGNOSTIC_MESSAGE: 'OPT_DIAGNOSTIC_MESSAGE', + ldap.OPT_ERROR_STRING: 'OPT_ERROR_STRING', + ldap.OPT_MATCHED_DN: 'OPT_MATCHED_DN', + ldap.OPT_DEBUG_LEVEL: 'OPT_DEBUG_LEVEL', + ldap.OPT_TIMEOUT: 'OPT_TIMEOUT', + ldap.OPT_REFHOPLIMIT: 'OPT_REFHOPLIMIT', + ldap.OPT_NETWORK_TIMEOUT: 'OPT_NETWORK_TIMEOUT', + ldap.OPT_URI: 'OPT_URI', + # TLS options + ldap.OPT_X_TLS: 'OPT_X_TLS', + ldap.OPT_X_TLS_CTX: 'OPT_X_TLS_CTX', + ldap.OPT_X_TLS_CACERTFILE: 'OPT_X_TLS_CACERTFILE', + ldap.OPT_X_TLS_CACERTDIR: 'OPT_X_TLS_CACERTDIR', + ldap.OPT_X_TLS_CERTFILE: 'OPT_X_TLS_CERTFILE', + ldap.OPT_X_TLS_KEYFILE: 'OPT_X_TLS_KEYFILE', + ldap.OPT_X_TLS_REQUIRE_CERT: 'OPT_X_TLS_REQUIRE_CERT', + ldap.OPT_X_TLS_CIPHER_SUITE: 'OPT_X_TLS_CIPHER_SUITE', + ldap.OPT_X_TLS_RANDOM_FILE: 'OPT_X_TLS_RANDOM_FILE', + ldap.OPT_X_TLS_DHFILE: 'OPT_X_TLS_DHFILE', + ldap.OPT_X_TLS_NEVER: 'OPT_X_TLS_NEVER', + ldap.OPT_X_TLS_HARD: 'OPT_X_TLS_HARD', + ldap.OPT_X_TLS_DEMAND: 'OPT_X_TLS_DEMAND', + ldap.OPT_X_TLS_ALLOW: 'OPT_X_TLS_ALLOW', + ldap.OPT_X_TLS_TRY: 'OPT_X_TLS_TRY', + ldap.OPT_X_TLS_CRL_NONE: 'OPT_X_TLS_CRL_NONE', + ldap.OPT_X_TLS_CRL_PEER: 'OPT_X_TLS_CRL_PEER', + ldap.OPT_X_TLS_CRL_ALL: 'OPT_X_TLS_CRL_ALL', + # SASL options + ldap.OPT_X_SASL_MECH: 'OPT_X_SASL_MECH', + ldap.OPT_X_SASL_REALM: 'OPT_X_SASL_REALM', + ldap.OPT_X_SASL_AUTHCID: 'OPT_X_SASL_AUTHCID', + ldap.OPT_X_SASL_AUTHZID: 'OPT_X_SASL_AUTHZID', + ldap.OPT_X_SASL_SSF: 'OPT_X_SASL_SSF', + ldap.OPT_X_SASL_SSF_EXTERNAL: 'OPT_X_SASL_SSF_EXTERNAL', + ldap.OPT_X_SASL_SECPROPS: 'OPT_X_SASL_SECPROPS', + ldap.OPT_X_SASL_SSF_MIN: 'OPT_X_SASL_SSF_MIN', + ldap.OPT_X_SASL_SSF_MAX: 'OPT_X_SASL_SSF_MAX', + } + + # Add optional options that may not be available in all versions + optional_options = [ + 'OPT_TCP_USER_TIMEOUT', + 'OPT_DEFBASE', + 'OPT_X_TLS_VERSION', + 'OPT_X_TLS_CIPHER', + 'OPT_X_TLS_PEERCERT', + 'OPT_X_TLS_CRLCHECK', + 'OPT_X_TLS_CRLFILE', + 'OPT_X_TLS_NEWCTX', + 'OPT_X_TLS_PROTOCOL_MIN', + 'OPT_X_TLS_PACKAGE', + 'OPT_X_TLS_ECNAME', + 'OPT_X_TLS_REQUIRE_SAN', + 'OPT_X_TLS_PROTOCOL_MAX', + 'OPT_X_TLS_PROTOCOL_SSL3', + 'OPT_X_TLS_PROTOCOL_TLS1_0', + 'OPT_X_TLS_PROTOCOL_TLS1_1', + 'OPT_X_TLS_PROTOCOL_TLS1_2', + 'OPT_X_TLS_PROTOCOL_TLS1_3', + 'OPT_X_SASL_NOCANON', + 'OPT_X_SASL_USERNAME', + 'OPT_CONNECT_ASYNC', + 'OPT_X_KEEPALIVE_IDLE', + 'OPT_X_KEEPALIVE_PROBES', + 'OPT_X_KEEPALIVE_INTERVAL', + ] + + for option_name in optional_options: + if hasattr(ldap, option_name): + ldap_option_map[getattr(ldap, option_name)] = option_name + + converted_options = {} + + for key, value in connection_options.items(): + if key in ldap_option_map: + converted_options[ldap_option_map[key]] = value + + return converted_options + + def _ldap_group_allow_to_gateway_format(self, result: list, ldap_group: str, deny=False, start_order=1): + """Convert an LDAP require or deny group to a Gateway mapper + + Args: + result: array to append the mapper to + ldap_group: An LDAP group query + deny: Whether the mapper denies or requires users to be in the group + start_order: Starting order value for the mappers + + Returns: + tuple: (List of Gateway-compatible organization mappers, next_order) + """ + if ldap_group is None: + return result, start_order + + if deny: + result.append( + { + "name": "LDAP-DenyGroup", + "authenticator": -1, + "map_type": "allow", + "revoke": True, + "triggers": {"groups": {"has_or": [ldap_group]}}, + "order": start_order, + } + ) + else: + result.append( + { + "name": "LDAP-RequireGroup", + "authenticator": -1, + "map_type": "allow", + "revoke": False, + "triggers": {"groups": {"has_and": [ldap_group]}}, + "order": start_order, + } + ) + + return result, start_order + 1 diff --git a/awx/sso/utils/saml_migrator.py b/awx/sso/utils/saml_migrator.py index 506b0c3736..4c55d8ac8c 100644 --- a/awx/sso/utils/saml_migrator.py +++ b/awx/sso/utils/saml_migrator.py @@ -111,7 +111,7 @@ class SAMLMigrator(BaseAuthenticatorMigrator): # Generate authenticator name and slug authenticator_name = f"AWX-{category.replace('-', '_').title()}-{name}" - authenticator_slug = self._generate_authenticator_slug("saml", category, name) + authenticator_slug = self._generate_authenticator_slug("saml", category) self._write_output(f"\n--- Processing {category} authenticator ---") self._write_output(f"Name: {authenticator_name}")