[AAP-51531] Fix LDAP authentication mapping and bug in LDAP migration (#7061)

* Add LDAP support to gateway_mapping and expand test coverage

- Add new process_ldap_user_list function for LDAP group processing
- Add auth_type parameter to org_map_to_gateway_format and team_map_to_gateway_format
- Support both 'sso' and 'ldap' authentication types in mapping functions
- Fix syntax error and logic bug in existing code
- Add comprehensive unit tests for process_ldap_user_list function (13 test cases)
- Add unit tests for auth_type parameter functionality
- Update helper functions to support new auth_type parameter
- All tests pass and maintain backward compatibility

Technical changes:
- process_ldap_user_list handles None, boolean, string, and list inputs
- Proper type hints with mypy compatibility
- LDAP groups use 'has_or' trigger format vs SSO attribute matching
- Boolean True/False create Always/Never Allow triggers for LDAP
- Maintains proper ordering and mapper structure

Co-authored-by: Claude (Anthropic AI Assistant) <claude@anthropic.com>

* Fix empty list bug in process_ldap_user_list and add comprehensive tests

- Fix process_ldap_user_list to return empty list for empty input instead of creating invalid trigger
- Empty list [] now correctly returns no triggers instead of trigger with empty has_or array
- Add test case for empty list behavior in both LDAP and SSO functions
- Update existing test_empty_list to expect correct behavior (0 triggers)
- Maintain backward compatibility for all other input types
- Comprehensive testing confirms no regression in existing functionality

Bug Details:
- Before: process_ldap_user_list([]) returned [{'name': 'Match User Groups', 'trigger': {'groups': {'has_or': []}}}]
- After: process_ldap_user_list([]) returns [] (correct behavior)
- SSO function already handled this correctly

This prevents potential Gateway issues with empty has_or arrays and ensures logical consistency.

Co-authored-by: Claude (Anthropic AI Assistant) <claude@anthropic.com>

* Add comprehensive LDAP migrator tests and fix category handling

- Add comprehensive unit test suite for LDAPMigrator class (26 tests)
- Test LDAP configuration scenarios including multiple instances, mappings, and edge cases
- Add tests for mixed boolean/group mappings, special characters in org names, and empty configs
- Fix LDAP authenticator category to always be 'ldap' (not 'ldap<suffix>')
- Add auth_type='ldap' parameter to org_map_to_gateway_format and team_map_to_gateway_format calls
- Include AAP-51531 reference comments for specific test cases
- All tests passing (26/26)

Co-authored-by: Claude <claude@anthropic.com>

---------

Co-authored-by: Claude (Anthropic AI Assistant) <claude@anthropic.com>
This commit is contained in:
John Westcott IV
2025-08-14 08:52:30 -04:00
committed by thedoubl3j
parent e9928ff513
commit a7eb1ef763
4 changed files with 1294 additions and 14 deletions

View File

@@ -4,18 +4,24 @@ Unit tests for auth migration utilities.
import pytest
import re
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format, role_map_to_gateway_format, process_sso_user_list
from awx.main.utils.gateway_mapping import (
org_map_to_gateway_format,
team_map_to_gateway_format,
role_map_to_gateway_format,
process_sso_user_list,
process_ldap_user_list,
)
def get_org_mappers(org_map, start_order=1):
def get_org_mappers(org_map, start_order=1, auth_type='sso'):
"""Helper function to get just the mappers from org_map_to_gateway_format."""
result, _ = org_map_to_gateway_format(org_map, start_order)
result, _ = org_map_to_gateway_format(org_map, start_order, auth_type=auth_type)
return result
def get_team_mappers(team_map, start_order=1):
def get_team_mappers(team_map, start_order=1, auth_type='sso'):
"""Helper function to get just the mappers from team_map_to_gateway_format."""
result, _ = team_map_to_gateway_format(team_map, start_order)
result, _ = team_map_to_gateway_format(team_map, start_order, auth_type=auth_type)
return result
@@ -107,6 +113,100 @@ class TestProcessSSOUserList:
assert result[0]["trigger"]["attributes"]["username"]["matches"] == "/^admin.*@example\\.com$/"
assert result[1]["trigger"]["attributes"]["email"]["matches"] == "/^admin.*@example\\.com$/"
def test_empty_list(self):
"""Test that empty list creates no triggers."""
result = process_sso_user_list([])
assert len(result) == 0
class TestProcessLdapUserList:
"""Tests for the process_ldap_user_list function."""
def test_none_input(self):
"""Test that None creates no triggers (empty list)."""
result = process_ldap_user_list(None)
assert len(result) == 0
def test_none_in_list(self):
"""Test that [None] creates no triggers (empty list)."""
result = process_ldap_user_list([None])
assert len(result) == 0
def test_true_boolean(self):
"""Test that True creates 'Always Allow' trigger."""
result = process_ldap_user_list(True)
assert len(result) == 1
assert result[0]["name"] == "Always Allow"
assert result[0]["trigger"] == {"always": {}}
def test_true_boolean_in_list(self):
"""Test that [True] creates 'Always Allow' trigger."""
result = process_ldap_user_list([True])
assert len(result) == 1
assert result[0]["name"] == "Always Allow"
assert result[0]["trigger"] == {"always": {}}
def test_false_boolean(self):
"""Test that False creates 'Never Allow' trigger."""
result = process_ldap_user_list(False)
assert len(result) == 1
assert result[0]["name"] == "Never Allow"
assert result[0]["trigger"] == {"never": {}}
def test_false_boolean_in_list(self):
"""Test that [False] creates 'Never Allow' trigger."""
result = process_ldap_user_list([False])
assert len(result) == 1
assert result[0]["name"] == "Never Allow"
assert result[0]["trigger"] == {"never": {}}
def test_single_string_group(self):
"""Test that a single string creates group match trigger."""
result = process_ldap_user_list("admin_group")
assert len(result) == 1
assert result[0]["name"] == "Match User Groups"
assert result[0]["trigger"]["groups"]["has_or"] == ["admin_group"]
def test_single_string_group_in_list(self):
"""Test that a single string in list creates group match trigger."""
result = process_ldap_user_list(["admin_group"])
assert len(result) == 1
assert result[0]["name"] == "Match User Groups"
assert result[0]["trigger"]["groups"]["has_or"] == ["admin_group"]
def test_multiple_groups(self):
"""Test that multiple groups create single trigger with all groups."""
result = process_ldap_user_list(["group1", "group2", "group3"])
assert len(result) == 1
assert result[0]["name"] == "Match User Groups"
assert result[0]["trigger"]["groups"]["has_or"] == ["group1", "group2", "group3"]
def test_mixed_types_with_none(self):
"""Test that mixed types including None are handled correctly."""
result = process_ldap_user_list(["group1", None, "group2"])
assert len(result) == 1
assert result[0]["name"] == "Match User Groups"
assert result[0]["trigger"]["groups"]["has_or"] == ["group1", None, "group2"]
def test_mixed_types_with_boolean_string(self):
"""Test that boolean values mixed with strings are handled correctly."""
result = process_ldap_user_list(["group1", False, "group2"])
assert len(result) == 1
assert result[0]["name"] == "Match User Groups"
assert result[0]["trigger"]["groups"]["has_or"] == ["group1", False, "group2"]
def test_empty_list(self):
"""Test that empty list creates no triggers."""
result = process_ldap_user_list([])
assert len(result) == 0
def test_numeric_values(self):
"""Test that numeric values are handled correctly."""
result = process_ldap_user_list([123, "group1"])
assert len(result) == 1
assert result[0]["name"] == "Match User Groups"
assert result[0]["trigger"]["groups"]["has_or"] == [123, "group1"]
class TestOrgMapToGatewayFormat:
@@ -360,6 +460,44 @@ class TestOrgMapToGatewayFormat:
assert len(result) == 2
assert next_order == 3
def test_org_with_auth_type_sso(self):
"""Test org mapping with auth_type='sso' (default behavior)."""
org_map = {"myorg": {"users": ["testuser"]}}
result, _ = org_map_to_gateway_format(org_map, auth_type='sso')
assert len(result) == 1
mapping = result[0]
assert "Username equals testuser" in mapping["name"]
assert mapping["triggers"]["attributes"]["username"]["equals"] == "testuser"
def test_org_with_auth_type_ldap(self):
"""Test org mapping with auth_type='ldap'."""
org_map = {"myorg": {"users": ["admin_group"]}}
result, _ = org_map_to_gateway_format(org_map, auth_type='ldap')
assert len(result) == 1
mapping = result[0]
assert "Match User Groups" in mapping["name"]
assert mapping["triggers"]["groups"]["has_or"] == ["admin_group"]
def test_org_with_auth_type_ldap_boolean(self):
"""Test org mapping with auth_type='ldap' and boolean values."""
org_map = {"myorg": {"users": True, "admins": False}}
result, _ = org_map_to_gateway_format(org_map, auth_type='ldap')
assert len(result) == 2
user_mapping = next(m for m in result if "Users" in m["name"])
admin_mapping = next(m for m in result if "Admins" in m["name"])
assert "Always Allow" in user_mapping["name"]
assert user_mapping["triggers"]["always"] == {}
assert "Never Allow" in admin_mapping["name"]
assert admin_mapping["triggers"]["never"] == {}
class TestTeamMapToGatewayFormat:
"""Tests for team_map_to_gateway_format function."""
@@ -817,6 +955,44 @@ class TestTeamMapToGatewayFormat:
assert "team3" in teams
assert teams.count("team2") == 2 # team2 should appear twice
def test_team_with_auth_type_sso(self):
"""Test team mapping with auth_type='sso' (default behavior)."""
team_map = {"testteam": {"organization": "testorg", "users": ["testuser"]}}
result, _ = team_map_to_gateway_format(team_map, auth_type='sso')
assert len(result) == 1
mapping = result[0]
assert "Username equals testuser" in mapping["name"]
assert mapping["triggers"]["attributes"]["username"]["equals"] == "testuser"
def test_team_with_auth_type_ldap(self):
"""Test team mapping with auth_type='ldap'."""
team_map = {"testteam": {"organization": "testorg", "users": ["admin_group"]}}
result, _ = team_map_to_gateway_format(team_map, auth_type='ldap')
assert len(result) == 1
mapping = result[0]
assert "Match User Groups" in mapping["name"]
assert mapping["triggers"]["groups"]["has_or"] == ["admin_group"]
def test_team_with_auth_type_ldap_boolean(self):
"""Test team mapping with auth_type='ldap' and boolean values."""
team_map_true = {"testteam": {"organization": "testorg", "users": True}}
team_map_false = {"testteam": {"organization": "testorg", "users": False}}
result_true, _ = team_map_to_gateway_format(team_map_true, auth_type='ldap')
result_false, _ = team_map_to_gateway_format(team_map_false, auth_type='ldap')
assert len(result_true) == 1
assert "Always Allow" in result_true[0]["name"]
assert result_true[0]["triggers"]["always"] == {}
assert len(result_false) == 1
assert "Never Allow" in result_false[0]["name"]
assert result_false[0]["triggers"]["never"] == {}
# Parametrized tests for edge cases
@pytest.mark.parametrize(
@@ -927,3 +1103,40 @@ def test_team_gateway_format_compliance(team_map):
assert isinstance(mapping["role"], str)
assert isinstance(mapping["revoke"], bool)
assert isinstance(mapping["order"], int)
class TestAAP51531SpecificCase:
"""Test case specifically for JIRA AAP-51531 requirements."""
def test_ldap_networking_org_mapping_aap_51531(self):
"""Test the specific LDAP organization mapping case for JIRA AAP-51531."""
# This case is added for JIRA AAP-51531
org_map = {"Networking": {"admins": "cn=networkadmins,ou=groups,dc=example,dc=com", "users": True, "remove_admins": True, "remove_users": True}}
result = get_org_mappers(org_map, auth_type='ldap')
# Should create 2 mappers: one for admins, one for users
assert len(result) == 2
# Find admin and user mappers
admin_mapper = next((m for m in result if m['role'] == 'Organization Admin'), None)
user_mapper = next((m for m in result if m['role'] == 'Organization Member'), None)
assert admin_mapper is not None
assert user_mapper is not None
# Verify admin mapper details
assert admin_mapper['organization'] == 'Networking'
assert admin_mapper['revoke'] is True # remove_admins: true
assert 'Match User Groups' in admin_mapper['name']
assert admin_mapper['triggers']['groups']['has_or'] == ['cn=networkadmins,ou=groups,dc=example,dc=com']
# Verify user mapper details
assert user_mapper['organization'] == 'Networking'
assert user_mapper['revoke'] is True # remove_users: true
assert 'Always Allow' in user_mapper['name']
assert user_mapper['triggers']['always'] == {}
# Verify both mappers have correct map_type
assert admin_mapper['map_type'] == 'organization'
assert user_mapper['map_type'] == 'organization'

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@ This module contains functions to convert AWX authentication mappings
"""
import re
from typing import Union, Pattern, Any, cast
from typing import cast, Any, Literal, Pattern, Union
email_regex = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
@@ -29,8 +29,40 @@ def pattern_to_slash_format(pattern: Any) -> str:
return f"/{pattern.pattern}/{flags_str}"
def process_ldap_user_list(
groups: Union[None, str, bool, list[Union[None, str, bool]]],
) -> list[dict[str, Any]]:
if not isinstance(groups, list):
groups = [groups]
# Type cast to help mypy understand the type after conversion
groups_list: list[Union[str, bool, None]] = cast(list[Union[str, bool, None]], groups)
triggers = []
if groups_list == [None]:
# A None value means we shouldn't update whatever this is based on LDAP values
pass
elif groups_list == []:
# Empty list means no triggers should be created
pass
elif groups_list == [True]:
triggers.append({"name": "Always Allow", "trigger": {"always": {}}})
elif groups_list == [False]:
triggers.append(
{
"name": "Never Allow",
"trigger": {"never": {}},
}
)
else:
triggers.append({"name": "Match User Groups", "trigger": {"groups": {"has_or": groups_list}}})
return triggers
def process_sso_user_list(
users: Union[str, bool, Pattern[str], list[Union[str, bool, Pattern[str]]]], email_attr: str = 'email', username_attr: str = 'username'
users: Union[str, bool, Pattern[str], list[Union[str, bool, Pattern[str]]]],
email_attr: str = 'email',
username_attr: str = 'username',
) -> list[dict[str, Any]]:
if not isinstance(users, list):
users = [users]
@@ -87,7 +119,7 @@ def process_sso_user_list(
return triggers
def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username'):
def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username', auth_type: Literal['sso', 'ldap'] = 'sso'):
"""Convert AWX team mapping to Gateway authenticator format.
Args:
@@ -117,7 +149,12 @@ def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email
# Check for remove flag
revoke = team.get('remove', False)
for trigger in process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr):
if auth_type == 'sso':
triggers = process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr)
else:
triggers = process_ldap_user_list(team['users'])
for trigger in triggers:
result.append(
{
"name": f"{organization_name} - {team_name} {trigger['name']}",
@@ -136,7 +173,7 @@ def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email
return result, order
def org_map_to_gateway_format(org_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username'):
def org_map_to_gateway_format(org_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username', auth_type: Literal['sso', 'ldap'] = 'sso'):
"""Convert AWX organization mapping to Gateway authenticator format.
Args:
@@ -172,7 +209,12 @@ def org_map_to_gateway_format(org_map, start_order=1, email_attr: str = 'email',
if organization.get(f"remove_{user_type}"):
revoke = True
for trigger in process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr):
if auth_type == 'sso':
triggers = process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr)
else:
triggers = process_ldap_user_list(organization[user_type])
for trigger in triggers:
result.append(
{
"name": f"{organization_name} - {permission_type} {trigger['name']}",

View File

@@ -34,7 +34,8 @@ class LDAPMigrator(BaseAuthenticatorMigrator):
for instance in ldap_instances:
# Build the prefix for this LDAP instance
prefix = f"AUTH_LDAP_{instance}_" if instance is not None else "AUTH_LDAP_"
category = f"ldap{instance}" if instance is not None else "ldap"
# The authenticator category is always "ldap"
category = "ldap"
try:
# Get all LDAP settings for this instance
@@ -59,8 +60,8 @@ class LDAPMigrator(BaseAuthenticatorMigrator):
allow_mappers, next_order = self._ldap_group_allow_to_gateway_format(allow_mappers, deny_group_value, deny=True, start_order=1)
allow_mappers, next_order = self._ldap_group_allow_to_gateway_format(allow_mappers, require_group_value, deny=False, start_order=next_order)
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=next_order)
team_mappers, next_order = team_map_to_gateway_format(team_map_value, start_order=next_order)
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=next_order, auth_type='ldap')
team_mappers, next_order = team_map_to_gateway_format(team_map_value, start_order=next_order, auth_type='ldap')
role_mappers, _ = role_map_to_gateway_format(role_map_value, start_order=next_order)
found_configs.append(