Aap 47760 - initial auth migration management command (#6981)

* wip: management command for authenticator export to GateWay

* wip: implement ldap auth config migration

* refactor: split concerns into gathering config and converting / recreating config

* refactor: dry run by default

* use the authenticator slug for idempotency

* move to correct utils path

* use env vars instead of flags, fix linter errors

* remove unused import
This commit is contained in:
Peter Braun 2025-06-30 13:11:46 +02:00 committed by thedoubl3j
parent 243e27c7a9
commit 05b6f4fcb9
No known key found for this signature in database
GPG Key ID: E84C42ACF75B0768
7 changed files with 2128 additions and 0 deletions

View File

@ -0,0 +1,102 @@
import sys
import os
from django.core.management.base import BaseCommand
from awx.sso.utils.auth_migration import AuthConfigMigrator
from awx.main.utils.auth_exporter import AuthConfigExporter
from awx.main.utils.gateway_client import GatewayClient, GatewayAPIError
class Command(BaseCommand):
help = 'Import existing auth provider configurations to AAP Gateway via API requests'
def add_arguments(self, parser):
parser.add_argument('--skip-oidc', action='store_true', help='Skip importing GitHub and generic OIDC authenticators')
parser.add_argument('--skip-ldap', action='store_true', help='Skip importing LDAP authenticators')
def handle(self, *args, **options):
# Read Gateway connection parameters from environment variables
gateway_base_url = os.getenv('GATEWAY_BASE_URL')
gateway_user = os.getenv('GATEWAY_USER')
gateway_password = os.getenv('GATEWAY_PASSWORD')
gateway_skip_verify = os.getenv('GATEWAY_SKIP_VERIFY', '').lower() in ('true', '1', 'yes', 'on')
skip_oidc = options['skip_oidc']
# skip_ldap = options['skip_ldap']
# If the management command isn't called with all parameters needed to talk to Gateway, consider
# it a dry-run and exit cleanly
if not gateway_base_url or not gateway_user or not gateway_password:
self.stdout.write(self.style.WARNING('Missing required environment variables:'))
self.stdout.write(self.style.WARNING('- GATEWAY_BASE_URL: Base URL of the AAP Gateway instance'))
self.stdout.write(self.style.WARNING('- GATEWAY_USER: Username for AAP Gateway authentication'))
self.stdout.write(self.style.WARNING('- GATEWAY_PASSWORD: Password for AAP Gateway authentication'))
self.stdout.write(self.style.WARNING('- GATEWAY_SKIP_VERIFY: Skip SSL certificate verification (optional)'))
sys.exit(0)
self.stdout.write(self.style.SUCCESS(f'Gateway Base URL: {gateway_base_url}'))
self.stdout.write(self.style.SUCCESS(f'Gateway User: {gateway_user}'))
self.stdout.write(self.style.SUCCESS(f'Gateway Password: {"*" * len(gateway_password)}'))
self.stdout.write(self.style.SUCCESS(f'Skip SSL Verification: {gateway_skip_verify}'))
# Initialize the auth config migrator
migrator = AuthConfigMigrator()
# Gather all authentication configurations
try:
# Retrieve GitHub OIDC configuration
github_oidc_configs = migrator.get_github_oidc_config() if not skip_oidc else []
# Retrieve LDAP configuration
# ldap_configs = migrator.get_ldap_config() if not skip_ldap else []
# Create Gateway client and export configurations
self.stdout.write(self.style.SUCCESS('\n=== Connecting to Gateway ==='))
try:
with GatewayClient(
base_url=gateway_base_url, username=gateway_user, password=gateway_password, skip_verify=gateway_skip_verify
) as gateway_client:
self.stdout.write(self.style.SUCCESS('Successfully connected to Gateway'))
# Initialize the auth config exporter
exporter = AuthConfigExporter(gateway_client, self)
# Export GitHub configurations
if github_oidc_configs:
self.stdout.write(self.style.SUCCESS('\n=== Exporting GitHub Configurations ==='))
github_result = exporter.export_configs(github_oidc_configs, 'github')
self._print_export_summary('GitHub', github_result)
# Export LDAP configurations
# if ldap_configs:
# self.stdout.write(self.style.SUCCESS('\n=== Exporting LDAP Configurations ==='))
# ldap_result = exporter.export_configs(ldap_configs, 'ldap')
# self._print_export_summary('LDAP', ldap_result)
# Overall summary
if not github_oidc_configs: # and not ldap_configs:
self.stdout.write(self.style.WARNING('No authentication configurations found to migrate.'))
except GatewayAPIError as e:
self.stdout.write(self.style.ERROR(f'Gateway API Error: {e.message}'))
if e.status_code:
self.stdout.write(self.style.ERROR(f'Status Code: {e.status_code}'))
if e.response_data:
self.stdout.write(self.style.ERROR(f'Response: {e.response_data}'))
return
except Exception as e:
self.stdout.write(self.style.ERROR(f'Unexpected error connecting to Gateway: {str(e)}'))
return
except Exception as e:
self.stdout.write(self.style.ERROR(f'Error retrieving authentication configs: {str(e)}'))
def _print_export_summary(self, config_type, result):
"""Print a summary of the export results."""
self.stdout.write(f'\n--- {config_type} Export Summary ---')
self.stdout.write(f'Authenticators created: {result["created"]}')
self.stdout.write(f'Authenticators failed: {result["failed"]}')
self.stdout.write(f'Mappers created: {result["mappers_created"]}')
self.stdout.write(f'Mappers failed: {result["mappers_failed"]}')

View File

@ -0,0 +1,695 @@
"""
Unit tests for auth migration utilities.
"""
import pytest
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format
def get_org_mappers(org_map, start_order=1):
"""Helper function to get just the mappers from org_map_to_gateway_format."""
result, _ = org_map_to_gateway_format(org_map, start_order)
return result
def get_team_mappers(team_map, start_order=1):
"""Helper function to get just the mappers from team_map_to_gateway_format."""
result, _ = team_map_to_gateway_format(team_map, start_order)
return result
class TestOrgMapToGatewayFormat:
def test_none_input(self):
"""Test that None input returns empty list."""
result, next_order = org_map_to_gateway_format(None)
assert result == []
assert next_order == 1 # Default start_order
def test_empty_dict(self):
"""Test that empty dict returns empty list."""
result, next_order = org_map_to_gateway_format({})
assert result == []
assert next_order == 1
def test_single_org_with_admin_true(self):
"""Test organization with admin access set to True."""
org_map = {"myorg": {"admins": True}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Admins",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"always": {}},
"role": "Organization Admin",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_org_with_admin_false(self):
"""Test organization with admin access set to False."""
org_map = {"myorg": {"admins": False}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Admins",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"never": {}},
"role": "Organization Admin",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_org_with_admin_string(self):
"""Test organization with admin access set to a specific group."""
org_map = {"myorg": {"admins": "admin-group"}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Admins",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"groups": {"has_or": ["admin-group"]}},
"role": "Organization Admin",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_org_with_admin_list(self):
"""Test organization with admin access set to multiple groups."""
org_map = {"myorg": {"admins": ["admin-group1", "admin-group2"]}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Admins",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"groups": {"has_or": ["admin-group1", "admin-group2"]}},
"role": "Organization Admin",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_org_with_users_true(self):
"""Test organization with user access set to True."""
org_map = {"myorg": {"users": True}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Users",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"always": {}},
"role": "Organization Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_org_with_both_admin_and_users(self):
"""Test organization with both admin and user mappings."""
org_map = {"myorg": {"admins": True, "users": ["user-group"]}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Admins",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"always": {}},
"role": "Organization Admin",
"revoke": False,
"order": 1,
},
{
"name": "myorg - Organization Users",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"groups": {"has_or": ["user-group"]}},
"role": "Organization Member",
"revoke": False,
"order": 2,
},
]
assert result == expected
def test_single_org_with_remove_admins(self):
"""Test organization with remove_admins flag."""
org_map = {"myorg": {"admins": True, "remove_admins": True}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Admins",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"always": {}},
"role": "Organization Admin",
"revoke": True,
"order": 1,
}
]
assert result == expected
def test_single_org_with_remove_users(self):
"""Test organization with remove_users flag."""
org_map = {"myorg": {"users": True, "remove_users": True}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Users",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"always": {}},
"role": "Organization Member",
"revoke": True,
"order": 1,
}
]
assert result == expected
def test_multiple_organizations(self):
"""Test multiple organizations with different configurations."""
org_map = {"org1": {"admins": True}, "org2": {"users": ["group1", "group2"]}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "org1 - Organization Admins",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "org1",
"triggers": {"always": {}},
"role": "Organization Admin",
"revoke": False,
"order": 1,
},
{
"name": "org2 - Organization Users",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "org2",
"triggers": {"groups": {"has_or": ["group1", "group2"]}},
"role": "Organization Member",
"revoke": False,
"order": 2,
},
]
assert result == expected
def test_org_with_none_values_skipped(self):
"""Test that entries with None values are skipped."""
org_map = {"myorg": {"admins": None, "users": True}}
result, _ = org_map_to_gateway_format(org_map)
expected = [
{
"name": "myorg - Organization Users",
"authenticator": -1,
"map_type": "organization",
"team": None,
"organization": "myorg",
"triggers": {"always": {}},
"role": "Organization Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_order_increments_correctly(self):
"""Test that order values increment correctly."""
org_map = {"myorg": {"admins": True, "users": True}}
result, _ = org_map_to_gateway_format(org_map)
assert len(result) == 2
assert result[0]["order"] == 1
assert result[1]["order"] == 2
def test_triggers_format_validation(self):
"""Test that trigger formats match Gateway specification."""
org_map = {"myorg": {"admins": ["group1", "group2"]}}
result, _ = org_map_to_gateway_format(org_map)
# Validate that triggers follow Gateway format
triggers = result[0]["triggers"]
assert "groups" in triggers
assert "has_or" in triggers["groups"]
assert isinstance(triggers["groups"]["has_or"], list)
assert triggers["groups"]["has_or"] == ["group1", "group2"]
def test_string_to_list_conversion(self):
"""Test that string groups are converted to lists."""
org_map = {"myorg": {"users": "single-group"}}
result, _ = org_map_to_gateway_format(org_map)
# Should convert string to list for has_or
assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"]
class TestTeamMapToGatewayFormat:
"""Tests for team_map_to_gateway_format function."""
def test_none_input(self):
"""Test that None input returns empty list."""
result, next_order = team_map_to_gateway_format(None)
assert result == []
assert next_order == 1 # Default start_order
def test_empty_dict(self):
"""Test that empty dict returns empty list."""
result, next_order = team_map_to_gateway_format({})
assert result == []
assert next_order == 1
def test_single_team_with_users_true(self):
"""Test team with users access set to True."""
team_map = {"engineering-team": {"organization": "myorg", "users": True}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "myorg - engineering-team",
"authenticator": -1,
"map_type": "team",
"team": "engineering-team",
"organization": "myorg",
"triggers": {"always": {}},
"role": "Team Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_team_with_users_false(self):
"""Test team with users access set to False."""
team_map = {"dev-team": {"organization": "myorg", "users": False}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "myorg - dev-team",
"authenticator": -1,
"map_type": "team",
"team": "dev-team",
"organization": "myorg",
"triggers": {"never": {}},
"role": "Team Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_team_with_users_string(self):
"""Test team with users access set to a specific group."""
team_map = {"qa-team": {"organization": "myorg", "users": "qa-group"}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "myorg - qa-team",
"authenticator": -1,
"map_type": "team",
"team": "qa-team",
"organization": "myorg",
"triggers": {"groups": {"has_or": ["qa-group"]}},
"role": "Team Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_single_team_with_users_list(self):
"""Test team with users access set to multiple groups."""
team_map = {"ops-team": {"organization": "myorg", "users": ["ops-group1", "ops-group2"]}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "myorg - ops-team",
"authenticator": -1,
"map_type": "team",
"team": "ops-team",
"organization": "myorg",
"triggers": {"groups": {"has_or": ["ops-group1", "ops-group2"]}},
"role": "Team Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_team_with_remove_flag(self):
"""Test team with remove flag set to True."""
team_map = {"legacy-team": {"organization": "myorg", "users": True, "remove": True}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "myorg - legacy-team",
"authenticator": -1,
"map_type": "team",
"team": "legacy-team",
"organization": "myorg",
"triggers": {"always": {}},
"role": "Team Member",
"revoke": True,
"order": 1,
}
]
assert result == expected
def test_team_with_no_organization(self):
"""Test team without organization specified."""
team_map = {"orphan-team": {"users": True}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "Unknown - orphan-team",
"authenticator": -1,
"map_type": "team",
"team": "orphan-team",
"organization": "Unknown",
"triggers": {"always": {}},
"role": "Team Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_multiple_teams(self):
"""Test multiple teams with different configurations."""
team_map = {"team1": {"organization": "org1", "users": True}, "team2": {"organization": "org2", "users": ["group1", "group2"]}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "org1 - team1",
"authenticator": -1,
"map_type": "team",
"team": "team1",
"organization": "org1",
"triggers": {"always": {}},
"role": "Team Member",
"revoke": False,
"order": 1,
},
{
"name": "org2 - team2",
"authenticator": -1,
"map_type": "team",
"team": "team2",
"organization": "org2",
"triggers": {"groups": {"has_or": ["group1", "group2"]}},
"role": "Team Member",
"revoke": False,
"order": 2,
},
]
assert result == expected
def test_team_with_none_users_skipped(self):
"""Test that teams with None users are skipped."""
team_map = {"skipped-team": {"organization": "myorg", "users": None}, "valid-team": {"organization": "myorg", "users": True}}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "myorg - valid-team",
"authenticator": -1,
"map_type": "team",
"team": "valid-team",
"organization": "myorg",
"triggers": {"always": {}},
"role": "Team Member",
"revoke": False,
"order": 1,
}
]
assert result == expected
def test_order_increments_correctly(self):
"""Test that order values increment correctly for teams."""
team_map = {"team1": {"organization": "myorg", "users": True}, "team2": {"organization": "myorg", "users": True}}
result, _ = team_map_to_gateway_format(team_map)
assert len(result) == 2
assert result[0]["order"] == 1
assert result[1]["order"] == 2
def test_string_to_list_conversion(self):
"""Test that string groups are converted to lists."""
team_map = {"myteam": {"organization": "myorg", "users": "single-group"}}
result, _ = team_map_to_gateway_format(team_map)
# Should convert string to list for has_or
assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"]
def test_team_triggers_format_validation(self):
"""Test that team trigger formats match Gateway specification."""
team_map = {"myteam": {"organization": "myorg", "users": ["group1", "group2"]}}
result, _ = team_map_to_gateway_format(team_map)
# Validate that triggers follow Gateway format
triggers = result[0]["triggers"]
assert "groups" in triggers
assert "has_or" in triggers["groups"]
assert isinstance(triggers["groups"]["has_or"], list)
assert triggers["groups"]["has_or"] == ["group1", "group2"]
def test_team_with_regex_patterns(self):
"""Test that teams with regex patterns in users are handled correctly."""
team_map = {
"My Team": {"organization": "Test Org", "users": ["/^[^@]+?@test\\.example\\.com$/"], "remove": True},
"Other Team": {"organization": "Test Org 2", "users": ["/^[^@]+?@test\\.example\\.com$/"], "remove": False},
}
result, _ = team_map_to_gateway_format(team_map)
expected = [
{
"name": "Test Org - My Team",
"authenticator": -1,
"map_type": "team",
"team": "My Team",
"organization": "Test Org",
"triggers": {"groups": {"has_or": ["/^[^@]+?@test\\.example\\.com$/"]}},
"role": "Team Member",
"revoke": True,
"order": 1,
},
{
"name": "Test Org 2 - Other Team",
"authenticator": -1,
"map_type": "team",
"team": "Other Team",
"organization": "Test Org 2",
"triggers": {"groups": {"has_or": ["/^[^@]+?@test\\.example\\.com$/"]}},
"role": "Team Member",
"revoke": False,
"order": 2,
},
]
assert result == expected
# Validate that the result is JSON serializable
import json
json_str = json.dumps(result)
assert json_str is not None
# Parametrized tests for edge cases
@pytest.mark.parametrize(
"org_map,expected_length",
[
(None, 0),
({}, 0),
({"org1": {}}, 0), # Organization with no admin/user mappings
({"org1": {"admins": True}}, 1),
({"org1": {"users": True}}, 1),
({"org1": {"admins": True, "users": True}}, 2),
({"org1": {"admins": True}, "org2": {"users": True}}, 2),
],
)
def test_org_map_result_lengths(org_map, expected_length):
"""Test that org_map_to_gateway_format returns expected number of mappings."""
result, _ = org_map_to_gateway_format(org_map)
assert len(result) == expected_length
# Test for Gateway format compliance
@pytest.mark.parametrize(
"org_map",
[
{"org1": {"admins": True}},
{"org1": {"users": ["group1"]}},
{"org1": {"admins": False}},
],
)
def test_gateway_format_compliance(org_map):
"""Test that all results comply with Gateway mapping format."""
result, _ = org_map_to_gateway_format(org_map)
for mapping in result:
# Required fields per Gateway spec
assert "name" in mapping
assert "authenticator" in mapping
assert "map_type" in mapping
assert "organization" in mapping
assert "team" in mapping
assert "triggers" in mapping
assert "role" in mapping
assert "revoke" in mapping
assert "order" in mapping
# Field types
assert isinstance(mapping["name"], str)
assert isinstance(mapping["authenticator"], int)
assert mapping["map_type"] == "organization" # For org mappings
assert isinstance(mapping["organization"], str)
assert mapping["team"] is None # For org mappings, team should be None
assert isinstance(mapping["triggers"], dict)
assert isinstance(mapping["role"], str)
assert isinstance(mapping["revoke"], bool)
assert isinstance(mapping["order"], int)
# Parametrized tests for team mappings
@pytest.mark.parametrize(
"team_map,expected_length",
[
(None, 0),
({}, 0),
({"team1": {"organization": "org1", "users": None}}, 0), # Team with None users should be skipped
({"team1": {"organization": "org1", "users": True}}, 1),
({"team1": {"organization": "org1", "users": ["group1"]}}, 1),
({"team1": {"organization": "org1", "users": True}, "team2": {"organization": "org2", "users": False}}, 2),
],
)
def test_team_map_result_lengths(team_map, expected_length):
"""Test that team_map_to_gateway_format returns expected number of mappings."""
result, _ = team_map_to_gateway_format(team_map)
assert len(result) == expected_length
# Test for Gateway format compliance for teams
@pytest.mark.parametrize(
"team_map",
[
{"team1": {"organization": "org1", "users": True}},
{"team1": {"organization": "org1", "users": ["group1"]}},
{"team1": {"organization": "org1", "users": False}},
],
)
def test_team_gateway_format_compliance(team_map):
"""Test that all team results comply with Gateway mapping format."""
result, _ = team_map_to_gateway_format(team_map)
for mapping in result:
# Required fields per Gateway spec
assert "name" in mapping
assert "authenticator" in mapping
assert "map_type" in mapping
assert "organization" in mapping
assert "team" in mapping
assert "triggers" in mapping
assert "role" in mapping
assert "revoke" in mapping
assert "order" in mapping
# Field types
assert isinstance(mapping["name"], str)
assert isinstance(mapping["authenticator"], int)
assert mapping["map_type"] == "team" # For team mappings
assert isinstance(mapping["organization"], str)
assert isinstance(mapping["team"], str)
assert isinstance(mapping["triggers"], dict)
assert isinstance(mapping["role"], str)
assert isinstance(mapping["revoke"], bool)
assert isinstance(mapping["order"], int)

View File

@ -0,0 +1,590 @@
"""
Authentication configuration exporter for AAP Gateway.
This module handles the conversion and export of AWX authentication
configurations to AAP Gateway via REST API calls.
"""
import re
import hashlib
from awx.main.utils.gateway_client import GatewayClient, GatewayAPIError
class AuthConfigExporter:
"""
Handles the export of authentication configurations from AWX to Gateway.
Converts AWX configurations to Gateway format and creates authenticators and mappers.
"""
def __init__(self, gateway_client: GatewayClient, command=None):
"""
Initialize the auth config exporter.
Args:
gateway_client: GatewayClient instance for API calls
command: Optional Django management command instance (for styled output)
"""
self.gateway_client = gateway_client
self.command = command
def export_configs(self, auth_configs, config_type='github'):
"""
Export a list of authentication configurations to Gateway.
Args:
auth_configs: List of authentication configurations from AuthConfigMigrator
config_type: Type of configuration ('github', 'ldap', etc.)
Returns:
dict: Summary of export results
"""
if not auth_configs:
self._write_output(f'No {config_type} authenticators found to migrate.', 'warning')
return {'created': 0, 'failed': 0, 'mappers_created': 0, 'mappers_failed': 0}
self._write_output(f'Found {len(auth_configs)} {config_type} authentication configuration(s).', 'success')
# Process each authenticator configuration
created_authenticators = []
for config in auth_configs:
if self._create_gateway_authenticator(config, config_type):
created_authenticators.append(config)
# Create mappers for successfully created authenticators
mappers_created = 0
mappers_failed = 0
if created_authenticators:
self._write_output('\n=== Creating Authenticator Mappers ===', 'success')
for config in created_authenticators:
mapper_result = self._create_gateway_mappers(config)
mappers_created += mapper_result['created']
mappers_failed += mapper_result['failed']
return {
'created': len(created_authenticators),
'failed': len(auth_configs) - len(created_authenticators),
'mappers_created': mappers_created,
'mappers_failed': mappers_failed,
}
def _create_gateway_authenticator(self, config, config_type):
"""Create a single authenticator in Gateway from AWX config.
Args:
config: AWX authenticator configuration dict
config_type: Type of configuration ('github', 'ldap', etc.)
Returns:
bool: True if authenticator was created successfully, False otherwise
"""
category = config['category']
settings = config['settings']
# Handle different config types
if config_type == 'github':
return self._create_github_authenticator(config, category, settings)
elif config_type == 'ldap':
return self._create_ldap_authenticator(config, category, settings)
elif config_type == 'saml':
return self._create_saml_authenticator(config, category, settings)
elif config_type == 'google_oauth2':
return self._create_google_oauth2_authenticator(config, category, settings)
elif config_type == 'azure_ad':
return self._create_azure_ad_authenticator(config, category, settings)
elif config_type == 'radius':
return self._create_radius_authenticator(config, category, settings)
elif config_type == 'tacacs_plus':
return self._create_tacacs_plus_authenticator(config, category, settings)
else:
self._write_output(f'Unknown config type {config_type}, skipping', 'warning')
return False
def _create_github_authenticator(self, config, category, settings):
"""Create a GitHub authenticator in Gateway."""
# Extract the OAuth2 credentials
key_value = None
secret_value = None
for setting_name, value in settings.items():
if setting_name.endswith('_KEY') and value:
key_value = value
elif setting_name.endswith('_SECRET') and value:
secret_value = value
if not key_value or not secret_value:
self._write_output(f'Skipping {category}: missing OAuth2 credentials', 'warning')
return False
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('-', '_').title()}"
authenticator_slug = self._generate_authenticator_slug('github', category, key_value)
# Map AWX category to Gateway authenticator type
type_mapping = {
'github': 'ansible_base.authentication.authenticator_plugins.github',
'github-org': 'ansible_base.authentication.authenticator_plugins.github_org',
'github-team': 'ansible_base.authentication.authenticator_plugins.github_team',
'github-enterprise': 'ansible_base.authentication.authenticator_plugins.github_enterprise',
'github-enterprise-org': 'ansible_base.authentication.authenticator_plugins.github_enterprise_org',
'github-enterprise-team': 'ansible_base.authentication.authenticator_plugins.github_enterprise_team',
'oidc': 'ansible_base.authentication.authenticator_plugins.oidc',
}
authenticator_type = type_mapping.get(category)
if not authenticator_type:
self._write_output(f'Unknown category {category}, skipping', 'warning')
return False
self._write_output(f'\n--- Processing {category} authenticator ---')
self._write_output(f'Name: {authenticator_name}')
self._write_output(f'Slug: {authenticator_slug}')
self._write_output(f'Type: {authenticator_type}')
self._write_output(f'Client ID: {key_value}')
self._write_output(f'Client Secret: {"*" * 8}')
try:
# Check if authenticator already exists by slug
existing_authenticators = self.gateway_client.get_authenticators()
existing_authenticator = None
for auth in existing_authenticators:
if auth.get('slug') == authenticator_slug:
existing_authenticator = auth
break
if existing_authenticator:
# Authenticator already exists, use it
authenticator_id = existing_authenticator.get('id')
self._write_output(f'⚠ Authenticator already exists with ID: {authenticator_id}', 'warning')
# Store the existing result for mapper creation
config['gateway_authenticator_id'] = authenticator_id
config['gateway_authenticator'] = existing_authenticator
return True
else:
# Authenticator doesn't exist, create it
self._write_output('Creating new authenticator...')
# Build Gateway authenticator configuration
gateway_config = {
"name": authenticator_name,
"slug": authenticator_slug,
"type": authenticator_type,
"enabled": True,
"create_objects": True, # Allow Gateway to create users/orgs/teams
"remove_users": False, # Don't remove users by default
"configuration": {"KEY": key_value, "SECRET": secret_value},
}
# Add any additional configuration based on AWX settings
additional_config = self._build_additional_config(category, settings)
gateway_config["configuration"].update(additional_config)
# Create the authenticator
result = self.gateway_client.create_authenticator(gateway_config)
self._write_output(f'✓ Successfully created authenticator with ID: {result.get("id")}', 'success')
# Store the result for potential mapper creation later
config['gateway_authenticator_id'] = result.get('id')
config['gateway_authenticator'] = result
return True
except GatewayAPIError as e:
self._write_output(f'✗ Failed to create {category} authenticator: {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
self._write_output(f'✗ Unexpected error creating {category} authenticator: {str(e)}', 'error')
return False
def _create_ldap_authenticator(self, config, category, settings):
"""Create an LDAP authenticator in Gateway."""
# Extract the LDAP server URI as the identifier
server_uri = None
for setting_name, value in settings.items():
if setting_name.endswith('_SERVER_URI') and value:
server_uri = value
break
if not server_uri:
self._write_output(f'Skipping {category}: missing LDAP server URI', 'warning')
return False
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('_', '-').title()}"
authenticator_slug = self._generate_authenticator_slug('ldap', category, server_uri)
# Get LDAP authenticator type
authenticator_type = 'ansible_base.authentication.authenticator_plugins.ldap'
self._write_output(f'\n--- Processing {category} authenticator ---')
self._write_output(f'Name: {authenticator_name}')
self._write_output(f'Slug: {authenticator_slug}')
self._write_output(f'Type: {authenticator_type}')
self._write_output(f'Server URI: {server_uri}')
try:
# Check if authenticator already exists by slug
existing_authenticators = self.gateway_client.get_authenticators()
existing_authenticator = None
for auth in existing_authenticators:
if auth.get('slug') == authenticator_slug:
existing_authenticator = auth
break
if existing_authenticator:
# Authenticator already exists, use it
authenticator_id = existing_authenticator.get('id')
self._write_output(f'⚠ Authenticator already exists with ID: {authenticator_id}', 'warning')
# Store the existing result for mapper creation
config['gateway_authenticator_id'] = authenticator_id
config['gateway_authenticator'] = existing_authenticator
return True
else:
# Authenticator doesn't exist, create it
self._write_output('Creating new LDAP authenticator...')
# Build Gateway LDAP authenticator configuration
gateway_config = {
"name": authenticator_name,
"slug": authenticator_slug,
"type": authenticator_type,
"enabled": True,
"create_objects": True, # Allow Gateway to create users/orgs/teams
"remove_users": False, # Don't remove users by default
"configuration": self._build_ldap_configuration(settings),
}
# Create the authenticator
result = self.gateway_client.create_authenticator(gateway_config)
self._write_output(f'✓ Successfully created LDAP authenticator with ID: {result.get("id")}', 'success')
# Store the result for potential mapper creation later
config['gateway_authenticator_id'] = result.get('id')
config['gateway_authenticator'] = result
return True
except GatewayAPIError as e:
self._write_output(f'✗ Failed to create {category} authenticator: {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
self._write_output(f'✗ Unexpected error creating {category} authenticator: {str(e)}', 'error')
return False
def _create_saml_authenticator(self, config, category, settings):
"""Create a SAML authenticator in Gateway."""
# TODO: Implement SAML authenticator creation
# When implementing, use this pattern for slug generation:
# entity_id = settings.get('SOCIAL_AUTH_SAML_SP_ENTITY_ID', 'saml')
# authenticator_slug = self._generate_authenticator_slug('saml', category, entity_id)
# SAML requires complex configuration including:
# - SP entity ID, certificates, metadata
# - IdP configuration and metadata
# - Attribute mapping
self._write_output(f'SAML authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_google_oauth2_authenticator(self, config, category, settings):
"""Create a Google OAuth2 authenticator in Gateway."""
# TODO: Implement Google OAuth2 authenticator creation
# When implementing, use this pattern for slug generation:
# client_id = settings.get('SOCIAL_AUTH_GOOGLE_OAUTH2_KEY', 'google')
# authenticator_slug = self._generate_authenticator_slug('google_oauth2', category, client_id)
# Similar to GitHub OAuth2 but with Google-specific endpoints
# - Extract GOOGLE_OAUTH2_KEY and GOOGLE_OAUTH2_SECRET
# - Handle whitelisted domains/emails
# - Configure Google OAuth2 scope
self._write_output(f'Google OAuth2 authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_azure_ad_authenticator(self, config, category, settings):
"""Create an Azure AD authenticator in Gateway."""
# TODO: Implement Azure AD authenticator creation
# When implementing, use this pattern for slug generation:
# client_id = settings.get('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY', 'azure')
# authenticator_slug = self._generate_authenticator_slug('azure_ad', category, client_id)
# Azure AD requires:
# - Application ID and secret
# - Tenant ID (for tenant-specific auth)
# - Proper OAuth2 endpoints for Azure
self._write_output(f'Azure AD authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_radius_authenticator(self, config, category, settings):
"""Create a RADIUS authenticator in Gateway."""
# TODO: Implement RADIUS authenticator creation
# When implementing, use this pattern for slug generation:
# server_host = settings.get('RADIUS_SERVER', 'radius')
# authenticator_slug = self._generate_authenticator_slug('radius', category, server_host)
# RADIUS is a different authentication protocol than OAuth2/SAML
# - Server host and port
# - Shared secret
# - NAS identifier
# - Timeout and retry settings
self._write_output(f'RADIUS authenticator creation not yet implemented for {category}', 'warning')
return False
def _create_tacacs_plus_authenticator(self, config, category, settings):
"""Create a TACACS+ authenticator in Gateway."""
# TODO: Implement TACACS+ authenticator creation
# When implementing, use this pattern for slug generation:
# server_host = settings.get('TACACSPLUS_HOST', 'tacacs')
# authenticator_slug = self._generate_authenticator_slug('tacacs_plus', category, server_host)
# TACACS+ is a Cisco-developed authentication protocol
# - Server host and port
# - Shared secret
# - Authentication protocol settings
# - Session timeout
self._write_output(f'TACACS+ authenticator creation not yet implemented for {category}', 'warning')
return False
def _build_additional_config(self, category, settings):
"""Build additional configuration for specific authenticator types.
Args:
category: AWX category (github, github-org, etc.)
settings: AWX settings dictionary
Returns:
dict: Additional configuration parameters
"""
additional_config = {}
# Add scope configuration if present
for setting_name, value in settings.items():
if setting_name.endswith('_SCOPE') and value:
additional_config['SCOPE'] = value
break
# Add GitHub Enterprise URL if present
if 'enterprise' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_URL') and value:
additional_config['URL'] = value
elif setting_name.endswith('_API_URL') and value:
additional_config['API_URL'] = value
# Add organization name for org-specific authenticators
if 'org' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_NAME') and value:
additional_config['ORG_NAME'] = value
break
# Add team ID for team-specific authenticators
if 'team' in category:
for setting_name, value in settings.items():
if setting_name.endswith('_ID') and value:
additional_config['TEAM_ID'] = value
break
# Add OIDC endpoint for generic OIDC
if category == 'oidc':
for setting_name, value in settings.items():
if setting_name.endswith('_OIDC_ENDPOINT') and value:
additional_config['OIDC_ENDPOINT'] = value
elif setting_name.endswith('_VERIFY_SSL') and value is not None:
additional_config['VERIFY_SSL'] = value
return additional_config
def _build_ldap_configuration(self, settings):
"""Build LDAP configuration for Gateway from AWX settings.
Args:
settings: AWX LDAP settings dictionary
Returns:
dict: Gateway-compatible LDAP configuration
"""
config = {}
# Required settings
for setting_name, value in settings.items():
if setting_name.endswith('_SERVER_URI') and value:
# Gateway expects SERVER_URI as a list
config['SERVER_URI'] = [value] if isinstance(value, str) else value
elif setting_name.endswith('_BIND_DN') and value:
config['BIND_DN'] = value
elif setting_name.endswith('_BIND_PASSWORD') and value:
config['BIND_PASSWORD'] = value
elif setting_name.endswith('_START_TLS') and value is not None:
config['START_TLS'] = bool(value)
# User search configuration
for setting_name, value in settings.items():
if setting_name.endswith('_USER_SEARCH') and value:
# AWX stores USER_SEARCH as a tuple/list like (base_dn, scope, filter)
if isinstance(value, (list, tuple)) and len(value) >= 3:
config['USER_SEARCH'] = [value[0], value[1], value[2]]
# User attribute mapping
for setting_name, value in settings.items():
if setting_name.endswith('_USER_ATTR_MAP') and value:
config['USER_ATTR_MAP'] = value
# Group search configuration
for setting_name, value in settings.items():
if setting_name.endswith('_GROUP_SEARCH') and value:
# AWX stores GROUP_SEARCH as a tuple/list like (base_dn, scope, filter)
if isinstance(value, (list, tuple)) and len(value) >= 3:
config['GROUP_SEARCH'] = [value[0], value[1], value[2]]
# Group type configuration
for setting_name, value in settings.items():
if setting_name.endswith('_GROUP_TYPE') and value:
# Convert AWX group type class to string if needed
if hasattr(value, '__name__'):
config['GROUP_TYPE'] = value.__name__
else:
config['GROUP_TYPE'] = str(value)
elif setting_name.endswith('_GROUP_TYPE_PARAMS') and value:
config['GROUP_TYPE_PARAMS'] = value
# Connection options
for setting_name, value in settings.items():
if setting_name.endswith('_CONNECTION_OPTIONS') and value:
config['CONNECTION_OPTIONS'] = value
# Other LDAP settings
for setting_name, value in settings.items():
if setting_name.endswith('_USER_DN_TEMPLATE') and value:
config['USER_DN_TEMPLATE'] = value
elif setting_name.endswith('_REQUIRE_GROUP') and value:
config['REQUIRE_GROUP'] = value
elif setting_name.endswith('_DENY_GROUP') and value:
config['DENY_GROUP'] = value
elif setting_name.endswith('_USER_FLAGS_BY_GROUP') and value:
config['USER_FLAGS_BY_GROUP'] = value
return config
def _generate_authenticator_slug(self, auth_type, category, identifier):
"""Generate a deterministic slug for an authenticator.
Args:
auth_type: Type of authenticator ('github', 'ldap', etc.)
category: AWX category (github, github-org, ldap, etc.)
identifier: Unique identifier (client ID, server URI, etc.)
Returns:
str: Deterministic slug for the authenticator
"""
# Create a base string from the components
base_string = f"awx-{auth_type}-{category}-{identifier}"
# Clean the string: lowercase, replace non-alphanumeric with hyphens
cleaned = re.sub(r'[^a-zA-Z0-9]+', '-', base_string.lower())
# Remove leading/trailing hyphens and ensure no double hyphens
cleaned = re.sub(r'^-+|-+$', '', cleaned)
cleaned = re.sub(r'-+', '-', cleaned)
# Generate a hash of the cleaned string for consistent length
slug_hash = hashlib.md5(cleaned.encode('utf-8')).hexdigest()[:8]
# Combine type and hash for the final slug
final_slug = f"awx-{auth_type}-{slug_hash}"
return final_slug
def _create_gateway_mappers(self, config):
"""Create authenticator mappers in Gateway from AWX config.
Args:
config: AWX authenticator configuration dict with gateway_authenticator_id
Returns:
dict: Summary with 'created' and 'failed' counts
"""
authenticator_id = config.get('gateway_authenticator_id')
if not authenticator_id:
self._write_output(f'No authenticator ID found for {config["category"]}, skipping mappers', 'error')
return {'created': 0, 'failed': 0}
category = config['category']
org_mappers = config.get('org_mappers', [])
team_mappers = config.get('team_mappers', [])
total_mappers = len(org_mappers) + len(team_mappers)
if total_mappers == 0:
self._write_output(f'No mappers to create for {category} authenticator')
return {'created': 0, 'failed': 0}
self._write_output(f'\n--- Creating mappers for {category} authenticator (ID: {authenticator_id}) ---')
self._write_output(f'Organization mappers: {len(org_mappers)}')
self._write_output(f'Team mappers: {len(team_mappers)}')
created_count = 0
failed_count = 0
# Create organization mappers
for mapper in org_mappers:
if self._create_single_mapper(authenticator_id, mapper, 'organization'):
created_count += 1
else:
failed_count += 1
# Create team mappers
for mapper in team_mappers:
if self._create_single_mapper(authenticator_id, mapper, 'team'):
created_count += 1
else:
failed_count += 1
# Summary
self._write_output(f'Mappers created: {created_count}, failed: {failed_count}')
return {'created': created_count, 'failed': failed_count}
def _create_single_mapper(self, authenticator_id, mapper_config, mapper_type):
"""Create a single mapper in Gateway.
Args:
authenticator_id: ID of the authenticator to create mapper for
mapper_config: Mapper configuration dictionary
mapper_type: Type of mapper ('organization' or 'team')
Returns:
bool: True if mapper was created successfully, False otherwise
"""
try:
# Update the mapper config with the correct authenticator ID
mapper_config = mapper_config.copy() # Don't modify the original
mapper_config['authenticator'] = authenticator_id
# Create the mapper
self.gateway_client.create_authenticator_map(authenticator_id, mapper_config)
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✓ Created {mapper_type} mapper: {mapper_name}', 'success')
return True
except GatewayAPIError as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✗ Failed to create {mapper_type} mapper "{mapper_name}": {e.message}', 'error')
if e.response_data:
self._write_output(f' Details: {e.response_data}', 'error')
return False
except Exception as e:
mapper_name = mapper_config.get('name', 'Unknown')
self._write_output(f' ✗ Unexpected error creating {mapper_type} mapper "{mapper_name}": {str(e)}', 'error')
return False
def _write_output(self, message, style=None):
"""Write output message if command is available."""
if self.command:
if style == 'success':
self.command.stdout.write(self.command.style.SUCCESS(message))
elif style == 'warning':
self.command.stdout.write(self.command.style.WARNING(message))
elif style == 'error':
self.command.stdout.write(self.command.style.ERROR(message))
else:
self.command.stdout.write(message)

View File

@ -0,0 +1,307 @@
"""
Gateway API client for AAP Gateway interactions.
This module provides a client class to interact with the AAP Gateway REST API,
specifically for creating authenticators and mapping configurations.
"""
import requests
import logging
from typing import Dict, List, Optional, Any
from urllib.parse import urljoin
logger = logging.getLogger(__name__)
class GatewayAPIError(Exception):
"""Exception raised for Gateway API errors."""
def __init__(self, message: str, status_code: Optional[int] = None, response_data: Optional[Dict] = None):
self.message = message
self.status_code = status_code
self.response_data = response_data
super().__init__(self.message)
class GatewayClient:
"""Client for AAP Gateway REST API interactions."""
def __init__(self, base_url: str, username: str, password: str, skip_verify: bool = False):
"""Initialize Gateway client.
Args:
base_url: Base URL of the AAP Gateway instance
username: Username for authentication
password: Password for authentication
skip_verify: Skip SSL certificate verification
"""
self.base_url = base_url.rstrip('/')
self.username = username
self.password = password
self.skip_verify = skip_verify
# Initialize session
self.session = requests.Session()
# Configure SSL verification
if skip_verify:
self.session.verify = False
# Disable SSL warnings when verification is disabled
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Set default headers
self.session.headers.update(
{
'User-Agent': 'AWX-Gateway-Migration-Client/1.0',
'Accept': 'application/json',
'Content-Type': 'application/json',
}
)
# Authentication state
self._authenticated = False
def authenticate(self) -> bool:
"""Authenticate with the Gateway using HTTP Basic Authentication.
Returns:
bool: True if authentication successful, False otherwise
Raises:
GatewayAPIError: If authentication fails
"""
try:
# Set up HTTP Basic Authentication
from requests.auth import HTTPBasicAuth
self.session.auth = HTTPBasicAuth(self.username, self.password)
# Test authentication by making a simple request to the API
test_url = urljoin(self.base_url, '/api/gateway/v1/authenticators/')
response = self.session.get(test_url)
if response.status_code in [200, 401]: # 401 means auth is working but might need permissions
self._authenticated = True
logger.info("Successfully authenticated with Gateway using Basic Auth")
return True
else:
error_msg = f"Authentication test failed with status {response.status_code}"
try:
error_data = response.json()
error_msg += f": {error_data}"
except requests.exceptions.JSONDecodeError:
error_msg += f": {response.text}"
raise GatewayAPIError(error_msg, response.status_code, response.json() if response.content else None)
except requests.RequestException as e:
raise GatewayAPIError(f"Network error during authentication: {str(e)}")
def _ensure_authenticated(self):
"""Ensure the client is authenticated, authenticate if needed."""
if not self._authenticated:
self.authenticate()
def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None) -> requests.Response:
"""Make an authenticated request to the Gateway API.
Args:
method: HTTP method (GET, POST, PUT, DELETE, etc.)
endpoint: API endpoint (without base URL)
data: JSON data to send in request body
params: Query parameters
Returns:
requests.Response: The response object
Raises:
GatewayAPIError: If request fails
"""
self._ensure_authenticated()
url = urljoin(self.base_url, endpoint.lstrip('/'))
try:
response = self.session.request(method=method.upper(), url=url, json=data, params=params)
# Log request details
logger.debug(f"{method.upper()} {url} - Status: {response.status_code}")
return response
except requests.RequestException as e:
raise GatewayAPIError(f"Request failed: {str(e)}")
def create_authenticator(self, authenticator_config: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new authenticator in Gateway.
Args:
authenticator_config: Authenticator configuration dictionary
Returns:
dict: Created authenticator data
Raises:
GatewayAPIError: If creation fails
"""
endpoint = '/api/gateway/v1/authenticators/'
try:
response = self._make_request('POST', endpoint, data=authenticator_config)
if response.status_code == 201:
result = response.json()
logger.info(f"Successfully created authenticator: {result.get('name', 'Unknown')}")
return result
else:
error_msg = f"Failed to create authenticator. Status: {response.status_code}"
try:
error_data = response.json()
error_msg += f", Error: {error_data}"
except requests.exceptions.JSONDecodeError:
error_msg += f", Response: {response.text}"
raise GatewayAPIError(error_msg, response.status_code, response.json() if response.content else None)
except requests.RequestException as e:
raise GatewayAPIError(f"Failed to create authenticator: {str(e)}")
def create_authenticator_map(self, authenticator_id: int, mapper_config: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new authenticator map in Gateway.
Args:
authenticator_id: ID of the authenticator to create map for
mapper_config: Mapper configuration dictionary
Returns:
dict: Created mapper data
Raises:
GatewayAPIError: If creation fails
"""
endpoint = '/api/gateway/v1/authenticator_maps/'
try:
response = self._make_request('POST', endpoint, data=mapper_config)
if response.status_code == 201:
result = response.json()
logger.info(f"Successfully created authenticator map: {result.get('name', 'Unknown')}")
return result
else:
error_msg = f"Failed to create authenticator map. Status: {response.status_code}"
try:
error_data = response.json()
error_msg += f", Error: {error_data}"
except requests.exceptions.JSONDecodeError:
error_msg += f", Response: {response.text}"
raise GatewayAPIError(error_msg, response.status_code, response.json() if response.content else None)
except requests.RequestException as e:
raise GatewayAPIError(f"Failed to create authenticator map: {str(e)}")
def get_authenticators(self, params: Optional[Dict] = None) -> List[Dict[str, Any]]:
"""Get list of authenticators from Gateway.
Args:
params: Optional query parameters
Returns:
list: List of authenticator configurations
Raises:
GatewayAPIError: If request fails
"""
endpoint = '/api/gateway/v1/authenticators/'
try:
response = self._make_request('GET', endpoint, params=params)
if response.status_code == 200:
result = response.json()
# Handle paginated response
if isinstance(result, dict) and 'results' in result:
return result['results']
return result
else:
error_msg = f"Failed to get authenticators. Status: {response.status_code}"
raise GatewayAPIError(error_msg, response.status_code)
except requests.RequestException as e:
raise GatewayAPIError(f"Failed to get authenticators: {str(e)}")
def get_authenticator_maps(self, authenticator_id: int) -> List[Dict[str, Any]]:
"""Get list of maps for a specific authenticator.
Args:
authenticator_id: ID of the authenticator
Returns:
list: List of authenticator maps
Raises:
GatewayAPIError: If request fails
"""
endpoint = f'/api/gateway/v1/authenticators/{authenticator_id}/maps/'
try:
response = self._make_request('GET', endpoint)
if response.status_code == 200:
result = response.json()
# Handle paginated response
if isinstance(result, dict) and 'results' in result:
return result['results']
return result
else:
error_msg = f"Failed to get authenticator maps. Status: {response.status_code}"
raise GatewayAPIError(error_msg, response.status_code)
except requests.RequestException as e:
raise GatewayAPIError(f"Failed to get authenticator maps: {str(e)}")
def create_github_authenticator(
self, name: str, client_id: str, client_secret: str, enabled: bool = True, create_objects: bool = False, remove_users: bool = False
) -> Dict[str, Any]:
"""Create a GitHub authenticator with the specified configuration.
Args:
name: Name for the authenticator
client_id: GitHub OAuth App Client ID
client_secret: GitHub OAuth App Client Secret
enabled: Whether authenticator should be enabled
create_objects: Whether to create users/orgs/teams automatically
remove_users: Whether to remove users when they lose access
Returns:
dict: Created authenticator data
"""
config = {
"name": name,
"type": "ansible_base.authentication.authenticator_plugins.github",
"enabled": enabled,
"create_objects": create_objects,
"remove_users": remove_users,
"configuration": {"KEY": client_id, "SECRET": client_secret},
}
return self.create_authenticator(config)
def close(self):
"""Close the session and clean up resources."""
if self.session:
self.session.close()
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.close()

View File

@ -0,0 +1,165 @@
"""
Gateway mapping conversion utilities.
This module contains functions to convert AWX authentication mappings
(organization and team mappings) to AAP Gateway format.
"""
def team_map_to_gateway_format(team_map, start_order=1):
"""Convert AWX team mapping to Gateway authenticator format.
Args:
team_map: The SOCIAL_AUTH_*_TEAM_MAP setting value
start_order: Starting order value for the mappers
Returns:
tuple: (List of Gateway-compatible team mappers, next_order)
"""
if team_map is None:
return [], start_order
result = []
order = start_order
for team_name in team_map.keys():
team = team_map[team_name]
# TODO: Confirm that if we have None with remove we still won't remove
if team['users'] is None:
continue
if team['users'] is False:
triggers = {"never": {}}
elif team['users'] is True:
triggers = {"always": {}}
else:
import re
# Handle the case where the value itself is a regex pattern
if isinstance(team['users'], re.Pattern):
# Convert single regex pattern to string in a list
triggers = {"groups": {"has_or": [str(team['users'])]}}
else:
# Handle list or string values
if type(team['users']) is str:
team['users'] = [team['users']]
# Convert any non-string items to strings (e.g., regex patterns)
users_list = []
for user in team['users']:
if isinstance(user, str):
users_list.append(user)
elif isinstance(user, re.Pattern):
# Convert regex patterns to string representation
users_list.append(str(user.pattern))
else:
# Convert other objects to string representation
users_list.append(str(user))
triggers = {"groups": {"has_or": users_list}}
organization_name = team.get('organization', 'Unknown')
# Check for remove flag
revoke = team.get('remove', False)
result.append(
{
"name": f"{organization_name} - {team_name}",
"map_type": "team",
"order": order,
"authenticator": -1, # Will be updated when creating the mapper
"triggers": triggers,
"organization": organization_name,
"team": team_name,
"role": "Team Member", # Gateway team member role
"revoke": revoke,
}
)
order += 1
return result, order
def org_map_to_gateway_format(org_map, start_order=1):
"""Convert AWX organization mapping to Gateway authenticator format.
Args:
org_map: The SOCIAL_AUTH_*_ORGANIZATION_MAP setting value
start_order: Starting order value for the mappers
Returns:
tuple: (List of Gateway-compatible organization mappers, next_order)
"""
if org_map is None:
return [], start_order
result = []
order = start_order
for organization_name in org_map.keys():
organization = org_map[organization_name]
for user_type in ['admins', 'users']:
if user_type in organization:
# TODO: Confirm that if we have None with remove we still won't remove
if organization[user_type] is None:
continue
if organization[user_type] is False:
triggers = {"never": {}}
elif organization[user_type] is True:
triggers = {"always": {}}
else:
import re
# Handle the case where the value itself is a regex pattern
if isinstance(organization[user_type], re.Pattern):
# Convert single regex pattern to string in a list
triggers = {"groups": {"has_or": [str(organization[user_type])]}}
else:
# Handle list or string values
if type(organization[user_type]) is str:
organization[user_type] = [organization[user_type]]
# Convert any non-string items to strings (e.g., regex patterns)
users_list = []
for user in organization[user_type]:
if isinstance(user, str):
users_list.append(user)
elif isinstance(user, re.Pattern):
# Convert regex patterns to string representation
users_list.append(str(user.pattern))
else:
# Convert other objects to string representation
users_list.append(str(user))
triggers = {"groups": {"has_or": users_list}}
team_name = f"Organization {user_type.title()}"
# Map AWX admin/users to appropriate Gateway organization roles
role = "Organization Admin" if user_type == "admins" else "Organization Member"
# Check for remove flags
revoke = False
if user_type == "admins" and organization.get("remove_admins"):
revoke = True
elif user_type == "users" and organization.get("remove_users"):
revoke = True
result.append(
{
"name": f"{organization_name} - {team_name}",
"map_type": "organization",
"order": order,
"authenticator": -1, # Will be updated when creating the mapper
"triggers": triggers,
"organization": organization_name,
"team": None, # Organization-level mapping, not team-specific
"role": role,
"revoke": revoke,
}
)
order += 1
return result, order

View File

View File

@ -0,0 +1,269 @@
from django.conf import settings
from awx.conf import settings_registry
from awx.main.utils.gateway_mapping import org_map_to_gateway_format, team_map_to_gateway_format
class AuthConfigMigrator:
"""
Handles the migration of authenticators from AWX to Gateway
TODO: this is a work in progress
"""
"""
Export all GitHub OIDC authenticators. An OIDC authenticator is only exported if both,
id and secret, are defined. Otherwise it will be skipped.
Returns:
list: List of configured GitHub authentication providers with their settings
"""
def get_github_oidc_config(self):
github_categories = ['github', 'github-org', 'github-team', 'github-enterprise', 'github-enterprise-org', 'github-enterprise-team', 'oidc']
found_configs = []
for category in github_categories:
try:
category_settings = settings_registry.get_registered_settings(category_slug=category)
if category_settings:
config_data = {}
key_setting = None
secret_setting = None
# Ensure category_settings is iterable and contains strings
import re
if isinstance(category_settings, re.Pattern) or not hasattr(category_settings, '__iter__') or isinstance(category_settings, str):
continue
for setting_name in category_settings:
# Skip if setting_name is not a string (e.g., regex pattern)
if not isinstance(setting_name, str):
continue
if setting_name.endswith('_KEY'):
key_setting = setting_name
elif setting_name.endswith('_SECRET'):
secret_setting = setting_name
# Skip this category if KEY or SECRET is missing or empty
if not key_setting or not secret_setting:
continue
key_value = getattr(settings, key_setting, None)
secret_value = getattr(settings, secret_setting, None)
# Skip this category if OIDC Key and/or Secret are not configured
if not key_value or not secret_value:
continue
# If we have both key and secret, collect all settings
org_map_value = None
team_map_value = None
for setting_name in category_settings:
# Skip if setting_name is not a string (e.g., regex pattern)
if not isinstance(setting_name, str):
continue
value = getattr(settings, setting_name, None)
config_data[setting_name] = value
# Capture org and team map values for special processing
if setting_name.endswith('_ORGANIZATION_MAP'):
org_map_value = value
elif setting_name.endswith('_TEAM_MAP'):
team_map_value = value
# Convert GitHub org and team mappings from AWX to the Gateway format
# Start with order 1 and maintain sequence across both org and team mappers
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=1)
team_mappers, _ = team_map_to_gateway_format(team_map_value, start_order=next_order)
found_configs.append({'category': category, 'settings': config_data, 'org_mappers': org_mappers, 'team_mappers': team_mappers})
except Exception as e:
raise Exception(f'Could not retrieve {category} settings: {str(e)}')
return found_configs
def get_ldap_config(self):
"""
Export all LDAP authenticators. An LDAP authenticator is only exported if
SERVER_URI is configured. Otherwise it will be skipped.
Returns:
list: List of configured LDAP authentication providers with their settings
"""
found_configs = []
# AWX supports up to 6 LDAP configurations: AUTH_LDAP (default) and AUTH_LDAP_1 through AUTH_LDAP_5
ldap_instances = [''] + [f'_{i}' for i in range(1, 6)] # ['', '_1', '_2', '_3', '_4', '_5']
for instance in ldap_instances:
try:
# Build the setting prefix (AUTH_LDAP or AUTH_LDAP_1, etc.)
prefix = f'AUTH_LDAP{instance}'
# Check if this LDAP instance is configured by looking for SERVER_URI
server_uri_setting = f'{prefix}_SERVER_URI'
server_uri = getattr(settings, server_uri_setting, None)
# Skip this instance if SERVER_URI is not configured or empty
if not server_uri or server_uri.strip() == '':
continue
config_data = {}
org_map_value = None
team_map_value = None
# Define all LDAP settings we want to collect
ldap_settings = [
'SERVER_URI',
'BIND_DN',
'BIND_PASSWORD',
'START_TLS',
'CONNECTION_OPTIONS',
'USER_SEARCH',
'USER_DN_TEMPLATE',
'USER_ATTR_MAP',
'GROUP_SEARCH',
'GROUP_TYPE',
'GROUP_TYPE_PARAMS',
'REQUIRE_GROUP',
'DENY_GROUP',
'USER_FLAGS_BY_GROUP',
'ORGANIZATION_MAP',
'TEAM_MAP',
]
# Collect all settings for this LDAP instance
for setting in ldap_settings:
setting_name = f'{prefix}_{setting}'
value = getattr(settings, setting_name, None)
config_data[setting_name] = value
# Capture org and team map values for special processing
if setting == 'ORGANIZATION_MAP':
org_map_value = value
elif setting == 'TEAM_MAP':
team_map_value = value
# Convert LDAP org and team mappings from AWX to the Gateway format
# Start with order 1 and maintain sequence across both org and team mappers
org_mappers, next_order = org_map_to_gateway_format(org_map_value, start_order=1)
team_mappers, _ = team_map_to_gateway_format(team_map_value, start_order=next_order)
# Determine instance name for identification
instance_name = 'ldap' if instance == '' else f'ldap{instance}'
found_configs.append({'category': instance_name, 'settings': config_data, 'org_mappers': org_mappers, 'team_mappers': team_mappers})
except Exception as e:
instance_name = 'ldap' if instance == '' else f'ldap{instance}'
raise Exception(f'Could not retrieve {instance_name} settings: {str(e)}')
return found_configs
def get_saml_config(self):
"""
Export SAML authenticators. A SAML authenticator is only exported if
required configuration is present.
Returns:
list: List of configured SAML authentication providers with their settings
"""
# TODO: Implement SAML configuration retrieval
# SAML settings typically include:
# - SOCIAL_AUTH_SAML_SP_ENTITY_ID
# - SOCIAL_AUTH_SAML_SP_PUBLIC_CERT
# - SOCIAL_AUTH_SAML_SP_PRIVATE_KEY
# - SOCIAL_AUTH_SAML_ORG_INFO
# - SOCIAL_AUTH_SAML_TECHNICAL_CONTACT
# - SOCIAL_AUTH_SAML_SUPPORT_CONTACT
# - SOCIAL_AUTH_SAML_ENABLED_IDPS
# - SOCIAL_AUTH_SAML_ORGANIZATION_MAP
# - SOCIAL_AUTH_SAML_TEAM_MAP
found_configs = []
return found_configs
def get_google_oauth2_config(self):
"""
Export Google OAuth2 authenticators. A Google OAuth2 authenticator is only exported if
KEY and SECRET are configured.
Returns:
list: List of configured Google OAuth2 authentication providers with their settings
"""
# TODO: Implement Google OAuth2 configuration retrieval
# Google OAuth2 settings typically include:
# - SOCIAL_AUTH_GOOGLE_OAUTH2_KEY
# - SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET
# - SOCIAL_AUTH_GOOGLE_OAUTH2_SCOPE
# - SOCIAL_AUTH_GOOGLE_OAUTH2_WHITELISTED_DOMAINS
# - SOCIAL_AUTH_GOOGLE_OAUTH2_WHITELISTED_EMAILS
# - SOCIAL_AUTH_GOOGLE_OAUTH2_ORGANIZATION_MAP
# - SOCIAL_AUTH_GOOGLE_OAUTH2_TEAM_MAP
found_configs = []
return found_configs
def get_azure_ad_config(self):
"""
Export Azure AD authenticators. An Azure AD authenticator is only exported if
KEY and SECRET are configured.
Returns:
list: List of configured Azure AD authentication providers with their settings
"""
# TODO: Implement Azure AD configuration retrieval
# Azure AD settings typically include:
# - SOCIAL_AUTH_AZUREAD_OAUTH2_KEY
# - SOCIAL_AUTH_AZUREAD_OAUTH2_SECRET
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_KEY
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_SECRET
# - SOCIAL_AUTH_AZUREAD_TENANT_OAUTH2_TENANT_ID
# - SOCIAL_AUTH_AZUREAD_OAUTH2_ORGANIZATION_MAP
# - SOCIAL_AUTH_AZUREAD_OAUTH2_TEAM_MAP
found_configs = []
return found_configs
def get_radius_config(self):
"""
Export RADIUS authenticators. A RADIUS authenticator is only exported if
server configuration is present.
Returns:
list: List of configured RADIUS authentication providers with their settings
"""
# TODO: Implement RADIUS configuration retrieval
# RADIUS settings typically include:
# - RADIUS_SERVER
# - RADIUS_PORT
# - RADIUS_SECRET
# - RADIUS_NASID
# - RADIUS_TIMEOUT
# - RADIUS_RETRIES
# - RADIUS_GROUP_TYPE
# - RADIUS_GROUP_TYPE_PARAMS
# - RADIUS_ORGANIZATION_MAP
# - RADIUS_TEAM_MAP
found_configs = []
return found_configs
def get_tacacs_plus_config(self):
"""
Export TACACS+ authenticators. A TACACS+ authenticator is only exported if
server configuration is present.
Returns:
list: List of configured TACACS+ authentication providers with their settings
"""
# TODO: Implement TACACS+ configuration retrieval
# TACACS+ settings typically include:
# - TACACSPLUS_HOST
# - TACACSPLUS_PORT
# - TACACSPLUS_SECRET
# - TACACSPLUS_SESSION_TIMEOUT
# - TACACSPLUS_AUTH_PROTOCOL
# - TACACSPLUS_ORGANIZATION_MAP
# - TACACSPLUS_TEAM_MAP
found_configs = []
return found_configs