Split up migrators (#6986)

* split up migration into classes for each authenticator

* remove unused import

* remove unused code

* remove unused class
This commit is contained in:
Peter Braun 2025-07-01 16:26:00 +02:00 committed by thedoubl3j
parent 46710c4d86
commit e4758e8b4b
No known key found for this signature in database
GPG Key ID: E84C42ACF75B0768
10 changed files with 725 additions and 903 deletions

View File

@ -2,8 +2,8 @@ import sys
import os
from django.core.management.base import BaseCommand
from awx.sso.utils.auth_migration import AuthConfigMigrator
from awx.main.utils.auth_exporter import AuthConfigExporter
from awx.sso.utils.github_migrator import GitHubMigrator
from awx.sso.utils.oidc_migrator import OIDCMigrator
from awx.main.utils.gateway_client import GatewayClient, GatewayAPIError
@ -39,59 +39,59 @@ class Command(BaseCommand):
self.stdout.write(self.style.SUCCESS(f'Gateway Password: {"*" * len(gateway_password)}'))
self.stdout.write(self.style.SUCCESS(f'Skip SSL Verification: {gateway_skip_verify}'))
# Initialize the auth config migrator
migrator = AuthConfigMigrator()
# Gather all authentication configurations
# Create Gateway client and run migrations
try:
# Retrieve GitHub OIDC configuration
github_oidc_configs = migrator.get_github_oidc_config() if not skip_oidc else []
# Retrieve LDAP configuration
# ldap_configs = migrator.get_ldap_config() if not skip_ldap else []
# Create Gateway client and export configurations
self.stdout.write(self.style.SUCCESS('\n=== Connecting to Gateway ==='))
try:
with GatewayClient(
base_url=gateway_base_url, username=gateway_user, password=gateway_password, skip_verify=gateway_skip_verify
) as gateway_client:
with GatewayClient(base_url=gateway_base_url, username=gateway_user, password=gateway_password, skip_verify=gateway_skip_verify) as gateway_client:
self.stdout.write(self.style.SUCCESS('Successfully connected to Gateway'))
self.stdout.write(self.style.SUCCESS('Successfully connected to Gateway'))
# Initialize the auth config exporter
exporter = AuthConfigExporter(gateway_client, self)
# Initialize migrators
migrators = []
if not skip_oidc:
migrators.append(GitHubMigrator(gateway_client, self))
migrators.append(OIDCMigrator(gateway_client, self))
# if not skip_ldap:
# migrators.append(LDAPMigrator(gateway_client, self))
# Export GitHub configurations
if github_oidc_configs:
self.stdout.write(self.style.SUCCESS('\n=== Exporting GitHub Configurations ==='))
github_result = exporter.export_configs(github_oidc_configs, 'github')
self._print_export_summary('GitHub', github_result)
# Run migrations
total_results = {
'created': 0,
'failed': 0,
'mappers_created': 0,
'mappers_failed': 0,
}
# Export LDAP configurations
# if ldap_configs:
# self.stdout.write(self.style.SUCCESS('\n=== Exporting LDAP Configurations ==='))
# ldap_result = exporter.export_configs(ldap_configs, 'ldap')
# self._print_export_summary('LDAP', ldap_result)
if not migrators:
self.stdout.write(self.style.WARNING('No authentication configurations found to migrate.'))
else:
for migrator in migrators:
self.stdout.write(self.style.SUCCESS(f'\n=== Migrating {migrator.get_authenticator_type()} Configurations ==='))
result = migrator.migrate()
self._print_export_summary(migrator.get_authenticator_type(), result)
# Accumulate results
for key in total_results:
total_results[key] += result.get(key, 0)
# Overall summary
if not github_oidc_configs: # and not ldap_configs:
self.stdout.write(self.style.WARNING('No authentication configurations found to migrate.'))
except GatewayAPIError as e:
self.stdout.write(self.style.ERROR(f'Gateway API Error: {e.message}'))
if e.status_code:
self.stdout.write(self.style.ERROR(f'Status Code: {e.status_code}'))
if e.response_data:
self.stdout.write(self.style.ERROR(f'Response: {e.response_data}'))
return
except Exception as e:
self.stdout.write(self.style.ERROR(f'Unexpected error connecting to Gateway: {str(e)}'))
return
self.stdout.write(self.style.SUCCESS('\n=== Migration Summary ==='))
self.stdout.write(f'Total authenticators created: {total_results["created"]}')
self.stdout.write(f'Total authenticators failed: {total_results["failed"]}')
self.stdout.write(f'Total mappers created: {total_results["mappers_created"]}')
self.stdout.write(f'Total mappers failed: {total_results["mappers_failed"]}')
except GatewayAPIError as e:
self.stdout.write(self.style.ERROR(f'Gateway API Error: {e.message}'))
if e.status_code:
self.stdout.write(self.style.ERROR(f'Status Code: {e.status_code}'))
if e.response_data:
self.stdout.write(self.style.ERROR(f'Response: {e.response_data}'))
return
except Exception as e:
self.stdout.write(self.style.ERROR(f'Error retrieving authentication configs: {str(e)}'))
self.stdout.write(self.style.ERROR(f'Unexpected error during migration: {str(e)}'))
return
def _print_export_summary(self, config_type, result):
"""Print a summary of the export results."""

View File

@ -1,590 +0,0 @@
"""
Authentication configuration exporter for AAP Gateway.
This module handles the conversion and export of AWX authentication
configurations to AAP Gateway via REST API calls.
"""
import re
import hashlib
from awx.main.utils.gateway_client import GatewayClient, GatewayAPIError
class AuthConfigExporter:
"""
Handles the export of authentication configurations from AWX to Gateway.
Converts AWX configurations to Gateway format and creates authenticators and mappers.
"""
def __init__(self, gateway_client: GatewayClient, command=None):
"""
Initialize the auth config exporter.
Args:
gateway_client: GatewayClient instance for API calls
command: Optional Django management command instance (for styled output)
"""
self.gateway_client = gateway_client
self.command = command
def export_configs(self, auth_configs, config_type='github'):
"""
Export a list of authentication configurations to Gateway.
Args:
auth_configs: List of authentication configurations from AuthConfigMigrator
config_type: Type of configuration ('github', 'ldap', etc.)
Returns:
dict: Summary of export results
"""
if not auth_configs:
self._write_output(f'No {config_type} authenticators found to migrate.', 'warning')
return {'created': 0, 'failed': 0, 'mappers_created': 0, 'mappers_failed': 0}
self._write_output(f'Found {len(auth_configs)} {config_type} authentication configuration(s).', 'success')
# Process each authenticator configuration
created_authenticators = []
for config in auth_configs:
if self._create_gateway_authenticator(config, config_type):
created_authenticators.append(config)
# Create mappers for successfully created authenticators
mappers_created = 0
mappers_failed = 0
if created_authenticators:
self._write_output('\n=== Creating Authenticator Mappers ===', 'success')
for config in created_authenticators:
mapper_result = self._create_gateway_mappers(config)
mappers_created += mapper_result['created']
mappers_failed += mapper_result['failed']
return {
'created': len(created_authenticators),
'failed': len(auth_configs) - len(created_authenticators),
'mappers_created': mappers_created,
'mappers_failed': mappers_failed,
}
def _create_gateway_authenticator(self, config, config_type):
"""Create a single authenticator in Gateway from AWX config.
Args:
config: AWX authenticator configuration dict
config_type: Type of configuration ('github', 'ldap', etc.)
Returns:
bool: True if authenticator was created successfully, False otherwise
"""
category = config['category']
settings = config['settings']
# Handle different config types
if config_type == 'github':
return self._create_github_authenticator(config, category, settings)
elif config_type == 'ldap':
return self._create_ldap_authenticator(config, category, settings)
elif config_type == 'saml':
return self._create_saml_authenticator(config, category, settings)
elif config_type == 'google_oauth2':
return self._create_google_oauth2_authenticator(config, category, settings)
elif config_type == 'azure_ad':
return self._create_azure_ad_authenticator(config, category, settings)
elif config_type == 'radius':
return self._create_radius_authenticator(config, category, settings)
elif config_type == 'tacacs_plus':
return self._create_tacacs_plus_authenticator(config, category, settings)
else:
self._write_output(f'Unknown config type {config_type}, skipping', 'warning')
return False
def _create_github_authenticator(self, config, category, settings):
"""Create a GitHub authenticator in Gateway."""
# Extract the OAuth2 credentials
key_value = None
secret_value = None
for setting_name, value in settings.items():
if setting_name.endswith('_KEY') and value:
key_value = value
elif setting_name.endswith('_SECRET') and value:
secret_value = value
if not key_value or not secret_value:
self._write_output(f'Skipping {category}: missing OAuth2 credentials', 'warning')
return False
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('-', '_').title()}"
authenticator_slug = self._generate_authenticator_slug('github', category, key_value)
# Map AWX category to Gateway authenticator type
type_mapping = {
'github': 'ansible_base.authentication.authenticator_plugins.github',
'github-org': 'ansible_base.authentication.authenticator_plugins.github_org',
'github-team': 'ansible_base.authentication.authenticator_plugins.github_team',
'github-enterprise': 'ansible_base.authentication.authenticator_plugins.github_enterprise',
'github-enterprise-org': 'ansible_base.authentication.authenticator_plugins.github_enterprise_org',
'github-enterprise-team': 'ansible_base.authentication.authenticator_plugins.github_enterprise_team',
'oidc': 'ansible_base.authentication.authenticator_plugins.oidc',
}
authenticator_type = type_mapping.get(category)
if not authenticator_type:
self._write_output(f'Unknown category {category}, skipping', 'warning')
return False
self._write_output(f'\n--- Processing {category} authenticator ---')
self._write_output(f'Name: {authenticator_name}')
self._write_output(f'Slug: {authenticator_slug}')
self._write_output(f'Type: {authenticator_type}')
self._write_output(f'Client ID: {key_value}')
self._write_output(f'Client Secret: {"*" * 8}')
try:
# Check if authenticator already exists by slug
existing_authenticators = self.gateway_client.get_authenticators()
existing_authenticator = None
for auth in existing_authenticators:
if auth.get('slug') == authenticator_slug:
existing_authenticator = auth
break
if existing_authenticator:
# Authenticator already exists, use it
authenticator_id = existing_authenticator.get('id')
self._write_output(f'⚠ Authenticator already exists with ID: {authenticator_id}', 'warning')
# Store the existing result for mapper creation
config['gateway_authenticator_id'] = authenticator_id
config['gateway_authenticator'] = existing_authenticator
return True
else:
# Authenticator doesn't exist, create it
self._write_output('Creating new authenticator...')
# Build Gateway authenticator configuration
gateway_config = {
"name": authenticator_name,
"slug": authenticator_slug,
"type": authenticator_type,
"enabled": True,
"create_objects": True, # Allow Gateway to create users/orgs/teams
"remove_users": False, # Don't remove users by default
"configuration": {"KEY": key_value, "SECRET": secret_value},
}
# Add any additional configuration based on AWX settings
additional_config = self._build_additional_config(category, settings)
gateway_config["configuration"].update(additional_config)
# Create the authenticator
result = self.gateway_client.create_authenticator(gateway_config)
self._write_output(f'✓ Successfully created authenticator with ID: {result.get("id")}', 'success')
# Store the result for potential mapper creation later
config['gateway_authenticator_id'] = result.get('id')
config['gateway_authenticator'] = result
return True
except GatewayAPIError as e:
self._write_output(f'✗ Failed to create {category} authenticator: {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
self._write_output(f'✗ Unexpected error creating {category} authenticator: {str(e)}', 'error')
return False
def _create_ldap_authenticator(self, config, category, settings):
"""Create an LDAP authenticator in Gateway."""
# Extract the LDAP server URI as the identifier
server_uri = None
for setting_name, value in settings.items():
if setting_name.endswith('_SERVER_URI') and value:
server_uri = value
break
if not server_uri:
self._write_output(f'Skipping {category}: missing LDAP server URI', 'warning')
return False
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('_', '-').title()}"
authenticator_slug = self._generate_authenticator_slug('ldap', category, server_uri)
# Get LDAP authenticator type
authenticator_type = 'ansible_base.authentication.authenticator_plugins.ldap'
self._write_output(f'\n--- Processing {category} authenticator ---')
self._write_output(f'Name: {authenticator_name}')
self._write_output(f'Slug: {authenticator_slug}')
self._write_output(f'Type: {authenticator_type}')
self._write_output(f'Server URI: {server_uri}')
try:
# Check if authenticator already exists by slug
existing_authenticators = self.gateway_client.get_authenticators()
existing_authenticator = None
for auth in existing_authenticators:
if auth.get('slug') == authenticator_slug:
existing_authenticator = auth
break
if existing_authenticator:
# Authenticator already exists, use it
authenticator_id = existing_authenticator.get('id')
self._write_output(f'⚠ Authenticator already exists with ID: {authenticator_id}', 'warning')
# Store the existing result for mapper creation
config['gateway_authenticator_id'] = authenticator_id
config['gateway_authenticator'] = existing_authenticator
return True
else:
# Authenticator doesn't exist, create it
self._write_output('Creating new LDAP authenticator...')
# Build Gateway LDAP authenticator configuration
gateway_config = {
"name": authenticator_name,
"slug": authenticator_slug,
"type": authenticator_type,
"enabled": True,
"create_objects": True, # Allow Gateway to create users/orgs/teams
"remove_users": False, # Don't remove users by default
"configuration": self._build_ldap_configuration(settings),
}
# Create the authenticator
result = self.gateway_client.create_authenticator(gateway_config)
self._write_output(f'✓ Successfully created LDAP authenticator with ID: {result.get("id")}', 'success')
# Store the result for potential mapper creation later
config['gateway_authenticator_id'] = result.get('id')
config['gateway_authenticator'] = result
return True
except GatewayAPIError as e:
self._write_output(f'✗ Failed to create {category} authenticator: {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
self._write_output(f'✗ Unexpected error creating {category} authenticator: {str(e)}', 'error')
return False
def _create_saml_authenticator(self, config, category, settings):
"""Create a SAML authenticator in Gateway."""
# TODO: Implement SAML authenticator creation
# When implementing, use this pattern for slug generation:
# entity_id = settings.get('SOCIAL_AUTH_SAML_SP_ENTITY_ID', 'saml')
# authenticator_slug = self._generate_authenticator_slug('saml', category, entity_id)
# SAML requires complex configuration including:
# - SP entity ID, certificates, metadata
# - IdP configuration and metadata
# - Attribute mapping
self._write_output(f'SAML authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_google_oauth2_authenticator(self, config, category, settings):
"""Create a Google OAuth2 authenticator in Gateway."""
# TODO: Implement Google OAuth2 authenticator creation
# When implementing, use this pattern for slug generation:
# client_id = settings.get('SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', 'google')
# authenticator_slug = self._generate_authenticator_slug('google_oauth2', category, client_id)
# Similar to GitHub OAuth2 but with Google-specific endpoints
# - Extract GOOGLE_OAUTH2_KEY and GOOGLE_OAUTH2_SECRET
# - Handle whitelisted domains/emails
# - Configure Google OAuth2 scope
self._write_output(f'Google OAuth2 authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_azure_ad_authenticator(self, config, category, settings):
"""Create an Azure AD authenticator in Gateway."""
# TODO: Implement Azure AD authenticator creation
# When implementing, use this pattern for slug generation:
# client_id = settings.get('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', 'azure')
# authenticator_slug = self._generate_authenticator_slug('azure_ad', category, client_id)
# Azure AD requires:
# - Application ID and secret
# - Tenant ID (for tenant-specific auth)
# - Proper OAuth2 endpoints for Azure
self._write_output(f'Azure AD authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_radius_authenticator(self, config, category, settings):
"""Create a RADIUS authenticator in Gateway."""
# TODO: Implement RADIUS authenticator creation
# When implementing, use this pattern for slug generation:
# server_host = settings.get('RADIUS_SERVER', 'radius')
# authenticator_slug = self._generate_authenticator_slug('radius', category, server_host)
# RADIUS is a different authentication protocol than OAuth2/SAML
# - Server host and port
# - Shared secret
# - NAS identifier
# - Timeout and retry settings
self._write_output(f'RADIUS authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_tacacs_plus_authenticator(self, config, category, settings):
"""Create a TACACS+ authenticator in Gateway."""
# TODO: Implement TACACS+ authenticator creation
# When implementing, use this pattern for slug generation:
# server_host = settings.get('TACACSPLUS_HOST', 'tacacs')
# authenticator_slug = self._generate_authenticator_slug('tacacs_plus', category, server_host)
# TACACS+ is a Cisco-developed authentication protocol
# - Server host and port
# - Shared secret
# - Authentication protocol settings
# - Session timeout
self._write_output(f'TACACS+ authenticator creation not yet implemented for {category}', 'warning')
return False
def _build_additional_config(self, category, settings):
"""Build additional configuration for specific authenticator types.
Args:
category: AWX category (github, github-org, etc.)
settings: AWX settings dictionary
Returns:
dict: Additional configuration parameters
"""
additional_config = {}
# Add scope configuration if present
for setting_name, value in settings.items():
if setting_name.endswith('_SCOPE') and value:
additional_config['SCOPE'] = value
break
# Add GitHub Enterprise URL if present
if 'enterprise' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_URL') and value:
additional_config['URL'] = value
elif setting_name.endswith('_API_URL') and value:
additional_config['API_URL'] = value
# Add organization name for org-specific authenticators
if 'org' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_NAME') and value:
additional_config['ORG_NAME'] = value
break
# Add team ID for team-specific authenticators
if 'team' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_ID') and value:
additional_config['TEAM_ID'] = value
break
# Add OIDC endpoint for generic OIDC
if category == 'oidc':
for setting_name, value in settings.items():
if setting_name.endswith('_OIDC_ENDPOINT') and value:
additional_config['OIDC_ENDPOINT'] = value
elif setting_name.endswith('_VERIFY_SSL') and value is not None:
additional_config['VERIFY_SSL'] = value
return additional_config
def _build_ldap_configuration(self, settings):
"""Build LDAP configuration for Gateway from AWX settings.
Args:
settings: AWX LDAP settings dictionary
Returns:
dict: Gateway-compatible LDAP configuration
"""
config = {}
# Required settings
for setting_name, value in settings.items():
if setting_name.endswith('_SERVER_URI') and value:
# Gateway expects SERVER_URI as a list
config['SERVER_URI'] = [value] if isinstance(value, str) else value
elif setting_name.endswith('_BIND_DN') and value:
config['BIND_DN'] = value
elif setting_name.endswith('_BIND_PASSWORD') and value:
config['BIND_PASSWORD'] = value
elif setting_name.endswith('_START_TLS') and value is not None:
config['START_TLS'] = bool(value)
# User search configuration
for setting_name, value in settings.items():
if setting_name.endswith('_USER_SEARCH') and value:
# AWX stores USER_SEARCH as a tuple/list like (base_dn, scope, filter)
if isinstance(value, (list, tuple)) and len(value) >= 3:
config['USER_SEARCH'] = [value[0], value[1], value[2]]
# User attribute mapping
for setting_name, value in settings.items():
if setting_name.endswith('_USER_ATTR_MAP') and value:
config['USER_ATTR_MAP'] = value
# Group search configuration
for setting_name, value in settings.items():
if setting_name.endswith('_GROUP_SEARCH') and value:
# AWX stores GROUP_SEARCH as a tuple/list like (base_dn, scope, filter)
if isinstance(value, (list, tuple)) and len(value) >= 3:
config['GROUP_SEARCH'] = [value[0], value[1], value[2]]
# Group type configuration
for setting_name, value in settings.items():
if setting_name.endswith('_GROUP_TYPE') and value:
# Convert AWX group type class to string if needed
if hasattr(value, '__name__'):
config['GROUP_TYPE'] = value.__name__
else:
config['GROUP_TYPE'] = str(value)
elif setting_name.endswith('_GROUP_TYPE_PARAMS') and value:
config['GROUP_TYPE_PARAMS'] = value
# Connection options
for setting_name, value in settings.items():
if setting_name.endswith('_CONNECTION_OPTIONS') and value:
config['CONNECTION_OPTIONS'] = value
# Other LDAP settings
for setting_name, value in settings.items():
if setting_name.endswith('_USER_DN_TEMPLATE') and value:
config['USER_DN_TEMPLATE'] = value
elif setting_name.endswith('_REQUIRE_GROUP') and value:
config['REQUIRE_GROUP'] = value
elif setting_name.endswith('_DENY_GROUP') and value:
config['DENY_GROUP'] = value
elif setting_name.endswith('_USER_FLAGS_BY_GROUP') and value:
config['USER_FLAGS_BY_GROUP'] = value
return config
def _generate_authenticator_slug(self, auth_type, category, identifier):
"""Generate a deterministic slug for an authenticator.
Args:
auth_type: Type of authenticator ('github', 'ldap', etc.)
category: AWX category (github, github-org, ldap, etc.)
identifier: Unique identifier (client ID, server URI, etc.)
Returns:
str: Deterministic slug for the authenticator
"""
# Create a base string from the components
base_string = f"awx-{auth_type}-{category}-{identifier}"
# Clean the string: lowercase, replace non-alphanumeric with hyphens
cleaned = re.sub(r'[^a-zA-Z0-9]+', '-', base_string.lower())
# Remove leading/trailing hyphens and ensure no double hyphens
cleaned = re.sub(r'^-+|-+$', '', cleaned)
cleaned = re.sub(r'-+', '-', cleaned)
# Generate a hash of the cleaned string for consistent length
slug_hash = hashlib.md5(cleaned.encode('utf-8')).hexdigest()[:8]
# Combine type and hash for the final slug
final_slug = f"awx-{auth_type}-{slug_hash}"
return final_slug
def _create_gateway_mappers(self, config):
"""Create authenticator mappers in Gateway from AWX config.
Args:
config: AWX authenticator configuration dict with gateway_authenticator_id
Returns:
dict: Summary with 'created' and 'failed' counts
"""
authenticator_id = config.get('gateway_authenticator_id')
if not authenticator_id:
self._write_output(f'No authenticator ID found for {config["category"]}, skipping mappers', 'error')
return {'created': 0, 'failed': 0}
category = config['category']
org_mappers = config.get('org_mappers', [])
team_mappers = config.get('team_mappers', [])
total_mappers = len(org_mappers) + len(team_mappers)
if total_mappers == 0:
self._write_output(f'No mappers to create for {category} authenticator')
return {'created': 0, 'failed': 0}
self._write_output(f'\n--- Creating mappers for {category} authenticator (ID: {authenticator_id}) ---')
self._write_output(f'Organization mappers: {len(org_mappers)}')
self._write_output(f'Team mappers: {len(team_mappers)}')
created_count = 0
failed_count = 0
# Create organization mappers
for mapper in org_mappers:
if self._create_single_mapper(authenticator_id, mapper, 'organization'):
created_count += 1
else:
failed_count += 1
# Create team mappers
for mapper in team_mappers:
if self._create_single_mapper(authenticator_id, mapper, 'team'):
created_count += 1
else:
failed_count += 1
# Summary
self._write_output(f'Mappers created: {created_count}, failed: {failed_count}')
return {'created': created_count, 'failed': failed_count}
def _create_single_mapper(self, authenticator_id, mapper_config, mapper_type):
"""Create a single mapper in Gateway.
Args:
authenticator_id: ID of the authenticator to create mapper for
mapper_config: Mapper configuration dictionary
mapper_type: Type of mapper ('organization' or 'team')
Returns:
bool: True if mapper was created successfully, False otherwise
"""
try:
# Update the mapper config with the correct authenticator ID
mapper_config = mapper_config.copy() # Don't modify the original
mapper_config['authenticator'] = authenticator_id
# Create the mapper
self.gateway_client.create_authenticator_map(authenticator_id, mapper_config)
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✓ Created {mapper_type} mapper: {mapper_name}', 'success')
return True
except GatewayAPIError as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✗ Failed to create {mapper_type} mapper "{mapper_name}": {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✗ Unexpected error creating {mapper_type} mapper "{mapper_name}": {str(e)}', 'error')
return False
def _write_output(self, message, style=None):
"""Write output message if command is available."""
if self.command:
if style == 'success':
self.command.stdout.write(self.command.style.SUCCESS(message))
elif style == 'warning':
self.command.stdout.write(self.command.style.WARNING(message))
elif style == 'error':
self.command.stdout.write(self.command.style.ERROR(message))
else:
self.command.stdout.write(message)

View File

@ -1,269 +0,0 @@
from django.conf import settings
from awx.conf import settings_registry
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format
class AuthConfigMigrator:
"""
Handles the migration of authenticators from AWX to Gateway
TODO: this is a work in progress
"""
"""
Export all GitHub OIDC authenticators. An OIDC authenticator is only exported if both,
id and secret, are defined. Otherwise it will be skipped.
Returns:
list: List of configured GitHub authentication providers with their settings
"""
def get_github_oidc_config(self):
github_categories = ['github', 'github-org', 'github-team', 'github-enterprise', 'github-enterprise-org', 'github-enterprise-team', 'oidc']
found_configs = []
for category in github_categories:
try:
category_settings = settings_registry.get_registered_settings(category_slug=category)
if category_settings:
config_data = {}
key_setting = None
secret_setting = None
# Ensure category_settings is iterable and contains strings
import re
if isinstance(category_settings, re.Pattern) or not hasattr(category_settings, '__iter__') or isinstance(category_settings, str):
continue
for setting_name in category_settings:
# Skip if setting_name is not a string (e.g., regex pattern)
if not isinstance(setting_name, str):
continue
if setting_name.endswith('_KEY'):
key_setting = setting_name
elif setting_name.endswith('_SECRET'):
secret_setting = setting_name
# Skip this category if KEY or SECRET is missing or empty
if not key_setting or not secret_setting:
continue
key_value = getattr(settings, key_setting, None)
secret_value = getattr(settings, secret_setting, None)
# Skip this category if OIDC Key and/or Secret are not configured
if not key_value or not secret_value:
continue
# If we have both key and secret, collect all settings
org_map_value = None
team_map_value = None
for setting_name in category_settings:
# Skip if setting_name is not a string (e.g., regex pattern)
if not isinstance(setting_name, str):
continue
value = getattr(settings, setting_name, None)
config_data[setting_name] = value
# Capture org and team map values for special processing
if setting_name.endswith('_ORGANIZATION_MAP'):
org_map_value = value
elif setting_name.endswith('_TEAM_MAP'):
team_map_value = value
# Convert GitHub org and team mappings from AWX to the Gateway format
# Start with order 1 and maintain sequence across both org and team mappers
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=1)
team_mappers, _ = team_map_to_gateway_format(team_map_value, start_order=next_order)
found_configs.append({'category': category, 'settings': config_data, 'org_mappers': org_mappers, 'team_mappers': team_mappers})
except Exception as e:
raise Exception(f'Could not retrieve {category} settings: {str(e)}')
return found_configs
def get_ldap_config(self):
"""
Export all LDAP authenticators. An LDAP authenticator is only exported if
SERVER_URI is configured. Otherwise it will be skipped.
Returns:
list: List of configured LDAP authentication providers with their settings
"""
found_configs = []
# AWX supports up to 6 LDAP configurations: AUTH_LDAP (default) and AUTH_LDAP_1 through AUTH_LDAP_5
ldap_instances = [''] + [f'_{i}' for i in range(1, 6)] # ['', '_1', '_2', '_3', '_4', '_5']
for instance in ldap_instances:
try:
# Build the setting prefix (AUTH_LDAP or AUTH_LDAP_1, etc.)
prefix = f'AUTH_LDAP{instance}'
# Check if this LDAP instance is configured by looking for SERVER_URI
server_uri_setting = f'{prefix}_SERVER_URI'
server_uri = getattr(settings, server_uri_setting, None)
# Skip this instance if SERVER_URI is not configured or empty
if not server_uri or server_uri.strip() == '':
continue
config_data = {}
org_map_value = None
team_map_value = None
# Define all LDAP settings we want to collect
ldap_settings = [
'SERVER_URI',
'BIND_DN',
'BIND_PASSWORD',
'START_TLS',
'CONNECTION_OPTIONS',
'USER_SEARCH',
'USER_DN_TEMPLATE',
'USER_ATTR_MAP',
'GROUP_SEARCH',
'GROUP_TYPE',
'GROUP_TYPE_PARAMS',
'REQUIRE_GROUP',
'DENY_GROUP',
'USER_FLAGS_BY_GROUP',
'ORGANIZATION_MAP',
'TEAM_MAP',
]
# Collect all settings for this LDAP instance
for setting in ldap_settings:
setting_name = f'{prefix}_{setting}'
value = getattr(settings, setting_name, None)
config_data[setting_name] = value
# Capture org and team map values for special processing
if setting == 'ORGANIZATION_MAP':
org_map_value = value
elif setting == 'TEAM_MAP':
team_map_value = value
# Convert LDAP org and team mappings from AWX to the Gateway format
# Start with order 1 and maintain sequence across both org and team mappers
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=1)
team_mappers, _ = team_map_to_gateway_format(team_map_value, start_order=next_order)
# Determine instance name for identification
instance_name = 'ldap' if instance == '' else f'ldap{instance}'
found_configs.append({'category': instance_name, 'settings': config_data, 'org_mappers': org_mappers, 'team_mappers': team_mappers})
except Exception as e:
instance_name = 'ldap' if instance == '' else f'ldap{instance}'
raise Exception(f'Could not retrieve {instance_name} settings: {str(e)}')
return found_configs
def get_saml_config(self):
"""
Export SAML authenticators. A SAML authenticator is only exported if
required configuration is present.
Returns:
list: List of configured SAML authentication providers with their settings
"""
# TODO: Implement SAML configuration retrieval
# SAML settings typically include:
# - SOCIAL_AUTH_SAML_SP_ENTITY_ID
# - SOCIAL_AUTH_SAML_SP_PUBLIC_CERT
# - SOCIAL_AUTH_SAML_SP_PRIVATE_KEY
# - SOCIAL_AUTH_SAML_ORG_INFO
# - SOCIAL_AUTH_SAML_TECHNICAL_CONTACT
# - SOCIAL_AUTH_SAML_SUPPORT_CONTACT
# - SOCIAL_AUTH_SAML_ENABLED_IDPS
# - SOCIAL_AUTH_SAML_ORGANIZATION_MAP
# - SOCIAL_AUTH_SAML_TEAM_MAP
found_configs = []
return found_configs
def get_google_oauth2_config(self):
"""
Export Google OAuth2 authenticators. A Google OAuth2 authenticator is only exported if
KEY and SECRET are configured.
Returns:
list: List of configured Google OAuth2 authentication providers with their settings
"""
# TODO: Implement Google OAuth2 configuration retrieval
# Google OAuth2 settings typically include:
# - SOCIAL_AUTH_GOOGLE_OAUTH2_KEY
# - SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET
# - SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE
# - SOCIAL_AUTH_GOOGLE_OAUTH2_WHITELISTED_DOMAINS
# - SOCIAL_AUTH_GOOGLE_OAUTH2_WHITELISTED_EMAILS
# - SOCIAL_AUTH_GOOGLE_OAUTH2_ORGANIZATION_MAP
# - SOCIAL_AUTH_GOOGLE_OAUTH2_TEAM_MAP
found_configs = []
return found_configs
def get_azure_ad_config(self):
"""
Export Azure AD authenticators. An Azure AD authenticator is only exported if
KEY and SECRET are configured.
Returns:
list: List of configured Azure AD authentication providers with their settings
"""
# TODO: Implement Azure AD configuration retrieval
# Azure AD settings typically include:
# - SOCIAL_AUTH_AZUREAD_OAUTH2_KEY
# - SOCIAL_AUTH_AZUREAD_OAUTH2_SECRET
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_KEY
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_SECRET
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_TENANT_ID
# - SOCIAL_AUTH_AZUREAD_OAUTH2_ORGANIZATION_MAP
# - SOCIAL_AUTH_AZUREAD_OAUTH2_TEAM_MAP
found_configs = []
return found_configs
def get_radius_config(self):
"""
Export RADIUS authenticators. A RADIUS authenticator is only exported if
server configuration is present.
Returns:
list: List of configured RADIUS authentication providers with their settings
"""
# TODO: Implement RADIUS configuration retrieval
# RADIUS settings typically include:
# - RADIUS_SERVER
# - RADIUS_PORT
# - RADIUS_SECRET
# - RADIUS_NASID
# - RADIUS_TIMEOUT
# - RADIUS_RETRIES
# - RADIUS_GROUP_TYPE
# - RADIUS_GROUP_TYPE_PARAMS
# - RADIUS_ORGANIZATION_MAP
# - RADIUS_TEAM_MAP
found_configs = []
return found_configs
def get_tacacs_plus_config(self):
"""
Export TACACS+ authenticators. A TACACS+ authenticator is only exported if
server configuration is present.
Returns:
list: List of configured TACACS+ authentication providers with their settings
"""
# TODO: Implement TACACS+ configuration retrieval
# TACACS+ settings typically include:
# - TACACSPLUS_HOST
# - TACACSPLUS_PORT
# - TACACSPLUS_SECRET
# - TACACSPLUS_SESSION_TIMEOUT
# - TACACSPLUS_AUTH_PROTOCOL
# - TACACSPLUS_ORGANIZATION_MAP
# - TACACSPLUS_TEAM_MAP
found_configs = []
return found_configs

View File

@ -0,0 +1,50 @@
"""
Azure AD authenticator migrator.
This module handles the migration of Azure AD authenticators from AWX to Gateway.
"""
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
class AzureADMigrator(BaseAuthenticatorMigrator):
"""
Handles the migration of Azure AD authenticators from AWX to Gateway.
"""
def get_authenticator_type(self):
"""Get the human-readable authenticator type name."""
return "Azure AD"
def get_controller_config(self):
"""
Export Azure AD authenticators. An Azure AD authenticator is only exported if
KEY and SECRET are configured.
Returns:
list: List of configured Azure AD authentication providers with their settings
"""
# TODO: Implement Azure AD configuration retrieval
# Azure AD settings typically include:
# - SOCIAL_AUTH_AZUREAD_OAUTH2_KEY
# - SOCIAL_AUTH_AZUREAD_OAUTH2_SECRET
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_KEY
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_SECRET
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_TENANT_ID
# - SOCIAL_AUTH_AZUREAD_OAUTH2_ORGANIZATION_MAP
# - SOCIAL_AUTH_AZUREAD_OAUTH2_TEAM_MAP
found_configs = []
return found_configs
def create_gateway_authenticator(self, config):
"""Create an Azure AD authenticator in Gateway."""
# TODO: Implement Azure AD authenticator creation
# When implementing, use this pattern for slug generation:
# client_id = settings.get('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', 'azure')
# authenticator_slug = self._generate_authenticator_slug('azure_ad', category, client_id)
# Azure AD requires:
# - Application ID and secret
# - Tenant ID (for tenant-specific auth)
# - Proper OAuth2 endpoints for Azure
self._write_output('Azure AD authenticator creation not yet implemented', 'warning')
return False

View File

@ -0,0 +1,184 @@
"""
Base authenticator migrator class.
This module defines the contract that all specific authenticator migrators must follow.
"""
from awx.main.utils.gateway_client import GatewayAPIError
import re
import hashlib
class BaseAuthenticatorMigrator:
"""
Base class for all authenticator migrators.
Defines the contract that all specific authenticator migrators must follow.
"""
def __init__(self, gateway_client=None, command=None):
"""
Initialize the authenticator migrator.
Args:
gateway_client: GatewayClient instance for API calls
command: Optional Django management command instance (for styled output)
"""
self.gateway_client = gateway_client
self.command = command
def migrate(self):
"""
Main entry point - orchestrates the migration process.
Returns:
dict: Summary of migration results
"""
# Get configuration from AWX/Controller
configs = self.get_controller_config()
if not configs:
self._write_output(f'No {self.get_authenticator_type()} authenticators found to migrate.', 'warning')
return {'created': 0, 'failed': 0, 'mappers_created': 0, 'mappers_failed': 0}
self._write_output(f'Found {len(configs)} {self.get_authenticator_type()} authentication configuration(s).', 'success')
# Process each authenticator configuration
created_authenticators = []
for config in configs:
if self.create_gateway_authenticator(config):
created_authenticators.append(config)
# Create mappers for successfully created authenticators
mappers_created = 0
mappers_failed = 0
if created_authenticators:
self._write_output('\n=== Creating Authenticator Mappers ===', 'success')
for config in created_authenticators:
mapper_result = self._create_gateway_mappers(config)
mappers_created += mapper_result['created']
mappers_failed += mapper_result['failed']
return {
'created': len(created_authenticators),
'failed': len(configs) - len(created_authenticators),
'mappers_created': mappers_created,
'mappers_failed': mappers_failed,
}
def get_controller_config(self):
"""
Gather configuration from AWX/Controller.
Returns:
list: List of configuration dictionaries
"""
raise NotImplementedError("Subclasses must implement get_controller_config()")
def create_gateway_authenticator(self, config):
"""
Create authenticator in Gateway.
Args:
config: Configuration dictionary from get_controller_config()
Returns:
bool: True if authenticator was created successfully, False otherwise
"""
raise NotImplementedError("Subclasses must implement create_gateway_authenticator()")
def get_authenticator_type(self):
"""
Get the human-readable authenticator type name.
Returns:
str: Authenticator type name for logging
"""
raise NotImplementedError("Subclasses must implement get_authenticator_type()")
def _generate_authenticator_slug(self, auth_type, category, identifier):
"""Generate a deterministic slug for an authenticator."""
base_string = f"awx-{auth_type}-{category}-{identifier}"
cleaned = re.sub(r'[^a-zA-Z0-9]+', '-', base_string.lower())
cleaned = re.sub(r'^-+|-+$', '', cleaned)
cleaned = re.sub(r'-+', '-', cleaned)
slug_hash = hashlib.md5(cleaned.encode('utf-8')).hexdigest()[:8]
final_slug = f"awx-{auth_type}-{slug_hash}"
return final_slug
def _create_gateway_mappers(self, config):
"""Create authenticator mappers in Gateway from AWX config."""
authenticator_id = config.get('gateway_authenticator_id')
if not authenticator_id:
self._write_output(f'No authenticator ID found for {config["category"]}, skipping mappers', 'error')
return {'created': 0, 'failed': 0}
category = config['category']
org_mappers = config.get('org_mappers', [])
team_mappers = config.get('team_mappers', [])
total_mappers = len(org_mappers) + len(team_mappers)
if total_mappers == 0:
self._write_output(f'No mappers to create for {category} authenticator')
return {'created': 0, 'failed': 0}
self._write_output(f'\n--- Creating mappers for {category} authenticator (ID: {authenticator_id}) ---')
self._write_output(f'Organization mappers: {len(org_mappers)}')
self._write_output(f'Team mappers: {len(team_mappers)}')
created_count = 0
failed_count = 0
# Create organization mappers
for mapper in org_mappers:
if self._create_single_mapper(authenticator_id, mapper, 'organization'):
created_count += 1
else:
failed_count += 1
# Create team mappers
for mapper in team_mappers:
if self._create_single_mapper(authenticator_id, mapper, 'team'):
created_count += 1
else:
failed_count += 1
# Summary
self._write_output(f'Mappers created: {created_count}, failed: {failed_count}')
return {'created': created_count, 'failed': failed_count}
def _create_single_mapper(self, authenticator_id, mapper_config, mapper_type):
"""Create a single mapper in Gateway."""
try:
# Update the mapper config with the correct authenticator ID
mapper_config = mapper_config.copy() # Don't modify the original
mapper_config['authenticator'] = authenticator_id
# Create the mapper
self.gateway_client.create_authenticator_map(authenticator_id, mapper_config)
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✓ Created {mapper_type} mapper: {mapper_name}', 'success')
return True
except GatewayAPIError as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✗ Failed to create {mapper_type} mapper "{mapper_name}": {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✗ Unexpected error creating {mapper_type} mapper "{mapper_name}": {str(e)}', 'error')
return False
def _write_output(self, message, style=None):
"""Write output message if command is available."""
if self.command:
if style == 'success':
self.command.stdout.write(self.command.style.SUCCESS(message))
elif style == 'warning':
self.command.stdout.write(self.command.style.WARNING(message))
elif style == 'error':
self.command.stdout.write(self.command.style.ERROR(message))
else:
self.command.stdout.write(message)

View File

@ -0,0 +1,232 @@
"""
GitHub authenticator migrator.
This module handles the migration of GitHub authenticators from AWX to Gateway.
"""
from django.conf import settings
from awx.conf import settings_registry
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format
from awx.main.utils.gateway_client import GatewayAPIError
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
import re
class GitHubMigrator(BaseAuthenticatorMigrator):
"""
Handles the migration of GitHub authenticators from AWX to Gateway.
"""
def get_authenticator_type(self):
"""Get the human-readable authenticator type name."""
return "GitHub"
def get_controller_config(self):
"""
Export all GitHub authenticators. A GitHub authenticator is only exported if both,
id and secret, are defined. Otherwise it will be skipped.
Returns:
list: List of configured GitHub authentication providers with their settings
"""
github_categories = ['github', 'github-org', 'github-team', 'github-enterprise', 'github-enterprise-org', 'github-enterprise-team']
found_configs = []
for category in github_categories:
try:
category_settings = settings_registry.get_registered_settings(category_slug=category)
if category_settings:
config_data = {}
key_setting = None
secret_setting = None
# Ensure category_settings is iterable and contains strings
if isinstance(category_settings, re.Pattern) or not hasattr(category_settings, '__iter__') or isinstance(category_settings, str):
continue
for setting_name in category_settings:
# Skip if setting_name is not a string (e.g., regex pattern)
if not isinstance(setting_name, str):
continue
if setting_name.endswith('_KEY'):
key_setting = setting_name
elif setting_name.endswith('_SECRET'):
secret_setting = setting_name
# Skip this category if KEY or SECRET is missing or empty
if not key_setting or not secret_setting:
continue
key_value = getattr(settings, key_setting, None)
secret_value = getattr(settings, secret_setting, None)
# Skip this category if OIDC Key and/or Secret are not configured
if not key_value or not secret_value:
continue
# If we have both key and secret, collect all settings
org_map_value = None
team_map_value = None
for setting_name in category_settings:
# Skip if setting_name is not a string (e.g., regex pattern)
if not isinstance(setting_name, str):
continue
value = getattr(settings, setting_name, None)
config_data[setting_name] = value
# Capture org and team map values for special processing
if setting_name.endswith('_ORGANIZATION_MAP'):
org_map_value = value
elif setting_name.endswith('_TEAM_MAP'):
team_map_value = value
# Convert GitHub org and team mappings from AWX to the Gateway format
# Start with order 1 and maintain sequence across both org and team mappers
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=1)
team_mappers, _ = team_map_to_gateway_format(team_map_value, start_order=next_order)
found_configs.append({'category': category, 'settings': config_data, 'org_mappers': org_mappers, 'team_mappers': team_mappers})
except Exception as e:
raise Exception(f'Could not retrieve {category} settings: {str(e)}')
return found_configs
def create_gateway_authenticator(self, config):
"""Create a GitHub/OIDC authenticator in Gateway."""
category = config['category']
settings = config['settings']
# Extract the OAuth2 credentials
key_value = None
secret_value = None
for setting_name, value in settings.items():
if setting_name.endswith('_KEY') and value:
key_value = value
elif setting_name.endswith('_SECRET') and value:
secret_value = value
if not key_value or not secret_value:
self._write_output(f'Skipping {category}: missing OAuth2 credentials', 'warning')
return False
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('-', '_').title()}"
authenticator_slug = self._generate_authenticator_slug('github', category, key_value)
# Map AWX category to Gateway authenticator type
type_mapping = {
'github': 'ansible_base.authentication.authenticator_plugins.github',
'github-org': 'ansible_base.authentication.authenticator_plugins.github_org',
'github-team': 'ansible_base.authentication.authenticator_plugins.github_team',
'github-enterprise': 'ansible_base.authentication.authenticator_plugins.github_enterprise',
'github-enterprise-org': 'ansible_base.authentication.authenticator_plugins.github_enterprise_org',
'github-enterprise-team': 'ansible_base.authentication.authenticator_plugins.github_enterprise_team',
}
authenticator_type = type_mapping.get(category)
if not authenticator_type:
self._write_output(f'Unknown category {category}, skipping', 'warning')
return False
self._write_output(f'\n--- Processing {category} authenticator ---')
self._write_output(f'Name: {authenticator_name}')
self._write_output(f'Slug: {authenticator_slug}')
self._write_output(f'Type: {authenticator_type}')
self._write_output(f'Client ID: {key_value}')
self._write_output(f'Client Secret: {"*" * 8}')
try:
# Check if authenticator already exists by slug
existing_authenticators = self.gateway_client.get_authenticators()
existing_authenticator = None
for auth in existing_authenticators:
if auth.get('slug') == authenticator_slug:
existing_authenticator = auth
break
if existing_authenticator:
# Authenticator already exists, use it
authenticator_id = existing_authenticator.get('id')
self._write_output(f'⚠ Authenticator already exists with ID: {authenticator_id}', 'warning')
# Store the existing result for mapper creation
config['gateway_authenticator_id'] = authenticator_id
config['gateway_authenticator'] = existing_authenticator
return True
else:
# Authenticator doesn't exist, create it
self._write_output('Creating new authenticator...')
# Build Gateway authenticator configuration
gateway_config = {
"name": authenticator_name,
"slug": authenticator_slug,
"type": authenticator_type,
"enabled": True,
"create_objects": True, # Allow Gateway to create users/orgs/teams
"remove_users": False, # Don't remove users by default
"configuration": {"KEY": key_value, "SECRET": secret_value},
}
# Add any additional configuration based on AWX settings
additional_config = self._build_additional_config(category, settings)
gateway_config["configuration"].update(additional_config)
# Create the authenticator
result = self.gateway_client.create_authenticator(gateway_config)
self._write_output(f'✓ Successfully created authenticator with ID: {result.get("id")}', 'success')
# Store the result for potential mapper creation later
config['gateway_authenticator_id'] = result.get('id')
config['gateway_authenticator'] = result
return True
except GatewayAPIError as e:
self._write_output(f'✗ Failed to create {category} authenticator: {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
self._write_output(f'✗ Unexpected error creating {category} authenticator: {str(e)}', 'error')
return False
def _build_additional_config(self, category, settings):
"""Build additional configuration for specific authenticator types."""
additional_config = {}
# Add scope configuration if present
for setting_name, value in settings.items():
if setting_name.endswith('_SCOPE') and value:
additional_config['SCOPE'] = value
break
# Add GitHub Enterprise URL if present
if 'enterprise' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_URL') and value:
additional_config['URL'] = value
elif setting_name.endswith('_API_URL') and value:
additional_config['API_URL'] = value
# Add organization name for org-specific authenticators
if 'org' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_NAME') and value:
additional_config['NAME'] = value
break
# Add team ID for team-specific authenticators
if 'team' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_ID') and value:
additional_config['ID'] = value
break
return additional_config

View File

@ -0,0 +1,50 @@
"""
Google OAuth2 authenticator migrator.
This module handles the migration of Google OAuth2 authenticators from AWX to Gateway.
"""
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
class GoogleOAuth2Migrator(BaseAuthenticatorMigrator):
"""
Handles the migration of Google OAuth2 authenticators from AWX to Gateway.
"""
def get_authenticator_type(self):
"""Get the human-readable authenticator type name."""
return "Google OAuth2"
def get_controller_config(self):
"""
Export Google OAuth2 authenticators. A Google OAuth2 authenticator is only exported if
KEY and SECRET are configured.
Returns:
list: List of configured Google OAuth2 authentication providers with their settings
"""
# TODO: Implement Google OAuth2 configuration retrieval
# Google OAuth2 settings typically include:
# - SOCIAL_AUTH_GOOGLE_OAUTH2_KEY
# - SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET
# - SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE
# - SOCIAL_AUTH_GOOGLE_OAUTH2_WHITELISTED_DOMAINS
# - SOCIAL_AUTH_GOOGLE_OAUTH2_WHITELISTED_EMAILS
# - SOCIAL_AUTH_GOOGLE_OAUTH2_ORGANIZATION_MAP
# - SOCIAL_AUTH_GOOGLE_OAUTH2_TEAM_MAP
found_configs = []
return found_configs
def create_gateway_authenticator(self, config):
"""Create a Google OAuth2 authenticator in Gateway."""
# TODO: Implement Google OAuth2 authenticator creation
# When implementing, use this pattern for slug generation:
# client_id = settings.get('SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', 'google')
# authenticator_slug = self._generate_authenticator_slug('google_oauth2', category, client_id)
# Similar to GitHub OAuth2 but with Google-specific endpoints
# - Extract GOOGLE_OAUTH2_KEY and GOOGLE_OAUTH2_SECRET
# - Handle whitelisted domains/emails
# - Configure Google OAuth2 scope
self._write_output('Google OAuth2 authenticator creation not yet implemented', 'warning')
return False

View File

@ -0,0 +1,62 @@
"""
LDAP authenticator migrator.
This module handles the migration of LDAP authenticators from AWX to Gateway.
"""
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
class LDAPMigrator(BaseAuthenticatorMigrator):
"""
Handles the migration of LDAP authenticators from AWX to Gateway.
"""
def get_authenticator_type(self):
"""Get the human-readable authenticator type name."""
return "LDAP"
def get_controller_config(self):
"""
Export all LDAP authenticators. An LDAP authenticator is only exported if
SERVER_URI is configured. Otherwise it will be skipped.
Returns:
list: List of configured LDAP authentication providers with their settings
"""
# TODO: Implement LDAP configuration retrieval
# AWX supports up to 6 LDAP configurations: AUTH_LDAP (default) and AUTH_LDAP_1 through AUTH_LDAP_5
# LDAP settings typically include:
# - AUTH_LDAP_SERVER_URI
# - AUTH_LDAP_BIND_DN
# - AUTH_LDAP_BIND_PASSWORD
# - AUTH_LDAP_START_TLS
# - AUTH_LDAP_CONNECTION_OPTIONS
# - AUTH_LDAP_USER_SEARCH
# - AUTH_LDAP_USER_DN_TEMPLATE
# - AUTH_LDAP_USER_ATTR_MAP
# - AUTH_LDAP_GROUP_SEARCH
# - AUTH_LDAP_GROUP_TYPE
# - AUTH_LDAP_GROUP_TYPE_PARAMS
# - AUTH_LDAP_REQUIRE_GROUP
# - AUTH_LDAP_DENY_GROUP
# - AUTH_LDAP_USER_FLAGS_BY_GROUP
# - AUTH_LDAP_ORGANIZATION_MAP
# - AUTH_LDAP_TEAM_MAP
found_configs = []
return found_configs
def create_gateway_authenticator(self, config):
"""Create an LDAP authenticator in Gateway."""
# TODO: Implement LDAP authenticator creation
# When implementing, use this pattern for slug generation:
# server_uri = settings.get('AUTH_LDAP_SERVER_URI', 'ldap')
# authenticator_slug = self._generate_authenticator_slug('ldap', category, server_uri)
# LDAP requires:
# - Server URI and connection settings
# - Bind DN and password for authentication
# - User and group search configurations
# - Attribute mapping for user fields
# - Group type and parameters
self._write_output('LDAP authenticator creation not yet implemented', 'warning')
return False

View File

@ -0,0 +1,51 @@
"""
Generic OIDC authenticator migrator.
This module handles the migration of generic OIDC authenticators from AWX to Gateway.
"""
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
class OIDCMigrator(BaseAuthenticatorMigrator):
"""
Handles the migration of generic OIDC authenticators from AWX to Gateway.
"""
def get_authenticator_type(self):
"""Get the human-readable authenticator type name."""
return "OIDC"
def get_controller_config(self):
"""
Export generic OIDC authenticators. An OIDC authenticator is only exported if both,
id and secret, are defined. Otherwise it will be skipped.
Returns:
list: List of configured OIDC authentication providers with their settings
"""
# TODO: Implement OIDC configuration retrieval
# OIDC settings typically include:
# - SOCIAL_AUTH_OIDC_KEY
# - SOCIAL_AUTH_OIDC_SECRET
# - SOCIAL_AUTH_OIDC_SCOPE
# - SOCIAL_AUTH_OIDC_OIDC_ENDPOINT
# - SOCIAL_AUTH_OIDC_VERIFY_SSL
# - SOCIAL_AUTH_OIDC_ORGANIZATION_MAP
# - SOCIAL_AUTH_OIDC_TEAM_MAP
found_configs = []
return found_configs
def create_gateway_authenticator(self, config):
"""Create a generic OIDC authenticator in Gateway."""
# TODO: Implement OIDC authenticator creation
# When implementing, use this pattern for slug generation:
# client_id = settings.get('SOCIAL_AUTH_OIDC_KEY', 'oidc')
# authenticator_slug = self._generate_authenticator_slug('oidc', category, client_id)
# OIDC requires:
# - Client ID and secret
# - OIDC endpoint URL
# - Proper scope configuration
# - SSL verification settings
self._write_output('OIDC authenticator creation not yet implemented', 'warning')
return False

View File

@ -0,0 +1,52 @@
"""
SAML authenticator migrator.
This module handles the migration of SAML authenticators from AWX to Gateway.
"""
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
class SAMLMigrator(BaseAuthenticatorMigrator):
"""
Handles the migration of SAML authenticators from AWX to Gateway.
"""
def get_authenticator_type(self):
"""Get the human-readable authenticator type name."""
return "SAML"
def get_controller_config(self):
"""
Export SAML authenticators. A SAML authenticator is only exported if
required configuration is present.
Returns:
list: List of configured SAML authentication providers with their settings
"""
# TODO: Implement SAML configuration retrieval
# SAML settings typically include:
# - SOCIAL_AUTH_SAML_SP_ENTITY_ID
# - SOCIAL_AUTH_SAML_SP_PUBLIC_CERT
# - SOCIAL_AUTH_SAML_SP_PRIVATE_KEY
# - SOCIAL_AUTH_SAML_ORG_INFO
# - SOCIAL_AUTH_SAML_TECHNICAL_CONTACT
# - SOCIAL_AUTH_SAML_SUPPORT_CONTACT
# - SOCIAL_AUTH_SAML_ENABLED_IDPS
# - SOCIAL_AUTH_SAML_ORGANIZATION_MAP
# - SOCIAL_AUTH_SAML_TEAM_MAP
found_configs = []
return found_configs
def create_gateway_authenticator(self, config):
"""Create a SAML authenticator in Gateway."""
# TODO: Implement SAML authenticator creation
# When implementing, use this pattern for slug generation:
# entity_id = settings.get('SOCIAL_AUTH_SAML_SP_ENTITY_ID', 'saml')
# authenticator_slug = self._generate_authenticator_slug('saml', category, entity_id)
# SAML requires complex configuration including:
# - SP entity ID, certificates, metadata
# - IdP configuration and metadata
# - Attribute mapping
self._write_output('SAML authenticator creation not yet implemented', 'warning')
return False