feat: AAP-48499 TACACS+ authenticator migrator (#7014)

* feat: AAP-48499 TACACS+ authenticator migrator

Issue: AAP 48499

* enable by default
This commit is contained in:
Bruno Cesar Rocha 2025-07-16 19:07:07 +01:00 committed by thedoubl3j
parent abc4692231
commit c4a6b28b87
No known key found for this signature in database
GPG Key ID: E84C42ACF75B0768
4 changed files with 145 additions and 0 deletions

View File

@ -8,6 +8,7 @@ from awx.sso.utils.ldap_migrator import LDAPMigrator
from awx.sso.utils.oidc_migrator import OIDCMigrator
from awx.sso.utils.saml_migrator import SAMLMigrator
from awx.sso.utils.radius_migrator import RADIUSMigrator
from awx.sso.utils.tacacs_migrator import TACACSMigrator
from awx.main.utils.gateway_client import GatewayClient, GatewayAPIError
@ -20,6 +21,7 @@ class Command(BaseCommand):
parser.add_argument('--skip-ad', action='store_true', help='Skip importing Azure AD authenticator')
parser.add_argument('--skip-saml', action='store_true', help='Skip importing SAML authenticator')
parser.add_argument('--skip-radius', action='store_true', help='Skip importing RADIUS authenticator')
parser.add_argument('--skip-tacacs', action='store_true', help='Skip importing TACACS+ authenticator')
parser.add_argument('--force', action='store_true', help='Force migration even if configurations already exist')
def handle(self, *args, **options):
@ -34,6 +36,7 @@ class Command(BaseCommand):
skip_ad = options['skip_ad']
skip_saml = options['skip_saml']
skip_radius = options['skip_radius']
skip_tacacs = options['skip_tacacs']
force = options['force']
# If the management command isn't called with all parameters needed to talk to Gateway, consider
@ -77,6 +80,9 @@ class Command(BaseCommand):
if not skip_radius:
migrators.append(RADIUSMigrator(gateway_client, self, force=force))
if not skip_tacacs:
migrators.append(TACACSMigrator(gateway_client, self, force=force))
# Run migrations
total_results = {
'created': 0,

View File

@ -69,3 +69,13 @@ def test_saml_config(settings):
"x509cert": "A" * 64 + "B" * 64 + "C" * 23,
}
}
@pytest.fixture
def test_tacacs_config(settings):
settings.TACACSPLUS_HOST = "tacacshost"
settings.TACACSPLUS_PORT = 49
settings.TACACSPLUS_SECRET = "secret"
settings.TACACSPLUS_SESSION_TIMEOUT = 10
settings.TACACSPLUS_AUTH_PROTOCOL = "pap"
settings.TACACSPLUS_REM_ADDR = True

View File

@ -0,0 +1,37 @@
import pytest
from unittest.mock import MagicMock
from awx.sso.utils.tacacs_migrator import TACACSMigrator
@pytest.mark.django_db
def test_get_controller_config(test_tacacs_config):
gateway_client = MagicMock()
command_obj = MagicMock()
obj = TACACSMigrator(gateway_client, command_obj)
result = obj.get_controller_config()
assert len(result) == 1
config = result[0]
assert config['category'] == 'TACACS+'
settings_data = config['settings']
assert settings_data['name'] == 'default'
assert settings_data['type'] == 'ansible_base.authentication.authenticator_plugins.tacacs'
configuration = settings_data['configuration']
assert configuration['HOST'] == 'tacacshost'
assert configuration['PORT'] == 49
assert configuration['SECRET'] == 'secret'
assert configuration['SESSION_TIMEOUT'] == 10
assert configuration['AUTH_PROTOCOL'] == 'pap'
assert configuration['REM_ADDR'] is True
@pytest.mark.django_db
def test_get_controller_config_no_host(settings):
settings.TACACSPLUS_HOST = ""
gateway_client = MagicMock()
command_obj = MagicMock()
obj = TACACSMigrator(gateway_client, command_obj)
result = obj.get_controller_config()
assert len(result) == 0

View File

@ -0,0 +1,92 @@
"""
TACACS+ authenticator migrator.
This module handles the migration of TACACS+ authenticators from AWX to Gateway.
"""
from django.conf import settings
from awx.sso.utils.base_migrator import BaseAuthenticatorMigrator
class TACACSMigrator(BaseAuthenticatorMigrator):
"""
Handles the migration of TACACS+ authenticators from AWX to Gateway.
"""
CATEGORY = "TACACS+"
AUTH_TYPE = "ansible_base.authentication.authenticator_plugins.tacacs"
def get_authenticator_type(self):
"""Get the human-readable authenticator type name."""
return "TACACS+"
def get_controller_config(self):
"""
Export TACACS+ authenticator. A TACACS+ authenticator is only exported if
required configuration is present.
Returns:
list: List of configured TACACS+ authentication providers with their settings
"""
host = getattr(settings, "TACACSPLUS_HOST", None)
if not host:
return []
port = getattr(settings, "TACACSPLUS_PORT", 49)
secret = getattr(settings, "TACACSPLUS_SECRET", "")
session_timeout = getattr(settings, "TACACSPLUS_SESSION_TIMEOUT", 5)
auth_protocol = getattr(settings, "TACACSPLUS_AUTH_PROTOCOL", "ascii")
rem_addr = getattr(settings, "TACACSPLUS_REM_ADDR", False)
config_data = {
"name": "default",
"type": self.AUTH_TYPE,
"enabled": True,
"create_objects": True,
"remove_users": False,
"configuration": {
"HOST": host,
"PORT": port,
"SECRET": secret,
"SESSION_TIMEOUT": session_timeout,
"AUTH_PROTOCOL": auth_protocol,
"REM_ADDR": rem_addr,
},
}
return [
{
"category": self.CATEGORY,
"settings": config_data,
}
]
def create_gateway_authenticator(self, config):
"""Create a TACACS+ authenticator in Gateway."""
category = config["category"]
config_settings = config["settings"]
name = config_settings["name"]
# Generate authenticator name and slug
authenticator_name = f"AWX-{category.replace('+', 'plus').replace('-', '_').title()}-{name}"
authenticator_slug = self._generate_authenticator_slug("tacacsplus", category)
self._write_output(f"\n--- Processing {category} authenticator ---")
self._write_output(f"Name: {authenticator_name}")
self._write_output(f"Slug: {authenticator_slug}")
self._write_output(f"Type: {config_settings['type']}")
# Build Gateway authenticator configuration
gateway_config = {
"name": authenticator_name,
"slug": authenticator_slug,
"type": config_settings["type"],
"enabled": config_settings["enabled"],
"create_objects": config_settings["create_objects"],
"remove_users": config_settings["remove_users"],
"configuration": config_settings["configuration"],
}
# Submit the authenticator (create or update as needed)
return self.submit_authenticator(gateway_config, config=config)