Aap 49709 - settings migration (#7023)

* migrate settings using the existing authenticator framework

* add method to get settings value to gateway client

* add transformer functions for settings

* Switched back to PUT for settings updates

* Started wiring in testing changes

* Added settings_* aggregation results.  Added skip-github option.  Added tests.

Assisted-by: Cursor

* Added --skip-all-authenticators command line argument.  Added GoogleOAuth testing.  Added tests for skipping all authenticators.

Assisted-by: Cursor

* wip: migrate other missing settings

* update login_redirect_override in google_oauth2

* impement login redirect for azuread

* implement login redirect for github

* implement login redirect for saml

* set LOGIN_REDIRECT_OVERRIDE even if no authenticator matched

* extract logic for login redirect override to base class

* use urlparse to compare valid redirect urls

* Preserve the original query parameters

* Fix flake8 issues

* Preserve the query parameter in sso_login_url

Gateway sets the sso_login_url to

/api/gateway/social/login/aap-saml-keycloak/?idp=IdP

The idp needs to be preserved when creating the redirect

* Update awx/main/utils/gateway_client.py

Co-authored-by: Chris Meyers <chrismeyersfsu@users.noreply.github.com>

* Update awx/main/management/commands/import_auth_config_to_gateway.py

Co-authored-by: Chris Meyers <chrismeyersfsu@users.noreply.github.com>

* list of settings updated

* Update awx/main/utils/gateway_client.py

Co-authored-by: Chris Meyers <chrismeyersfsu@users.noreply.github.com>

* Update awx/sso/utils/base_migrator.py

Co-authored-by: Chris Meyers <chrismeyersfsu@users.noreply.github.com>

* fix tests

---------

Co-authored-by: Andrew Potozniak <potozniak@redhat.com>
Co-authored-by: Madhu Kanoor <mkanoor@redhat.com>
Co-authored-by: Chris Meyers <chrismeyersfsu@users.noreply.github.com>
This commit is contained in:
Peter Braun
2025-07-28 22:15:59 +02:00
committed by thedoubl3j
parent 58e237a09a
commit 44c53b02ae
10 changed files with 960 additions and 43 deletions

View File

@@ -14,26 +14,32 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
def options_basic_auth_full_send(self):
return {
'basic_auth': True,
'skip_all_authenticators': False,
'skip_oidc': False,
'skip_github': False,
'skip_ldap': False,
'skip_ad': False,
'skip_saml': False,
'skip_radius': False,
'skip_tacacs': False,
'skip_google': False,
'skip_settings': False,
'force': False,
}
def options_basic_auth_skip_all(self):
def options_basic_auth_skip_all_individual(self):
return {
'basic_auth': True,
'skip_all_authenticators': False,
'skip_oidc': True,
'skip_github': True,
'skip_ldap': True,
'skip_ad': True,
'skip_saml': True,
'skip_radius': True,
'skip_tacacs': True,
'skip_google': True,
'skip_settings': True,
'force': False,
}
@@ -43,7 +49,7 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
return options
def options_svc_token_skip_all(self):
options = self.options_basic_auth_skip_all()
options = self.options_basic_auth_skip_all_individual()
options['basic_auth'] = False
return options
@@ -58,6 +64,10 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
mappers_created=0,
mappers_updated=0,
mappers_failed=0,
settings_created=0,
settings_updated=0,
settings_unchanged=0,
settings_failed=0,
):
"""Helper method to create a mock migrator with specified return values."""
mock_migrator = Mock()
@@ -81,14 +91,25 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
expected_calls = [
call('--basic-auth', action='store_true', help='Use HTTP Basic Authentication between Controller and Gateway'),
call('--skip-oidc', action='store_true', help='Skip importing GitHub and generic OIDC authenticators'),
call(
'--skip-all-authenticators',
action='store_true',
help='Skip importing all authenticators [GitHub, OIDC, SAML, Azure AD, LDAP, RADIUS, TACACS+, Google OAuth2]',
),
call('--skip-oidc', action='store_true', help='Skip importing generic OIDC authenticators'),
call('--skip-github', action='store_true', help='Skip importing GitHub authenticator'),
call('--skip-ldap', action='store_true', help='Skip importing LDAP authenticators'),
call('--skip-ad', action='store_true', help='Skip importing Azure AD authenticator'),
call('--skip-saml', action='store_true', help='Skip importing SAML authenticator'),
call('--skip-radius', action='store_true', help='Skip importing RADIUS authenticator'),
call('--skip-tacacs', action='store_true', help='Skip importing TACACS+ authenticator'),
call('--skip-google', action='store_true', help='Skip importing Google OAuth2 authenticator'),
call('--force', action='store_true', help='Force migration even if configurations already exist'),
call('--skip-settings', action='store_true', help='Skip importing settings'),
call(
'--force',
action='store_true',
help='Force migration even if configurations already exist. Does not apply to skipped authenticators nor skipped settings.',
),
]
parser.add_argument.assert_has_calls(expected_calls, any_order=True)
@@ -113,6 +134,7 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
os.environ,
{'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass', 'GATEWAY_SKIP_VERIFY': 'true'},
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.SettingsMigrator')
@patch.multiple(
'awx.main.management.commands.import_auth_config_to_gateway',
GitHubMigrator=DEFAULT,
@@ -122,10 +144,11 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
LDAPMigrator=DEFAULT,
RADIUSMigrator=DEFAULT,
TACACSMigrator=DEFAULT,
GoogleOAuth2Migrator=DEFAULT,
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_handle_basic_auth_success(self, mock_stdout, mock_gateway_client, **mock_migrators):
def test_handle_basic_auth_success(self, mock_stdout, mock_gateway_client, mock_settings_migrator, **mock_migrators):
"""Test successful execution with basic auth."""
# Mock gateway client context manager
mock_client_instance = Mock()
@@ -135,6 +158,8 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
for mock_migrator_class in mock_migrators.values():
self.create_mock_migrator(mock_migrator_class, created=1, mappers_created=2)
self.create_mock_migrator(mock_settings_migrator, settings_created=1, settings_updated=0, settings_unchanged=2, settings_failed=0)
with patch.object(self.command, 'stdout', mock_stdout):
self.command.handle(**self.options_basic_auth_full_send())
@@ -147,12 +172,17 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
for mock_migrator in mock_migrators.values():
mock_migrator.assert_called_once_with(mock_client_instance, self.command, force=False)
mock_settings_migrator.assert_called_once_with(mock_client_instance, self.command, force=False)
# Verify output contains success messages
output = mock_stdout.getvalue()
self.assertIn('HTTP Basic Auth: true', output)
self.assertIn('Successfully connected to Gateway', output)
self.assertIn('Migration Summary', output)
self.assertIn('authenticators', output)
self.assertIn('mappers', output)
self.assertIn('settings', output)
@patch('awx.main.management.commands.import_auth_config_to_gateway.create_api_client')
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClientSVCToken')
@@ -211,10 +241,12 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
LDAPMigrator=DEFAULT,
RADIUSMigrator=DEFAULT,
TACACSMigrator=DEFAULT,
GoogleOAuth2Migrator=DEFAULT,
SettingsMigrator=DEFAULT,
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_skip_flags_prevent_migrator_creation(self, mock_stdout, mock_gateway_client, **mock_migrators):
def test_skip_flags_prevent_authenticator_individual_and_settings_migration(self, mock_stdout, mock_gateway_client, **mock_migrators):
"""Test that skip flags prevent corresponding migrators from being created."""
# Mock gateway client context manager
@@ -223,7 +255,7 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
mock_gateway_client.return_value.__exit__.return_value = None
with patch.object(self.command, 'stdout', mock_stdout):
self.command.handle(**self.options_basic_auth_skip_all())
self.command.handle(**self.options_basic_auth_skip_all_individual())
# Verify no migrators were created
for mock_migrator in mock_migrators.values():
@@ -232,6 +264,46 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
# Verify warning message about no configurations
output = mock_stdout.getvalue()
self.assertIn('No authentication configurations found to migrate.', output)
self.assertIn('Settings migration will not execute.', output)
self.assertIn('NO MIGRATIONS WILL EXECUTE.', output)
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch.multiple(
'awx.main.management.commands.import_auth_config_to_gateway',
GitHubMigrator=DEFAULT,
OIDCMigrator=DEFAULT,
SAMLMigrator=DEFAULT,
AzureADMigrator=DEFAULT,
LDAPMigrator=DEFAULT,
RADIUSMigrator=DEFAULT,
TACACSMigrator=DEFAULT,
GoogleOAuth2Migrator=DEFAULT,
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_skip_flags_prevent_authenticator_migration(self, mock_stdout, mock_gateway_client, **mock_migrators):
"""Test that skip flags prevent corresponding migrators from being created."""
# Mock gateway client context manager
mock_client_instance = Mock()
mock_gateway_client.return_value.__enter__.return_value = mock_client_instance
mock_gateway_client.return_value.__exit__.return_value = None
options = self.options_basic_auth_full_send()
options['skip_all_authenticators'] = True
with patch.object(self.command, 'stdout', mock_stdout):
self.command.handle(**options)
# Verify no migrators were created
for mock_migrator in mock_migrators.values():
mock_migrator.assert_not_called()
# Verify warning message about no configurations
output = mock_stdout.getvalue()
self.assertIn('No authentication configurations found to migrate.', output)
self.assertNotIn('Settings migration will not execute.', output)
self.assertNotIn('NO MIGRATIONS WILL EXECUTE.', output)
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@@ -268,8 +340,9 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('awx.main.management.commands.import_auth_config_to_gateway.GitHubMigrator')
@patch('awx.main.management.commands.import_auth_config_to_gateway.SettingsMigrator')
@patch('sys.stdout', new_callable=StringIO)
def test_force_flag_passed_to_migrators(self, mock_stdout, mock_github, mock_gateway_client):
def test_force_flag_passed_to_migrators(self, mock_stdout, mock_github, mock_settings_migrator, mock_gateway_client):
"""Test that force flag is properly passed to migrators."""
# Mock gateway client context manager
mock_client_instance = Mock()
@@ -278,10 +351,14 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
# Mock migrator
self.create_mock_migrator(mock_github, authenticator_type="GitHub", created=0, mappers_created=2)
self.create_mock_migrator(
mock_settings_migrator, authenticator_type="Settings", settings_created=0, settings_updated=2, settings_unchanged=0, settings_failed=0
)
options = self.options_basic_auth_skip_all()
options = self.options_basic_auth_skip_all_individual()
options['force'] = True
options['skip_oidc'] = False
options['skip_github'] = False
options['skip_settings'] = False
with patch.object(self.command, 'stdout', mock_stdout):
self.command.handle(**options)
@@ -289,6 +366,9 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
# Verify migrator was created with force=True
mock_github.assert_called_once_with(mock_client_instance, self.command, force=True)
# Verify settings migrator was created with force=True
mock_settings_migrator.assert_called_once_with(mock_client_instance, self.command, force=True)
@patch('sys.stdout', new_callable=StringIO)
def test_print_export_summary(self, mock_stdout):
"""Test the _print_export_summary method."""
@@ -315,6 +395,26 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
self.assertIn('Mappers updated: 2', output)
self.assertIn('Mappers failed: 1', output)
@patch('sys.stdout', new_callable=StringIO)
def test_print_export_summary_settings(self, mock_stdout):
"""Test the _print_export_summary method."""
result = {
'settings_created': 2,
'settings_updated': 1,
'settings_unchanged': 3,
'settings_failed': 0,
}
with patch.object(self.command, 'stdout', mock_stdout):
self.command._print_export_summary('Settings', result)
output = mock_stdout.getvalue()
self.assertIn('--- Settings Export Summary ---', output)
self.assertIn('Settings created: 2', output)
self.assertIn('Settings updated: 1', output)
self.assertIn('Settings unchanged: 3', output)
self.assertIn('Settings failed: 0', output)
@patch('sys.stdout', new_callable=StringIO)
def test_print_export_summary_missing_keys(self, mock_stdout):
"""Test _print_export_summary handles missing keys gracefully."""
@@ -350,7 +450,7 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
self.create_mock_migrator(mock_github, authenticator_type="GitHub", created=1, mappers_created=2)
self.create_mock_migrator(mock_oidc, authenticator_type="OIDC", created=0, updated=1, unchanged=1, mappers_created=1, mappers_updated=1)
options = self.options_basic_auth_skip_all()
options = self.options_basic_auth_skip_all_individual()
options['skip_oidc'] = False
options['skip_github'] = False
@@ -401,7 +501,7 @@ class TestImportAuthConfigToGatewayCommand(TestCase):
mock_gateway_client.return_value.__exit__.return_value = None
with patch.object(self.command, 'stdout', mock_stdout):
self.command.handle(**self.options_basic_auth_skip_all())
self.command.handle(**self.options_basic_auth_skip_all_individual())
# Verify gateway client was called with correct skip_verify value
mock_gateway_client.assert_called_once_with(

View File

@@ -869,3 +869,371 @@ class TestSocialAuthMapFunctions:
result = self.migrator.get_social_team_map(setting_name)
assert result == {'test_team': {'organization': 'test_org'}}
class TestHandleLoginOverride:
"""Tests for handle_login_override method."""
def setup_method(self):
"""Set up test fixtures."""
self.gateway_client = Mock()
self.command = Mock()
self.migrator = BaseAuthenticatorMigrator(self.gateway_client, self.command)
# Reset the class-level flag before each test
BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator = False
def test_handle_login_override_no_login_redirect_override(self):
"""Test that method returns early when no login_redirect_override is provided."""
config = {}
valid_login_urls = ['/sso/login/github']
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_empty_login_redirect_override(self):
"""Test that method returns early when login_redirect_override is empty."""
config = {'login_redirect_override': ''}
valid_login_urls = ['/sso/login/github']
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_no_url_match(self):
"""Test that method returns early when login_redirect_override doesn't match valid URLs."""
config = {'login_redirect_override': 'https://localhost:3000/sso/login/saml'}
valid_login_urls = ['/sso/login/github', '/sso/login/azuread-oauth2']
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_no_gateway_authenticator(self):
"""Test that method returns early when gateway_authenticator is missing."""
config = {'login_redirect_override': 'https://localhost:3000/sso/login/github'}
valid_login_urls = ['/sso/login/github']
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_empty_gateway_authenticator(self):
"""Test that method returns early when gateway_authenticator is empty."""
config = {'login_redirect_override': 'https://localhost:3000/sso/login/github', 'gateway_authenticator': {}}
valid_login_urls = ['/sso/login/github']
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_no_sso_login_url(self):
"""Test that method returns early when sso_login_url is missing."""
config = {'login_redirect_override': 'https://localhost:3000/sso/login/github', 'gateway_authenticator': {'id': 123}}
valid_login_urls = ['/sso/login/github']
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_empty_sso_login_url(self):
"""Test that method returns early when sso_login_url is empty."""
config = {'login_redirect_override': 'https://localhost:3000/sso/login/github', 'gateway_authenticator': {'id': 123, 'sso_login_url': ''}}
valid_login_urls = ['/sso/login/github']
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_successful_update(self):
"""Test successful LOGIN_REDIRECT_OVERRIDE update."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/github',
'gateway_authenticator': {'id': 123, 'sso_login_url': '/sso/auth/login/123/'},
}
valid_login_urls = ['/sso/login/github']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.example.com'
self.migrator.handle_login_override(config, valid_login_urls)
# Verify gateway client methods were called correctly
self.gateway_client.get_base_url.assert_called_once()
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com/sso/auth/login/123/')
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_multiple_valid_urls_first_matches(self):
"""Test that first matching URL in valid_login_urls is used."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/github-org',
'gateway_authenticator': {'id': 123, 'sso_login_url': '/sso/auth/login/123/'},
}
valid_login_urls = ['/sso/login/github-org', '/sso/login/github-team', '/sso/login/github']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.example.com'
self.migrator.handle_login_override(config, valid_login_urls)
# Should still work since first URL matches
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com/sso/auth/login/123/')
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_multiple_valid_urls_last_matches(self):
"""Test that last matching URL in valid_login_urls is used."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/github',
'gateway_authenticator': {'id': 123, 'sso_login_url': '/sso/auth/login/123/'},
}
valid_login_urls = ['/sso/login/github-org', '/sso/login/github-team', '/sso/login/github']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.example.com'
self.migrator.handle_login_override(config, valid_login_urls)
# Should work since last URL matches
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com/sso/auth/login/123/')
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_partial_url_match(self):
"""Test that partial URL matching works (using 'in' operator)."""
config = {
'login_redirect_override': 'https://controller.example.com/sso/login/azuread-oauth2/?next=%2Fdashboard',
'gateway_authenticator': {'id': 456, 'sso_login_url': '/auth/login/azuread/456/'},
}
valid_login_urls = ['/sso/login/azuread-oauth2']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.example.com:8080'
self.migrator.handle_login_override(config, valid_login_urls)
# Should work since valid URL is contained in login_redirect_override
self.gateway_client.update_gateway_setting.assert_called_once_with(
'LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com:8080/auth/login/azuread/456/?next=%2Fdashboard'
)
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_saml_with_parameters(self):
"""Test LOGIN_REDIRECT_OVERRIDE with SAML IDP parameters."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/saml/?idp=mycompany',
'gateway_authenticator': {'id': 789, 'sso_login_url': '/auth/login/saml/789/'},
}
valid_login_urls = ['/sso/login/saml/?idp=mycompany']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.local'
self.migrator.handle_login_override(config, valid_login_urls)
# Should work with SAML parameter URLs
self.gateway_client.update_gateway_setting.assert_called_once_with(
'LOGIN_REDIRECT_OVERRIDE', 'https://gateway.local/auth/login/saml/789/?idp=mycompany'
)
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_github_with_trailing_slash(self):
"""Test LOGIN_REDIRECT_OVERRIDE with trailing slash."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/github-enterprise/',
'gateway_authenticator': {'id': 999, 'sso_login_url': '/auth/login/github/999/'},
}
valid_login_urls = ['/sso/login/github-enterprise', '/sso/login/github-enterprise/']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.internal'
self.migrator.handle_login_override(config, valid_login_urls)
# Should work with trailing slash URLs
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.internal/auth/login/github/999/')
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_empty_valid_urls_list(self):
"""Test that method returns early when valid_login_urls is empty."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/github',
'gateway_authenticator': {'id': 123, 'sso_login_url': '/sso/auth/login/123/'},
}
valid_login_urls = []
self.migrator.handle_login_override(config, valid_login_urls)
# Should not call any gateway client methods
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_preserves_existing_flag_state(self):
"""Test that method preserves flag state if it was already set."""
# Set flag to True initially
BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator = True
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/github',
'gateway_authenticator': {'id': 123, 'sso_login_url': '/sso/auth/login/123/'},
}
valid_login_urls = ['/sso/login/github']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.example.com'
self.migrator.handle_login_override(config, valid_login_urls)
# Flag should still be True
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_writes_output_message(self):
"""Test that method writes output message when updating."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/google-oauth2',
'gateway_authenticator': {'id': 555, 'sso_login_url': '/auth/login/google/555/'},
}
valid_login_urls = ['/sso/login/google-oauth2']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.test'
# Mock _write_output method
with patch.object(self.migrator, '_write_output') as mock_write_output:
self.migrator.handle_login_override(config, valid_login_urls)
# Verify output message was written
mock_write_output.assert_called_once_with('Updating LOGIN_REDIRECT_OVERRIDE to: https://gateway.test/auth/login/google/555/')
@pytest.mark.parametrize(
"login_redirect_override,valid_urls,expected_match",
[
# Test Azure AD variations
('https://localhost:3000/sso/login/azuread-oauth2', ['/sso/login/azuread-oauth2'], True),
('https://localhost:3000/sso/login/azuread-oauth2/', ['/sso/login/azuread-oauth2'], True),
('https://controller.example.com/sso/login/azuread-oauth2?next=/home', ['/sso/login/azuread-oauth2'], True),
# Test Google OAuth2 variations
('https://localhost:3000/sso/login/google-oauth2', ['/sso/login/google-oauth2'], True),
('https://localhost:3000/sso/login/google-oauth2/', ['/sso/login/google-oauth2'], True),
# Test GitHub variations
('https://localhost:3000/sso/login/github', ['/sso/login/github'], True),
('https://localhost:3000/sso/login/github-org', ['/sso/login/github-org'], True),
('https://localhost:3000/sso/login/github-team', ['/sso/login/github-team'], True),
('https://localhost:3000/sso/login/github-enterprise', ['/sso/login/github-enterprise'], True),
# Test SAML variations
('https://localhost:3000/sso/login/saml/?idp=company', ['/sso/login/saml/?idp=company'], True),
('https://localhost:3000/sso/login/saml/?idp=test-org', ['/sso/login/saml/?idp=test-org'], True),
# Test non-matching cases
('https://localhost:3000/sso/login/ldap', ['/sso/login/github'], False),
('https://localhost:3000/sso/login/azuread-oauth2', ['/sso/login/google-oauth2'], False),
('https://localhost:3000/sso/login/saml/?idp=wrong', ['/sso/login/saml/?idp=company'], False),
# Test multiple valid URLs
('https://localhost:3000/sso/login/github-org', ['/sso/login/github', '/sso/login/github-org'], True),
('https://localhost:3000/sso/login/github', ['/sso/login/github-org', '/sso/login/github'], True),
# Test improved URL parsing scenarios - better boundary detection
('https://localhost:3000/sso/login/github-enterprise', ['/sso/login/github'], False), # Should NOT match due to better parsing
('https://localhost:3000/sso/login/saml/?idp=company&next=/home', ['/sso/login/saml/?idp=company'], True),
('https://localhost:3000/sso/login/saml/?idp=company', ['/sso/login/saml/?idp=different'], False),
('https://controller.example.com:8080/sso/login/azuread-oauth2/?next=/dashboard', ['/sso/login/azuread-oauth2'], True),
('http://localhost/sso/login/github?state=abc123', ['/sso/login/github'], True),
# Test boundary detection edge cases
('https://localhost:3000/sso/login/github/', ['/sso/login/github'], True), # Trailing slash should match
('https://localhost:3000/sso/login/github#section', ['/sso/login/github'], True), # Fragment should match
],
)
def test_handle_login_override_url_matching_variations(self, login_redirect_override, valid_urls, expected_match):
"""Test various URL matching scenarios parametrically."""
config = {'login_redirect_override': login_redirect_override, 'gateway_authenticator': {'id': 123, 'sso_login_url': '/auth/login/test/123/'}}
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.test'
self.migrator.handle_login_override(config, valid_urls)
if expected_match:
# Should call gateway methods when URL matches
self.gateway_client.get_base_url.assert_called_once()
self.gateway_client.update_gateway_setting.assert_called_once()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
else:
# Should not call gateway methods when URL doesn't match
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_improved_url_parsing(self):
"""Test that improved URL parsing with proper path boundary detection prevents false positive matches."""
# This test demonstrates the improvement over simple string matching
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/github-enterprise',
'gateway_authenticator': {'id': 123, 'sso_login_url': '/auth/login/test/123/'},
}
# With the old simple string matching, this would incorrectly match
# because '/sso/login/github' is contained in '/sso/login/github-enterprise'
# But with proper URL parsing, it should NOT match
valid_login_urls = ['/sso/login/github']
self.migrator.handle_login_override(config, valid_login_urls)
# Should NOT match due to improved parsing
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_query_parameter_handling(self):
"""Test that query parameters are properly handled in URL matching."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/saml/?idp=mycompany&next=%2Fdashboard',
'gateway_authenticator': {'id': 456, 'sso_login_url': '/auth/login/saml/456/?idp=IdP'},
}
# Should match the SAML URL with the correct IDP parameter (boundary-aware matching)
valid_login_urls = ['/sso/login/saml/?idp=mycompany']
self.gateway_client.get_base_url.return_value = 'https://gateway.test'
self.migrator.handle_login_override(config, valid_login_urls)
# Should match because the query parameter is properly contained with boundaries
self.gateway_client.get_base_url.assert_called_once()
self.gateway_client.update_gateway_setting.assert_called_once_with(
'LOGIN_REDIRECT_OVERRIDE', 'https://gateway.test/auth/login/saml/456/?idp=IdP&next=%2Fdashboard'
)
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
def test_handle_login_override_different_query_parameters(self):
"""Test that different query parameters don't match."""
config = {
'login_redirect_override': 'https://localhost:3000/sso/login/saml/?idp=company-a',
'gateway_authenticator': {'id': 456, 'sso_login_url': '/auth/login/saml/456/'},
}
# Should NOT match SAML URL with different IDP parameter
valid_login_urls = ['/sso/login/saml/?idp=company-b']
self.migrator.handle_login_override(config, valid_login_urls)
# Should NOT match because the query parameters are different
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False