* collect controller ldap configuration

* translate role mapping and submit ldap authenticator

* implement require and deny group mapping

* remove all references of awx in the naming

* fix linter issues

* address PR feedback

* update ldap authenticator naming

* update github authenticator naming

* assume that server_uri is always a string

* update order of evaluation for require and deny groups

* cleanup and move ldap related functions into the ldap migrator

* add skip option for saml

* update saml authenticator to new slug format

* update azuread authenticator to new slug format
This commit is contained in:
Peter Braun
2025-07-11 18:05:23 +02:00
committed by thedoubl3j
parent 6b2e9a66d5
commit c5e55fe0f5
10 changed files with 1137 additions and 87 deletions

View File

@@ -4,6 +4,7 @@ import os
from django.core.management.base import BaseCommand
from awx.sso.utils.azure_ad_migrator import AzureADMigrator
from awx.sso.utils.github_migrator import GitHubMigrator
from awx.sso.utils.ldap_migrator import LDAPMigrator
from awx.sso.utils.oidc_migrator import OIDCMigrator
from awx.sso.utils.saml_migrator import SAMLMigrator
from awx.main.utils.gateway_client import GatewayClient, GatewayAPIError
@@ -16,6 +17,7 @@ class Command(BaseCommand):
parser.add_argument('--skip-oidc', action='store_true', help='Skip importing GitHub and generic OIDC authenticators')
parser.add_argument('--skip-ldap', action='store_true', help='Skip importing LDAP authenticators')
parser.add_argument('--skip-ad', action='store_true', help='Skip importing Azure AD authenticator')
parser.add_argument('--skip-saml', action='store_true', help='Skip importing SAML authenticator')
def handle(self, *args, **options):
# Read Gateway connection parameters from environment variables
@@ -25,8 +27,9 @@ class Command(BaseCommand):
gateway_skip_verify = os.getenv('GATEWAY_SKIP_VERIFY', '').lower() in ('true', '1', 'yes', 'on')
skip_oidc = options['skip_oidc']
# skip_ldap = options['skip_ldap']
skip_ldap = options['skip_ldap']
skip_ad = options['skip_ad']
skip_saml = options['skip_saml']
# If the management command isn't called with all parameters needed to talk to Gateway, consider
# it a dry-run and exit cleanly
@@ -56,12 +59,16 @@ class Command(BaseCommand):
if not skip_oidc:
migrators.append(GitHubMigrator(gateway_client, self))
migrators.append(OIDCMigrator(gateway_client, self))
if not skip_saml:
migrators.append(SAMLMigrator(gateway_client, self))
# if not skip_ldap:
# migrators.append(LDAPMigrator(gateway_client, self))
if not skip_ad:
migrators.append(AzureADMigrator(gateway_client, self))
if not skip_ldap:
migrators.append(LDAPMigrator(gateway_client, self))
# Run migrations
total_results = {
'created': 0,

View File

@@ -3,7 +3,7 @@ Unit tests for auth migration utilities.
"""
import pytest
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format, role_map_to_gateway_format
def get_org_mappers(org_map, start_order=1):
@@ -18,6 +18,12 @@ def get_team_mappers(team_map, start_order=1):
return result
def get_role_mappers(role_map, start_order=1):
"""Helper function to get just the mappers from role_map_to_gateway_format."""
result, _ = role_map_to_gateway_format(role_map, start_order)
return result
class TestOrgMapToGatewayFormat:
def test_none_input(self):

View File

@@ -18,22 +18,66 @@ class TestBaseAuthenticatorMigrator:
def test_generate_authenticator_slug(self):
"""Test slug generation is deterministic."""
slug1 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client123')
slug2 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client123')
slug1 = self.migrator._generate_authenticator_slug('github', 'github-org')
slug2 = self.migrator._generate_authenticator_slug('github', 'github-org')
assert slug1 == slug2
assert slug1.startswith('awx-github-')
assert len(slug1.split('-')[-1]) == 8 # Hash should be 8 characters
assert slug1 == 'aap-github-github-org'
def test_generate_authenticator_slug_different_inputs(self):
"""Test that different inputs generate different slugs."""
slug1 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client123')
slug2 = self.migrator._generate_authenticator_slug('github', 'github-org', 'client456')
slug3 = self.migrator._generate_authenticator_slug('ldap', 'ldap', 'ldap://server')
slug1 = self.migrator._generate_authenticator_slug('github', 'github-org')
slug2 = self.migrator._generate_authenticator_slug('github', 'github-team')
slug3 = self.migrator._generate_authenticator_slug('ldap', 'ldap')
assert slug1 != slug2
assert slug1 != slug3
assert slug2 != slug3
assert slug1 == 'aap-github-github-org'
assert slug2 == 'aap-github-github-team'
assert slug3 == 'aap-ldap-ldap'
def test_generate_authenticator_slug_ldap_variants(self):
"""Test LDAP authenticator slug generation for all supported variants."""
# Test all LDAP authenticator naming variants
ldap_base = self.migrator._generate_authenticator_slug('ldap', 'ldap')
ldap1 = self.migrator._generate_authenticator_slug('ldap', 'ldap1')
ldap2 = self.migrator._generate_authenticator_slug('ldap', 'ldap2')
ldap3 = self.migrator._generate_authenticator_slug('ldap', 'ldap3')
ldap4 = self.migrator._generate_authenticator_slug('ldap', 'ldap4')
ldap5 = self.migrator._generate_authenticator_slug('ldap', 'ldap5')
# Verify correct slug format
assert ldap_base == 'aap-ldap-ldap'
assert ldap1 == 'aap-ldap-ldap1'
assert ldap2 == 'aap-ldap-ldap2'
assert ldap3 == 'aap-ldap-ldap3'
assert ldap4 == 'aap-ldap-ldap4'
assert ldap5 == 'aap-ldap-ldap5'
# Verify all slugs are unique
all_slugs = [ldap_base, ldap1, ldap2, ldap3, ldap4, ldap5]
assert len(all_slugs) == len(set(all_slugs))
def test_generate_authenticator_slug_github_variants(self):
"""Test GitHub authenticator slug generation for all supported variants."""
# Test all GitHub authenticator naming variants
github_base = self.migrator._generate_authenticator_slug('github', 'github')
github_org = self.migrator._generate_authenticator_slug('github', 'github-org')
github_team = self.migrator._generate_authenticator_slug('github', 'github-team')
github_enterprise_org = self.migrator._generate_authenticator_slug('github', 'github-enterprise-org')
github_enterprise_team = self.migrator._generate_authenticator_slug('github', 'github-enterprise-team')
# Verify correct slug format
assert github_base == 'aap-github-github'
assert github_org == 'aap-github-github-org'
assert github_team == 'aap-github-github-team'
assert github_enterprise_org == 'aap-github-github-enterprise-org'
assert github_enterprise_team == 'aap-github-github-enterprise-team'
# Verify all slugs are unique
all_slugs = [github_base, github_org, github_team, github_enterprise_org, github_enterprise_team]
assert len(all_slugs) == len(set(all_slugs))
def test_get_mapper_ignore_keys_default(self):
"""Test default mapper ignore keys."""
@@ -224,34 +268,39 @@ class TestMapperComparison:
def test_mappers_match_structurally_identical(self):
"""Test that identical mappers match structurally."""
mapper1 = {'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'}
mapper1 = {'name': 'myorg - engineering', 'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'}
mapper2 = mapper1.copy()
assert self.migrator._mappers_match_structurally(mapper1, mapper2) is True
def test_mappers_match_structurally_different_fields(self):
"""Test that mappers don't match structurally when key fields differ."""
base_mapper = {'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'}
"""Test that mappers match structurally when only name is the same."""
base_mapper = {'name': 'myorg - engineering', 'organization': 'myorg', 'team': 'engineering', 'map_type': 'team', 'role': 'Team Member'}
# Test different organization
# Test different organization but same name - should still match
mapper2 = base_mapper.copy()
mapper2['organization'] = 'otherorg'
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True
# Test different team
# Test different team but same name - should still match
mapper2 = base_mapper.copy()
mapper2['team'] = 'qa'
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True
# Test different map_type
# Test different map_type but same name - should still match
mapper2 = base_mapper.copy()
mapper2['map_type'] = 'organization'
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True
# Test different role
# Test different role but same name - should still match
mapper2 = base_mapper.copy()
mapper2['role'] = 'Organization Admin'
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is True
# Test different name - should not match
mapper2 = base_mapper.copy()
mapper2['name'] = 'otherorg - qa'
assert self.migrator._mappers_match_structurally(base_mapper, mapper2) is False
def test_mapper_configs_match_identical(self):
@@ -593,36 +642,45 @@ def test_authenticator_configs_match_edge_cases(existing_auth, new_config, ignor
@pytest.mark.parametrize(
"mapper1,mapper2,ignore_keys,expected",
"mapper1,mapper2,expected",
[
# Test with None team values (org mappers)
# Test with same name
(
{'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'},
{'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'},
[],
{'name': 'myorg - Organization Admins', 'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'},
{'name': 'myorg - Organization Admins', 'organization': 'myorg', 'team': None, 'map_type': 'organization', 'role': 'Organization Admin'},
True,
),
# Test with ignore keys (for structural matching, ignore_keys shouldn't matter)
# Test with same name but different other fields
(
{'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member', 'id': 123},
{'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member', 'id': 456},
['id'],
{'name': 'myorg - eng', 'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member', 'id': 123},
{'name': 'myorg - eng', 'organization': 'otherorg', 'team': 'qa', 'map_type': 'organization', 'role': 'Organization Admin', 'id': 456},
True,
),
# Test structural mismatch
# Test with different names
(
{'name': 'myorg - eng', 'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'},
{'name': 'myorg - qa', 'organization': 'myorg', 'team': 'qa', 'map_type': 'team', 'role': 'Team Member'},
False,
),
# Test with missing name
(
{'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'},
{'organization': 'myorg', 'team': 'qa', 'map_type': 'team', 'role': 'Team Member'},
[],
{'name': 'myorg - eng', 'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'},
False,
),
# Test with both missing name
(
{'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'},
{'organization': 'myorg', 'team': 'eng', 'map_type': 'team', 'role': 'Team Member'},
True,
),
],
)
def test_mappers_match_structurally_edge_cases(mapper1, mapper2, ignore_keys, expected):
"""Test edge cases for mapper structural matching."""
def test_mappers_match_structurally_edge_cases(mapper1, mapper2, expected):
"""Test edge cases for mapper structural matching based on name."""
gateway_client = Mock()
command = Mock()
migrator = BaseAuthenticatorMigrator(gateway_client, command)
result = migrator._mappers_match_structurally(mapper1, mapper2, ignore_keys)
result = migrator._mappers_match_structurally(mapper1, mapper2)
assert result == expected

View File

@@ -0,0 +1,614 @@
"""
Unit tests for role mapping utilities.
"""
import pytest
from awx.main.utils.gateway_mapping import role_map_to_gateway_format
from awx.sso.utils.ldap_migrator import LDAPMigrator
def get_role_mappers(role_map, start_order=1):
"""Helper function to get just the mappers from role_map_to_gateway_format."""
result, _ = role_map_to_gateway_format(role_map, start_order)
return result
def ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=1):
"""Helper function to test LDAP group allow mapping via LDAPMigrator."""
migrator = LDAPMigrator()
return migrator._ldap_group_allow_to_gateway_format(result, ldap_group, deny, start_order)
class TestRoleMapToGatewayFormat:
"""Tests for role_map_to_gateway_format function."""
def test_none_input(self):
"""Test that None input returns empty list."""
result, next_order = role_map_to_gateway_format(None)
assert result == []
assert next_order == 1 # Default start_order
def test_empty_dict(self):
"""Test that empty dict returns empty list."""
result, next_order = role_map_to_gateway_format({})
assert result == []
assert next_order == 1
def test_is_superuser_single_group(self):
"""Test is_superuser with single group."""
role_map = {"is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"}
result, _ = role_map_to_gateway_format(role_map)
expected = [
{
"name": "is_superuser - role",
"authenticator": -1,
"revoke": True,
"map_type": "is_superuser",
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"],
}
},
"order": 1,
}
]
assert result == expected
def test_is_superuser_multiple_groups(self):
"""Test is_superuser with multiple groups."""
role_map = {"is_superuser": ["cn=super_users,dc=example,dc=com", "cn=admins,dc=example,dc=com"]}
result, _ = role_map_to_gateway_format(role_map)
expected = [
{
"name": "is_superuser - role",
"authenticator": -1,
"revoke": True,
"map_type": "is_superuser",
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": ["cn=super_users,dc=example,dc=com", "cn=admins,dc=example,dc=com"],
}
},
"order": 1,
}
]
assert result == expected
def test_is_system_auditor_single_group(self):
"""Test is_system_auditor with single group."""
role_map = {"is_system_auditor": "cn=auditors,dc=example,dc=com"}
result, _ = role_map_to_gateway_format(role_map)
expected = [
{
"name": "is_system_auditor - role",
"authenticator": -1,
"revoke": True,
"map_type": "role",
"role": "Platform Auditor",
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": ["cn=auditors,dc=example,dc=com"],
}
},
"order": 1,
}
]
assert result == expected
def test_is_system_auditor_multiple_groups(self):
"""Test is_system_auditor with multiple groups."""
role_map = {"is_system_auditor": ["cn=auditors,dc=example,dc=com", "cn=viewers,dc=example,dc=com"]}
result, _ = role_map_to_gateway_format(role_map)
expected = [
{
"name": "is_system_auditor - role",
"authenticator": -1,
"revoke": True,
"map_type": "role",
"role": "Platform Auditor",
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": ["cn=auditors,dc=example,dc=com", "cn=viewers,dc=example,dc=com"],
}
},
"order": 1,
}
]
assert result == expected
def test_multiple_roles(self):
"""Test multiple role mappings."""
role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"}
result, _ = role_map_to_gateway_format(role_map)
expected = [
{
"name": "is_superuser - role",
"authenticator": -1,
"revoke": True,
"map_type": "is_superuser",
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": ["cn=super_users,dc=example,dc=com"],
}
},
"order": 1,
},
{
"name": "is_system_auditor - role",
"authenticator": -1,
"revoke": True,
"map_type": "role",
"role": "Platform Auditor",
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": ["cn=auditors,dc=example,dc=com"],
}
},
"order": 2,
},
]
assert result == expected
def test_unsupported_role_flag(self):
"""Test that unsupported role flags are ignored."""
role_map = {
"is_superuser": "cn=super_users,dc=example,dc=com",
"is_staff": "cn=staff,dc=example,dc=com", # Unsupported flag
"is_system_auditor": "cn=auditors,dc=example,dc=com",
}
result, _ = role_map_to_gateway_format(role_map)
# Should only have 2 mappers (is_superuser and is_system_auditor)
assert len(result) == 2
assert result[0]["map_type"] == "is_superuser"
assert result[1]["map_type"] == "role"
assert result[1]["role"] == "Platform Auditor"
def test_order_increments_correctly(self):
"""Test that order values increment correctly."""
role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"}
result, _ = role_map_to_gateway_format(role_map)
assert len(result) == 2
assert result[0]["order"] == 1
assert result[1]["order"] == 2
def test_start_order_parameter(self):
"""Test that start_order parameter is respected."""
role_map = {"is_superuser": "cn=super_users,dc=example,dc=com"}
result, next_order = role_map_to_gateway_format(role_map, start_order=5)
assert result[0]["order"] == 5
assert next_order == 6
def test_string_to_list_conversion(self):
"""Test that string groups are converted to lists."""
role_map = {"is_superuser": "single-group"}
result, _ = role_map_to_gateway_format(role_map)
# Should convert string to list for has_or
assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"]
def test_triggers_format_validation(self):
"""Test that trigger formats match Gateway specification."""
role_map = {"is_superuser": ["group1", "group2"]}
result, _ = role_map_to_gateway_format(role_map)
# Validate that triggers follow Gateway format
triggers = result[0]["triggers"]
assert "groups" in triggers
assert "has_or" in triggers["groups"]
assert isinstance(triggers["groups"]["has_or"], list)
assert triggers["groups"]["has_or"] == ["group1", "group2"]
def test_ldap_dn_format(self):
"""Test with realistic LDAP DN format."""
role_map = {
"is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com",
"is_system_auditor": "cn=awx_auditors,OU=administration groups,DC=contoso,DC=com",
}
result, _ = role_map_to_gateway_format(role_map)
assert len(result) == 2
assert result[0]["triggers"]["groups"]["has_or"] == ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"]
assert result[1]["triggers"]["groups"]["has_or"] == ["cn=awx_auditors,OU=administration groups,DC=contoso,DC=com"]
def test_gateway_format_compliance(self):
"""Test that all results comply with Gateway role mapping format."""
role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"}
result, _ = role_map_to_gateway_format(role_map)
for mapping in result:
# Required fields per Gateway spec
assert "name" in mapping
assert "authenticator" in mapping
assert "map_type" in mapping
assert "organization" in mapping
assert "team" in mapping
assert "triggers" in mapping
assert "order" in mapping
assert "revoke" in mapping
# Field types
assert isinstance(mapping["name"], str)
assert isinstance(mapping["authenticator"], int)
assert mapping["map_type"] in ["is_superuser", "role"]
assert mapping["organization"] is None
assert mapping["team"] is None
assert isinstance(mapping["triggers"], dict)
assert isinstance(mapping["order"], int)
assert isinstance(mapping["revoke"], bool)
# Specific field validations based on map_type
if mapping["map_type"] == "is_superuser":
assert "role" not in mapping
elif mapping["map_type"] == "role":
assert "role" in mapping
assert isinstance(mapping["role"], str)
assert mapping["role"] == "Platform Auditor"
# Parametrized tests for role mappings
@pytest.mark.parametrize(
"role_map,expected_length",
[
(None, 0),
({}, 0),
({"is_superuser": "group1"}, 1),
({"is_system_auditor": "group1"}, 1),
({"is_superuser": "group1", "is_system_auditor": "group2"}, 2),
({"is_staff": "group1"}, 0), # Unsupported flag
({"is_superuser": "group1", "is_staff": "group2", "is_system_auditor": "group3"}, 2), # Mixed supported/unsupported
],
)
def test_role_map_result_lengths(role_map, expected_length):
"""Test that role_map_to_gateway_format returns expected number of mappings."""
result, _ = role_map_to_gateway_format(role_map)
assert len(result) == expected_length
# Edge case tests
def test_empty_groups_handling():
"""Test handling of empty group lists."""
role_map = {"is_superuser": []}
result, _ = role_map_to_gateway_format(role_map)
assert len(result) == 1
assert result[0]["triggers"]["groups"]["has_or"] == []
def test_mixed_group_types():
"""Test handling of mixed group types (string and list)."""
role_map = {"is_superuser": "single-group", "is_system_auditor": ["group1", "group2"]}
result, _ = role_map_to_gateway_format(role_map)
assert len(result) == 2
assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"]
assert result[1]["triggers"]["groups"]["has_or"] == ["group1", "group2"]
def test_realistic_ldap_user_flags_by_group():
"""Test with realistic LDAP USER_FLAGS_BY_GROUP data."""
role_map = {"is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"}
result, _ = role_map_to_gateway_format(role_map)
# This is exactly the use case from the user's example
assert len(result) == 1
assert result[0]["map_type"] == "is_superuser"
assert result[0]["triggers"]["groups"]["has_or"] == ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"]
assert result[0]["revoke"] is True
assert result[0]["team"] is None
assert result[0]["organization"] is None
class TestLdapGroupAllowToGatewayFormat:
"""Tests for ldap_group_allow_to_gateway_format function."""
def test_none_input_with_empty_result(self):
"""Test that None input with empty result returns unchanged result."""
result = []
output_result, next_order = ldap_group_allow_to_gateway_format(result, None, deny=False)
assert output_result == []
assert next_order == 1 # Default start_order
def test_none_input_with_existing_result(self):
"""Test that None input with existing mappers returns unchanged result."""
result = [{"existing": "mapper"}]
output_result, next_order = ldap_group_allow_to_gateway_format(result, None, deny=False, start_order=5)
assert output_result == [{"existing": "mapper"}]
assert next_order == 5 # start_order unchanged
def test_require_group_mapping(self):
"""Test LDAP REQUIRE_GROUP mapping (deny=False)."""
result = []
ldap_group = "cn=allowed_users,dc=example,dc=com"
output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=1)
expected = [
{
"name": "LDAP-RequireGroup",
"authenticator": -1,
"map_type": "allow",
"revoke": False,
"triggers": {"groups": {"has_and": ["cn=allowed_users,dc=example,dc=com"]}},
"order": 1,
}
]
assert output_result == expected
assert next_order == 2
def test_deny_group_mapping(self):
"""Test LDAP DENY_GROUP mapping (deny=True)."""
result = []
ldap_group = "cn=blocked_users,dc=example,dc=com"
output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=True, start_order=1)
expected = [
{
"name": "LDAP-DenyGroup",
"authenticator": -1,
"map_type": "allow",
"revoke": True,
"triggers": {"groups": {"has_or": ["cn=blocked_users,dc=example,dc=com"]}},
"order": 1,
}
]
assert output_result == expected
assert next_order == 2
def test_appending_to_existing_result(self):
"""Test appending to existing result list."""
existing_mapper = {
"name": "existing-mapper",
"authenticator": -1,
"map_type": "role",
"order": 1,
}
result = [existing_mapper]
ldap_group = "cn=new_group,dc=example,dc=com"
output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=2)
assert len(output_result) == 2
assert output_result[0] == existing_mapper # Original mapper unchanged
assert output_result[1]["name"] == "LDAP-RequireGroup"
assert output_result[1]["order"] == 2
assert next_order == 3
def test_custom_start_order(self):
"""Test that custom start_order is respected."""
result = []
ldap_group = "cn=test_group,dc=example,dc=com"
output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=10)
assert output_result[0]["order"] == 10
assert next_order == 11
def test_require_vs_deny_trigger_differences(self):
"""Test the difference between require and deny group triggers."""
ldap_group = "cn=test_group,dc=example,dc=com"
# Test require group (deny=False)
require_result, _ = ldap_group_allow_to_gateway_format([], ldap_group, deny=False)
# Test deny group (deny=True)
deny_result, _ = ldap_group_allow_to_gateway_format([], ldap_group, deny=True)
# Require group should use has_and
assert require_result[0]["triggers"]["groups"]["has_and"] == ["cn=test_group,dc=example,dc=com"]
assert require_result[0]["revoke"] is False
assert require_result[0]["name"] == "LDAP-RequireGroup"
# Deny group should use has_or
assert deny_result[0]["triggers"]["groups"]["has_or"] == ["cn=test_group,dc=example,dc=com"]
assert deny_result[0]["revoke"] is True
assert deny_result[0]["name"] == "LDAP-DenyGroup"
def test_realistic_ldap_dn_format(self):
"""Test with realistic LDAP DN format."""
result = []
# Test with require group
require_group = "cn=awx_users,OU=application groups,DC=contoso,DC=com"
output_result, next_order = ldap_group_allow_to_gateway_format(result, require_group, deny=False, start_order=1)
assert len(output_result) == 1
assert output_result[0]["triggers"]["groups"]["has_and"] == ["cn=awx_users,OU=application groups,DC=contoso,DC=com"]
assert output_result[0]["name"] == "LDAP-RequireGroup"
assert next_order == 2
def test_multiple_sequential_calls(self):
"""Test multiple sequential calls to build complex allow mappers."""
result = []
# Add deny group first
result, next_order = ldap_group_allow_to_gateway_format(result, "cn=blocked,dc=example,dc=com", deny=True, start_order=1)
# Add require group second
result, next_order = ldap_group_allow_to_gateway_format(result, "cn=allowed,dc=example,dc=com", deny=False, start_order=next_order)
assert len(result) == 2
# First mapper should be deny group
assert result[0]["name"] == "LDAP-DenyGroup"
assert result[0]["revoke"] is True
assert result[0]["triggers"]["groups"]["has_or"] == ["cn=blocked,dc=example,dc=com"]
assert result[0]["order"] == 1
# Second mapper should be require group
assert result[1]["name"] == "LDAP-RequireGroup"
assert result[1]["revoke"] is False
assert result[1]["triggers"]["groups"]["has_and"] == ["cn=allowed,dc=example,dc=com"]
assert result[1]["order"] == 2
assert next_order == 3
def test_gateway_format_compliance(self):
"""Test that all results comply with Gateway allow mapping format."""
result = []
# Test both deny and require groups
result, _ = ldap_group_allow_to_gateway_format(result, "cn=denied,dc=example,dc=com", deny=True, start_order=1)
result, _ = ldap_group_allow_to_gateway_format(result, "cn=required,dc=example,dc=com", deny=False, start_order=2)
for mapping in result:
# Required fields per Gateway spec
assert "name" in mapping
assert "authenticator" in mapping
assert "map_type" in mapping
assert "triggers" in mapping
assert "order" in mapping
assert "revoke" in mapping
# Field types
assert isinstance(mapping["name"], str)
assert isinstance(mapping["authenticator"], int)
assert mapping["map_type"] == "allow"
assert isinstance(mapping["triggers"], dict)
assert isinstance(mapping["order"], int)
assert isinstance(mapping["revoke"], bool)
# Trigger format validation
assert "groups" in mapping["triggers"]
groups_trigger = mapping["triggers"]["groups"]
# Should have either has_and or has_or, but not both
has_and = "has_and" in groups_trigger
has_or = "has_or" in groups_trigger
assert has_and != has_or # XOR - exactly one should be true
if has_and:
assert isinstance(groups_trigger["has_and"], list)
assert len(groups_trigger["has_and"]) == 1
if has_or:
assert isinstance(groups_trigger["has_or"], list)
assert len(groups_trigger["has_or"]) == 1
def test_original_result_not_modified_when_none(self):
"""Test that original result list is not modified when ldap_group is None."""
original_result = [{"original": "mapper"}]
result_copy = original_result.copy()
output_result, _ = ldap_group_allow_to_gateway_format(original_result, None, deny=False)
# Original list should be unchanged
assert original_result == result_copy
# Output should be the same reference
assert output_result is original_result
def test_empty_string_group(self):
"""Test handling of empty string group."""
result = []
output_result, next_order = ldap_group_allow_to_gateway_format(result, "", deny=False, start_order=1)
# Should still create a mapper even with empty string
assert len(output_result) == 1
assert output_result[0]["triggers"]["groups"]["has_and"] == [""]
assert next_order == 2
# Parametrized tests for ldap_group_allow_to_gateway_format
@pytest.mark.parametrize(
"ldap_group,deny,expected_name,expected_revoke,expected_trigger_type",
[
("cn=test,dc=example,dc=com", True, "LDAP-DenyGroup", True, "has_or"),
("cn=test,dc=example,dc=com", False, "LDAP-RequireGroup", False, "has_and"),
("cn=users,ou=groups,dc=company,dc=com", True, "LDAP-DenyGroup", True, "has_or"),
("cn=users,ou=groups,dc=company,dc=com", False, "LDAP-RequireGroup", False, "has_and"),
],
)
def test_ldap_group_parametrized(ldap_group, deny, expected_name, expected_revoke, expected_trigger_type):
"""Parametrized test for various LDAP group configurations."""
result = []
output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=deny, start_order=1)
assert len(output_result) == 1
mapper = output_result[0]
assert mapper["name"] == expected_name
assert mapper["revoke"] == expected_revoke
assert expected_trigger_type in mapper["triggers"]["groups"]
assert mapper["triggers"]["groups"][expected_trigger_type] == [ldap_group]
assert next_order == 2
def test_realistic_awx_ldap_migration_scenario():
"""Test realistic scenario from AWX LDAP migration."""
result = []
# Simulate AWX LDAP configuration with both REQUIRE_GROUP and DENY_GROUP
deny_group = "cn=blocked_users,OU=blocked groups,DC=contoso,DC=com"
require_group = "cn=awx_users,OU=application groups,DC=contoso,DC=com"
# Add deny group first (as in the migrator)
result, next_order = ldap_group_allow_to_gateway_format(result, deny_group, deny=True, start_order=1)
# Add require group second
result, next_order = ldap_group_allow_to_gateway_format(result, require_group, deny=False, start_order=next_order)
# Should have 2 allow mappers
assert len(result) == 2
# Verify deny group mapper
deny_mapper = result[0]
assert deny_mapper["name"] == "LDAP-DenyGroup"
assert deny_mapper["map_type"] == "allow"
assert deny_mapper["revoke"] is True
assert deny_mapper["triggers"]["groups"]["has_or"] == [deny_group]
assert deny_mapper["order"] == 1
# Verify require group mapper
require_mapper = result[1]
assert require_mapper["name"] == "LDAP-RequireGroup"
assert require_mapper["map_type"] == "allow"
assert require_mapper["revoke"] is False
assert require_mapper["triggers"]["groups"]["has_and"] == [require_group]
assert require_mapper["order"] == 2
assert next_order == 3

View File

@@ -163,3 +163,67 @@ def org_map_to_gateway_format(org_map, start_order=1):
order += 1
return result, order
def role_map_to_gateway_format(role_map, start_order=1):
"""Convert AWX role mapping to Gateway authenticator format.
Args:
role_map: An LDAP or SAML role mapping
start_order: Starting order value for the mappers
Returns:
tuple: (List of Gateway-compatible organization mappers, next_order)
"""
if role_map is None:
return [], start_order
result = []
order = start_order
for flag in role_map:
groups = role_map[flag]
if type(groups) is str:
groups = [groups]
if flag == 'is_superuser':
# Gateway has a special map_type for superusers
result.append(
{
"name": f"{flag} - role",
"authenticator": -1,
"revoke": True,
"map_type": flag,
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": groups,
}
},
"order": order,
}
)
elif flag == 'is_system_auditor':
# roles other than superuser must be represented as a generic role mapper
result.append(
{
"name": f"{flag} - role",
"authenticator": -1,
"revoke": True,
"map_type": "role",
"role": "Platform Auditor",
"team": None,
"organization": None,
"triggers": {
"groups": {
"has_or": groups,
}
},
"order": order,
}
)
order += 1
return result, order

View File

@@ -46,7 +46,7 @@ class AzureADMigrator(BaseAuthenticatorMigrator):
# Generate authenticator name and slug
authenticator_name = "Controller Azure AD"
authenticator_slug = self._generate_authenticator_slug("azure_ad", category, key_value)
authenticator_slug = self._generate_authenticator_slug("azure_ad", category)
return [
{

View File

@@ -5,8 +5,6 @@ This module defines the contract that all specific authenticator migrators must
"""
from awx.main.utils.gateway_client import GatewayAPIError
import re
import hashlib
class BaseAuthenticatorMigrator:
@@ -98,15 +96,9 @@ class BaseAuthenticatorMigrator:
"""
raise NotImplementedError("Subclasses must implement get_authenticator_type()")
def _generate_authenticator_slug(self, auth_type, category, identifier):
def _generate_authenticator_slug(self, auth_type, category):
"""Generate a deterministic slug for an authenticator."""
base_string = f"awx-{auth_type}-{category}-{identifier}"
cleaned = re.sub(r'[^a-zA-Z0-9]+', '-', base_string.lower())
cleaned = re.sub(r'^-+|-+$', '', cleaned)
cleaned = re.sub(r'-+', '-', cleaned)
slug_hash = hashlib.md5(cleaned.encode('utf-8')).hexdigest()[:8]
final_slug = f"awx-{auth_type}-{slug_hash}"
return final_slug
return f"aap-{auth_type}-{category}"
def submit_authenticator(self, gateway_config, ignore_keys=[], config={}):
"""
@@ -273,7 +265,7 @@ class BaseAuthenticatorMigrator:
# Try to find a matching existing mapper
for existing_mapper in existing_mappers:
if self._mappers_match_structurally(existing_mapper, new_mapper, ignore_keys):
if self._mappers_match_structurally(existing_mapper, new_mapper):
matched_existing = existing_mapper
break
@@ -288,7 +280,7 @@ class BaseAuthenticatorMigrator:
return mappers_to_update, mappers_to_create
def _mappers_match_structurally(self, existing_mapper, new_mapper, ignore_keys=None):
def _mappers_match_structurally(self, existing_mapper, new_mapper):
"""
Check if two mappers match structurally (same organization, team, map_type, role).
This identifies if they represent the same logical mapping.
@@ -296,16 +288,13 @@ class BaseAuthenticatorMigrator:
Args:
existing_mapper: Existing mapper configuration from Gateway
new_mapper: New mapper configuration
ignore_keys: List of keys to ignore during comparison
Returns:
bool: True if mappers represent the same logical mapping
"""
if ignore_keys is None:
ignore_keys = []
# Compare key structural fields that identify the same logical mapper
structural_fields = ['organization', 'team', 'map_type', 'role']
structural_fields = ['name']
for field in structural_fields:
if existing_mapper.get(field) != new_mapper.get(field):
@@ -357,7 +346,9 @@ class BaseAuthenticatorMigrator:
category = config['category']
org_mappers = config.get('org_mappers', [])
team_mappers = config.get('team_mappers', [])
all_new_mappers = org_mappers + team_mappers
role_mappers = config.get('role_mappers', [])
allow_mappers = config.get('allow_mappers', [])
all_new_mappers = org_mappers + team_mappers + role_mappers + allow_mappers
if len(all_new_mappers) == 0:
self._write_output(f'No mappers to process for {category} authenticator')
@@ -366,6 +357,8 @@ class BaseAuthenticatorMigrator:
self._write_output(f'\n--- Processing mappers for {category} authenticator (ID: {authenticator_id}) ---')
self._write_output(f'Organization mappers: {len(org_mappers)}')
self._write_output(f'Team mappers: {len(team_mappers)}')
self._write_output(f'Role mappers: {len(role_mappers)}')
self._write_output(f'Allow mappers: {len(allow_mappers)}')
# Get existing mappers from Gateway
try:
@@ -380,6 +373,9 @@ class BaseAuthenticatorMigrator:
# Compare existing vs new mappers
mappers_to_update, mappers_to_create = self._compare_mapper_lists(existing_mappers, all_new_mappers, ignore_keys)
self._write_output(f'Mappers to create: {len(mappers_to_create)}')
self._write_output(f'Mappers to update: {len(mappers_to_update)}')
created_count = 0
updated_count = 0
failed_count = 0

View File

@@ -114,8 +114,8 @@ class GitHubMigrator(BaseAuthenticatorMigrator):
return False
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('-', '_').title()}"
authenticator_slug = self._generate_authenticator_slug('github', category, key_value)
authenticator_name = category
authenticator_slug = self._generate_authenticator_slug('github', category)
# Map AWX category to Gateway authenticator type
type_mapping = {

View File

@@ -4,7 +4,10 @@ LDAP authenticator migrator.
This module handles the migration of LDAP authenticators from AWX to Gateway.
"""
from django.conf import settings
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format, role_map_to_gateway_format
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
import ldap
class LDAPMigrator(BaseAuthenticatorMigrator):
@@ -24,39 +27,341 @@ class LDAPMigrator(BaseAuthenticatorMigrator):
Returns:
list: List of configured LDAP authentication providers with their settings
"""
# TODO: Implement LDAP configuration retrieval
# AWX supports up to 6 LDAP configurations: AUTH_LDAP (default) and AUTH_LDAP_1 through AUTH_LDAP_5
# LDAP settings typically include:
# - AUTH_LDAP_SERVER_URI
# - AUTH_LDAP_BIND_DN
# - AUTH_LDAP_BIND_PASSWORD
# - AUTH_LDAP_START_TLS
# - AUTH_LDAP_CONNECTION_OPTIONS
# - AUTH_LDAP_USER_SEARCH
# - AUTH_LDAP_USER_DN_TEMPLATE
# - AUTH_LDAP_USER_ATTR_MAP
# - AUTH_LDAP_GROUP_SEARCH
# - AUTH_LDAP_GROUP_TYPE
# - AUTH_LDAP_GROUP_TYPE_PARAMS
# - AUTH_LDAP_REQUIRE_GROUP
# - AUTH_LDAP_DENY_GROUP
# - AUTH_LDAP_USER_FLAGS_BY_GROUP
# - AUTH_LDAP_ORGANIZATION_MAP
# - AUTH_LDAP_TEAM_MAP
ldap_instances = [None, 1, 2, 3, 4, 5] # None represents the default AUTH_LDAP_ configuration
found_configs = []
for instance in ldap_instances:
# Build the prefix for this LDAP instance
prefix = f"AUTH_LDAP_{instance}_" if instance is not None else "AUTH_LDAP_"
category = f"ldap{instance}" if instance is not None else "ldap"
try:
# Get all LDAP settings for this instance
config_data = self._get_ldap_instance_config(prefix)
except Exception as e:
raise Exception(f'Could not retrieve {category} settings: {str(e)}')
# Skip if SERVER_URI is not configured (required for LDAP to function)
if not config_data.get('SERVER_URI'):
continue
# Convert organization, team, and role mappings to Gateway format
org_map_value = config_data.get('ORGANIZATION_MAP', {})
team_map_value = config_data.get('TEAM_MAP', {})
role_map_value = config_data.get('USER_FLAGS_BY_GROUP', {})
require_group_value = config_data.get('REQUIRE_GROUP', {})
deny_group_value = config_data.get('DENY_GROUP', {})
allow_mappers = []
# Start with order 1 and maintain sequence across org, team, and role mappers
allow_mappers, next_order = self._ldap_group_allow_to_gateway_format(allow_mappers, deny_group_value, deny=True, start_order=1)
allow_mappers, next_order = self._ldap_group_allow_to_gateway_format(allow_mappers, require_group_value, deny=False, start_order=next_order)
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=next_order)
team_mappers, next_order = team_map_to_gateway_format(team_map_value, start_order=next_order)
role_mappers, _ = role_map_to_gateway_format(role_map_value, start_order=next_order)
found_configs.append(
{
'category': category,
'settings': config_data,
'org_mappers': org_mappers,
'team_mappers': team_mappers,
'role_mappers': role_mappers,
'allow_mappers': allow_mappers,
}
)
return found_configs
def _get_ldap_instance_config(self, prefix):
"""
Get all LDAP configuration settings for a specific instance.
Args:
prefix: The setting prefix (e.g., 'AUTH_LDAP_' or 'AUTH_LDAP_1_')
Returns:
dict: Dictionary of LDAP configuration settings
"""
# Define all LDAP setting keys
ldap_keys = [
'SERVER_URI', # Required: LDAP server URI(s)
'BIND_DN', # Optional: Bind DN for authentication
'BIND_PASSWORD', # Optional: Bind password
'START_TLS', # Optional: Enable TLS
'CONNECTION_OPTIONS', # Optional: LDAP connection options
'USER_SEARCH', # Optional: User search configuration
'USER_DN_TEMPLATE', # Optional: User DN template
'USER_ATTR_MAP', # Optional: User attribute mapping
'GROUP_SEARCH', # Optional: Group search configuration
'GROUP_TYPE', # Optional: Group type class
'GROUP_TYPE_PARAMS', # Optional: Group type parameters
'REQUIRE_GROUP', # Optional: Required group DN
'DENY_GROUP', # Optional: Denied group DN
'USER_FLAGS_BY_GROUP', # Optional: User flags mapping
'ORGANIZATION_MAP', # Optional: Organization mapping
'TEAM_MAP', # Optional: Team mapping
]
config_data = {}
for key in ldap_keys:
setting_name = f"{prefix}{key}"
value = getattr(settings, setting_name, None)
# Handle special field types that need conversion
if key == 'GROUP_TYPE' and value:
# Convert GROUP_TYPE class to string representation
config_data[key] = type(value).__name__
elif key == 'SERVER_URI' and value:
# Convert SERVER_URI to list format if it's a comma-separated string
config_data[key] = [uri.strip() for uri in value.split(',')]
elif key in ['USER_SEARCH', 'GROUP_SEARCH'] and value:
# Convert LDAPSearch objects to list format [base_dn, scope, filter]
if hasattr(value, 'base_dn') and hasattr(value, 'filterstr'):
# Get the actual scope instead of hardcoding SCOPE_SUBTREE
scope = getattr(value, 'scope', ldap.SCOPE_SUBTREE) # 2 is SCOPE_SUBTREE default
scope_name = {ldap.SCOPE_BASE: 'SCOPE_BASE', ldap.SCOPE_ONELEVEL: 'SCOPE_ONELEVEL', ldap.SCOPE_SUBTREE: 'SCOPE_SUBTREE'}.get(
scope, 'SCOPE_SUBTREE'
)
config_data[key] = [value.base_dn, scope_name, value.filterstr]
else:
config_data[key] = value
elif key in ['USER_ATTR_MAP', 'GROUP_TYPE_PARAMS', 'USER_FLAGS_BY_GROUP', 'ORGANIZATION_MAP', 'TEAM_MAP']:
# Ensure dict fields are properly handled
config_data[key] = value if value is not None else {}
elif key == 'CONNECTION_OPTIONS' and value:
# CONNECTION_OPTIONS is a dict of LDAP options
config_data[key] = value if value is not None else {}
else:
# Store the value as-is for other fields
config_data[key] = value
return config_data
def create_gateway_authenticator(self, config):
"""Create an LDAP authenticator in Gateway."""
# TODO: Implement LDAP authenticator creation
# When implementing, use this pattern for slug generation:
# server_uri = settings.get('AUTH_LDAP_SERVER_URI', 'ldap')
# authenticator_slug = self._generate_authenticator_slug('ldap', category, server_uri)
# LDAP requires:
# - Server URI and connection settings
# - Bind DN and password for authentication
# - User and group search configurations
# - Attribute mapping for user fields
# - Group type and parameters
self._write_output('LDAP authenticator creation not yet implemented', 'warning')
return False
category = config['category']
settings = config['settings']
# Extract the first server URI for slug generation
authenticator_slug = self._generate_authenticator_slug('ldap', category)
# Build the gateway payload
gateway_config = {
'name': category,
'slug': authenticator_slug,
'type': 'ansible_base.authentication.authenticator_plugins.ldap',
'create_objects': True,
'remove_users': False,
'enabled': True,
'configuration': self._build_ldap_configuration(settings),
}
self._write_output(f'Creating LDAP authenticator: {gateway_config["name"]}')
# LDAP authenticators have auto-generated fields that should be ignored during comparison
# BIND_PASSWORD - encrypted value, can't be compared
ignore_keys = ['BIND_PASSWORD']
# Submit the authenticator using the base class method
return self.submit_authenticator(gateway_config, config=config, ignore_keys=ignore_keys)
def _build_ldap_configuration(self, settings):
"""Build the LDAP configuration section for Gateway."""
config = {}
# Server URI is required
if settings.get('SERVER_URI'):
config['SERVER_URI'] = settings['SERVER_URI']
# Authentication settings
if settings.get('BIND_DN'):
config['BIND_DN'] = settings['BIND_DN']
if settings.get('BIND_PASSWORD'):
config['BIND_PASSWORD'] = settings['BIND_PASSWORD']
# TLS settings
if settings.get('START_TLS') is not None:
config['START_TLS'] = settings['START_TLS']
# User search configuration
if settings.get('USER_SEARCH'):
config['USER_SEARCH'] = settings['USER_SEARCH']
# User attribute mapping
if settings.get('USER_ATTR_MAP'):
config['USER_ATTR_MAP'] = settings['USER_ATTR_MAP']
# Group search configuration
if settings.get('GROUP_SEARCH'):
config['GROUP_SEARCH'] = settings['GROUP_SEARCH']
# Group type and parameters
if settings.get('GROUP_TYPE'):
config['GROUP_TYPE'] = settings['GROUP_TYPE']
if settings.get('GROUP_TYPE_PARAMS'):
config['GROUP_TYPE_PARAMS'] = settings['GROUP_TYPE_PARAMS']
# Connection options - convert numeric LDAP constants to string keys
if settings.get('CONNECTION_OPTIONS'):
config['CONNECTION_OPTIONS'] = self._convert_ldap_connection_options(settings['CONNECTION_OPTIONS'])
# User DN template
if settings.get('USER_DN_TEMPLATE'):
config['USER_DN_TEMPLATE'] = settings['USER_DN_TEMPLATE']
# REQUIRE_GROUP and DENY_GROUP are handled as allow mappers, not included in config
# USER_FLAGS_BY_GROUP is handled as role mappers, not included in config
return config
def _convert_ldap_connection_options(self, connection_options):
"""
Convert numeric LDAP connection option constants to their string representations.
Uses the actual constants from the python-ldap library.
Args:
connection_options: Dictionary with numeric LDAP option keys
Returns:
dict: Dictionary with string LDAP option keys
"""
# Comprehensive mapping using LDAP constants as keys
ldap_option_map = {
# Basic LDAP options
ldap.OPT_API_INFO: 'OPT_API_INFO',
ldap.OPT_DEREF: 'OPT_DEREF',
ldap.OPT_SIZELIMIT: 'OPT_SIZELIMIT',
ldap.OPT_TIMELIMIT: 'OPT_TIMELIMIT',
ldap.OPT_REFERRALS: 'OPT_REFERRALS',
ldap.OPT_RESULT_CODE: 'OPT_RESULT_CODE',
ldap.OPT_ERROR_NUMBER: 'OPT_ERROR_NUMBER',
ldap.OPT_RESTART: 'OPT_RESTART',
ldap.OPT_PROTOCOL_VERSION: 'OPT_PROTOCOL_VERSION',
ldap.OPT_SERVER_CONTROLS: 'OPT_SERVER_CONTROLS',
ldap.OPT_CLIENT_CONTROLS: 'OPT_CLIENT_CONTROLS',
ldap.OPT_API_FEATURE_INFO: 'OPT_API_FEATURE_INFO',
ldap.OPT_HOST_NAME: 'OPT_HOST_NAME',
ldap.OPT_DESC: 'OPT_DESC',
ldap.OPT_DIAGNOSTIC_MESSAGE: 'OPT_DIAGNOSTIC_MESSAGE',
ldap.OPT_ERROR_STRING: 'OPT_ERROR_STRING',
ldap.OPT_MATCHED_DN: 'OPT_MATCHED_DN',
ldap.OPT_DEBUG_LEVEL: 'OPT_DEBUG_LEVEL',
ldap.OPT_TIMEOUT: 'OPT_TIMEOUT',
ldap.OPT_REFHOPLIMIT: 'OPT_REFHOPLIMIT',
ldap.OPT_NETWORK_TIMEOUT: 'OPT_NETWORK_TIMEOUT',
ldap.OPT_URI: 'OPT_URI',
# TLS options
ldap.OPT_X_TLS: 'OPT_X_TLS',
ldap.OPT_X_TLS_CTX: 'OPT_X_TLS_CTX',
ldap.OPT_X_TLS_CACERTFILE: 'OPT_X_TLS_CACERTFILE',
ldap.OPT_X_TLS_CACERTDIR: 'OPT_X_TLS_CACERTDIR',
ldap.OPT_X_TLS_CERTFILE: 'OPT_X_TLS_CERTFILE',
ldap.OPT_X_TLS_KEYFILE: 'OPT_X_TLS_KEYFILE',
ldap.OPT_X_TLS_REQUIRE_CERT: 'OPT_X_TLS_REQUIRE_CERT',
ldap.OPT_X_TLS_CIPHER_SUITE: 'OPT_X_TLS_CIPHER_SUITE',
ldap.OPT_X_TLS_RANDOM_FILE: 'OPT_X_TLS_RANDOM_FILE',
ldap.OPT_X_TLS_DHFILE: 'OPT_X_TLS_DHFILE',
ldap.OPT_X_TLS_NEVER: 'OPT_X_TLS_NEVER',
ldap.OPT_X_TLS_HARD: 'OPT_X_TLS_HARD',
ldap.OPT_X_TLS_DEMAND: 'OPT_X_TLS_DEMAND',
ldap.OPT_X_TLS_ALLOW: 'OPT_X_TLS_ALLOW',
ldap.OPT_X_TLS_TRY: 'OPT_X_TLS_TRY',
ldap.OPT_X_TLS_CRL_NONE: 'OPT_X_TLS_CRL_NONE',
ldap.OPT_X_TLS_CRL_PEER: 'OPT_X_TLS_CRL_PEER',
ldap.OPT_X_TLS_CRL_ALL: 'OPT_X_TLS_CRL_ALL',
# SASL options
ldap.OPT_X_SASL_MECH: 'OPT_X_SASL_MECH',
ldap.OPT_X_SASL_REALM: 'OPT_X_SASL_REALM',
ldap.OPT_X_SASL_AUTHCID: 'OPT_X_SASL_AUTHCID',
ldap.OPT_X_SASL_AUTHZID: 'OPT_X_SASL_AUTHZID',
ldap.OPT_X_SASL_SSF: 'OPT_X_SASL_SSF',
ldap.OPT_X_SASL_SSF_EXTERNAL: 'OPT_X_SASL_SSF_EXTERNAL',
ldap.OPT_X_SASL_SECPROPS: 'OPT_X_SASL_SECPROPS',
ldap.OPT_X_SASL_SSF_MIN: 'OPT_X_SASL_SSF_MIN',
ldap.OPT_X_SASL_SSF_MAX: 'OPT_X_SASL_SSF_MAX',
}
# Add optional options that may not be available in all versions
optional_options = [
'OPT_TCP_USER_TIMEOUT',
'OPT_DEFBASE',
'OPT_X_TLS_VERSION',
'OPT_X_TLS_CIPHER',
'OPT_X_TLS_PEERCERT',
'OPT_X_TLS_CRLCHECK',
'OPT_X_TLS_CRLFILE',
'OPT_X_TLS_NEWCTX',
'OPT_X_TLS_PROTOCOL_MIN',
'OPT_X_TLS_PACKAGE',
'OPT_X_TLS_ECNAME',
'OPT_X_TLS_REQUIRE_SAN',
'OPT_X_TLS_PROTOCOL_MAX',
'OPT_X_TLS_PROTOCOL_SSL3',
'OPT_X_TLS_PROTOCOL_TLS1_0',
'OPT_X_TLS_PROTOCOL_TLS1_1',
'OPT_X_TLS_PROTOCOL_TLS1_2',
'OPT_X_TLS_PROTOCOL_TLS1_3',
'OPT_X_SASL_NOCANON',
'OPT_X_SASL_USERNAME',
'OPT_CONNECT_ASYNC',
'OPT_X_KEEPALIVE_IDLE',
'OPT_X_KEEPALIVE_PROBES',
'OPT_X_KEEPALIVE_INTERVAL',
]
for option_name in optional_options:
if hasattr(ldap, option_name):
ldap_option_map[getattr(ldap, option_name)] = option_name
converted_options = {}
for key, value in connection_options.items():
if key in ldap_option_map:
converted_options[ldap_option_map[key]] = value
return converted_options
def _ldap_group_allow_to_gateway_format(self, result: list, ldap_group: str, deny=False, start_order=1):
"""Convert an LDAP require or deny group to a Gateway mapper
Args:
result: array to append the mapper to
ldap_group: An LDAP group query
deny: Whether the mapper denies or requires users to be in the group
start_order: Starting order value for the mappers
Returns:
tuple: (List of Gateway-compatible organization mappers, next_order)
"""
if ldap_group is None:
return result, start_order
if deny:
result.append(
{
"name": "LDAP-DenyGroup",
"authenticator": -1,
"map_type": "allow",
"revoke": True,
"triggers": {"groups": {"has_or": [ldap_group]}},
"order": start_order,
}
)
else:
result.append(
{
"name": "LDAP-RequireGroup",
"authenticator": -1,
"map_type": "allow",
"revoke": False,
"triggers": {"groups": {"has_and": [ldap_group]}},
"order": start_order,
}
)
return result, start_order + 1

View File

@@ -111,7 +111,7 @@ class SAMLMigrator(BaseAuthenticatorMigrator):
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('-', '_').title()}-{name}"
authenticator_slug = self._generate_authenticator_slug("saml", category, name)
authenticator_slug = self._generate_authenticator_slug("saml", category)
self._write_output(f"\n--- Processing {category} authenticator ---")
self._write_output(f"Name: {authenticator_name}")