feat: comprehensive refactor of SSO org/team mapping for Gateway authentication export (#7047)

This commit completely refactors how SSO organization and team mappings are processed
and exported for Gateway authentication, moving from a group-based approach to a more
flexible attribute-based system.

Key Changes:
- Introduced new process_sso_user_list() function for centralized user processing
- Enhanced boolean handling to support both native booleans and string representations
- Added email detection and regex pattern support for flexible user matching
- Refactored trigger generation from groups-based to attributes-based system

Gateway Mapping Enhancements (awx/main/utils/gateway_mapping.py):
- Added email regex detection for automatic email vs username classification
- Added pattern_to_slash_format() for regex pattern conversion
- Enhanced process_sso_user_list() with support for:
  - Boolean values: True/False and ["true"]/["false"]
  - String usernames and email addresses with automatic detection
  - Regex patterns with both username and email matching
  - Custom email_attr and username_attr parameters
- Refactored team_map_to_gateway_format() to use new processing system
- Refactored org_map_to_gateway_format() to use new processing system
- Changed trigger structure from {"groups": {"has_or": [...]}} to attribute-based triggers
- Improved naming convention to include trigger type in mapping names

Comprehensive Test Coverage (awx/main/tests/unit/utils/test_auth_migration.py):
- Added complete TestProcessSSOUserList class with 8 comprehensive test methods
- Enhanced TestOrgMapToGatewayFormat with string boolean and new functionality tests
- Enhanced TestTeamMapToGatewayFormat with string boolean and new functionality tests
- Added tests for email detection, regex patterns, and custom attributes
- Verified backward compatibility and integration functionality
- All existing tests updated to work with new attribute-based trigger system

Breaking Changes:
- Trigger structure changed from group-based to attribute-based
- Mapping names now include trigger descriptions for better clarity
- Function signatures updated to include email_attr and username_attr parameters

Co-Authored with Claude-4 via Cursor

Co-authored-by: Peter Braun <pbraun@redhat.com>
This commit is contained in:
John Westcott IV
2025-08-12 07:44:51 -04:00
committed by thedoubl3j
parent 4f2d28db51
commit 505ec560c8
2 changed files with 836 additions and 585 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -5,13 +5,93 @@ This module contains functions to convert AWX authentication mappings
(organization and team mappings) to AAP Gateway format. (organization and team mappings) to AAP Gateway format.
""" """
import re
from typing import Union
def team_map_to_gateway_format(team_map, start_order=1): email_regex = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
def pattern_to_slash_format(pattern):
"""Convert a re.Pattern object to /pattern/flags format."""
if not isinstance(pattern, re.Pattern):
return str(pattern)
flags_str = ""
if pattern.flags & re.IGNORECASE:
flags_str += "i"
if pattern.flags & re.MULTILINE:
flags_str += "m"
if pattern.flags & re.DOTALL:
flags_str += "s"
if pattern.flags & re.VERBOSE:
flags_str += "x"
return f"/{pattern.pattern}/{flags_str}"
def process_sso_user_list(
users: Union[str, list[str], bool], email_attr: str = 'email', username_attr: str = 'username'
) -> list[dict[str : Union[str, dict[str : dict[str:str]]]]]:
if type(users) is str:
users = [users]
triggers = []
if users in [False, ["false"]]:
triggers.append(
{
"name": "Never Allow",
"trigger": {"never": {}},
}
)
elif users in [True, ["true"]]:
triggers.append({"name": "Always Allow", "trigger": {"always": {}}})
else:
for user_or_email in users:
if isinstance(user_or_email, re.Pattern):
user_or_email = pattern_to_slash_format(user_or_email)
# If we got a regex it could be either a username or an email object
triggers.append(
{"name": f"Match Username {user_or_email}", "trigger": {"attributes": {"join_condition": "or", username_attr: {"matches": user_or_email}}}}
)
triggers.append(
{"name": f"Match Email {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"matches": user_or_email}}}}
)
elif isinstance(user_or_email, str):
# If we got a string its a direct match for either a username or an email
if email_regex.match(user_or_email):
triggers.append(
{"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": user_or_email}}}}
)
else:
triggers.append(
{
"name": f"Username equals {user_or_email}",
"trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": user_or_email}}},
}
)
else:
# Convert other objects to string representation and assume it could be either an email or a username
# The other option we could take here would be to just error out
triggers.append(
{
"name": f"Username Equals {user_or_email}",
"trigger": {"attributes": {"join_condition": "or", username_attr: {"equals": str(user_or_email)}}},
}
)
triggers.append(
{"name": f"Email Equals {user_or_email}", "trigger": {"attributes": {"join_condition": "or", email_attr: {"equals": str(user_or_email)}}}}
)
return triggers
def team_map_to_gateway_format(team_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username'):
"""Convert AWX team mapping to Gateway authenticator format. """Convert AWX team mapping to Gateway authenticator format.
Args: Args:
team_map: The SOCIAL_AUTH_*_TEAM_MAP setting value team_map: The SOCIAL_AUTH_*_TEAM_MAP setting value
start_order: Starting order value for the mappers start_order: Starting order value for the mappers
email_attr: The attribute representing the email
username_attr: The attribute representing the username
Returns: Returns:
tuple: (List of Gateway-compatible team mappers, next_order) tuple: (List of Gateway-compatible team mappers, next_order)
@@ -28,65 +108,39 @@ def team_map_to_gateway_format(team_map, start_order=1):
if team['users'] is None: if team['users'] is None:
continue continue
if team['users'] is False: # Get the organization name
triggers = {"never": {}}
elif team['users'] is True:
triggers = {"always": {}}
else:
import re
# Handle the case where the value itself is a regex pattern
if isinstance(team['users'], re.Pattern):
# Convert single regex pattern to string in a list
triggers = {"groups": {"has_or": [str(team['users'])]}}
else:
# Handle list or string values
if type(team['users']) is str:
team['users'] = [team['users']]
# Convert any non-string items to strings (e.g., regex patterns)
users_list = []
for user in team['users']:
if isinstance(user, str):
users_list.append(user)
elif isinstance(user, re.Pattern):
# Convert regex patterns to string representation
users_list.append(str(user.pattern))
else:
# Convert other objects to string representation
users_list.append(str(user))
triggers = {"groups": {"has_or": users_list}}
organization_name = team.get('organization', 'Unknown') organization_name = team.get('organization', 'Unknown')
# Check for remove flag # Check for remove flag
revoke = team.get('remove', False) revoke = team.get('remove', False)
result.append( for trigger in process_sso_user_list(team['users'], email_attr=email_attr, username_attr=username_attr):
{ result.append(
"name": f"{organization_name} - {team_name}", {
"map_type": "team", "name": f"{organization_name} - {team_name} {trigger['name']}",
"order": order, "map_type": "team",
"authenticator": -1, # Will be updated when creating the mapper "order": order,
"triggers": triggers, "authenticator": -1, # Will be updated when creating the mapper
"organization": organization_name, "triggers": trigger['trigger'],
"team": team_name, "organization": organization_name,
"role": "Team Member", # Gateway team member role "team": team_name,
"revoke": revoke, "role": "Team Member", # Gateway team member role
} "revoke": revoke,
) }
)
order += 1 order += 1
return result, order return result, order
def org_map_to_gateway_format(org_map, start_order=1): def org_map_to_gateway_format(org_map, start_order=1, email_attr: str = 'email', username_attr: str = 'username'):
"""Convert AWX organization mapping to Gateway authenticator format. """Convert AWX organization mapping to Gateway authenticator format.
Args: Args:
org_map: The SOCIAL_AUTH_*_ORGANIZATION_MAP setting value org_map: The SOCIAL_AUTH_*_ORGANIZATION_MAP setting value
start_order: Starting order value for the mappers start_order: Starting order value for the mappers
email_attr: The attribute representing the email
username_attr: The attribute representing the username
Returns: Returns:
tuple: (List of Gateway-compatible organization mappers, next_order) tuple: (List of Gateway-compatible organization mappers, next_order)
@@ -100,66 +154,35 @@ def org_map_to_gateway_format(org_map, start_order=1):
for organization_name in org_map.keys(): for organization_name in org_map.keys():
organization = org_map[organization_name] organization = org_map[organization_name]
for user_type in ['admins', 'users']: for user_type in ['admins', 'users']:
if user_type in organization: if organization.get(user_type, None) is None:
# TODO: Confirm that if we have None with remove we still won't remove # TODO: Confirm that if we have None with remove we still won't remove
if organization[user_type] is None: continue
continue
if organization[user_type] is False: # Get the permission type
triggers = {"never": {}} permission_type = user_type.title()
elif organization[user_type] is True:
triggers = {"always": {}}
else:
import re
# Handle the case where the value itself is a regex pattern # Map AWX admin/users to appropriate Gateway organization roles
if isinstance(organization[user_type], re.Pattern): role = "Organization Admin" if user_type == "admins" else "Organization Member"
# Convert single regex pattern to string in a list
triggers = {"groups": {"has_or": [str(organization[user_type])]}}
else:
# Handle list or string values
if type(organization[user_type]) is str:
organization[user_type] = [organization[user_type]]
# Convert any non-string items to strings (e.g., regex patterns) # Check for remove flags
users_list = [] revoke = False
for user in organization[user_type]: if organization.get(f"remove_{user_type}"):
if isinstance(user, str): revoke = True
users_list.append(user)
elif isinstance(user, re.Pattern):
# Convert regex patterns to string representation
users_list.append(str(user.pattern))
else:
# Convert other objects to string representation
users_list.append(str(user))
triggers = {"groups": {"has_or": users_list}}
team_name = f"Organization {user_type.title()}"
# Map AWX admin/users to appropriate Gateway organization roles
role = "Organization Admin" if user_type == "admins" else "Organization Member"
# Check for remove flags
revoke = False
if user_type == "admins" and organization.get("remove_admins"):
revoke = True
elif user_type == "users" and organization.get("remove_users"):
revoke = True
for trigger in process_sso_user_list(organization[user_type], email_attr=email_attr, username_attr=username_attr):
result.append( result.append(
{ {
"name": f"{organization_name} - {team_name}", "name": f"{organization_name} - {permission_type} {trigger['name']}",
"map_type": "organization", "map_type": "organization",
"order": order, "order": order,
"authenticator": -1, # Will be updated when creating the mapper "authenticator": -1, # Will be updated when creating the mapper
"triggers": triggers, "triggers": trigger['trigger'],
"organization": organization_name, "organization": organization_name,
"team": None, # Organization-level mapping, not team-specific "team": None, # Organization-level mapping, not team-specific
"role": role, "role": role,
"revoke": revoke, "revoke": revoke,
} }
) )
order += 1 order += 1
return result, order return result, order