Improve redirect override (#7042)

* handle login redirect for the oidc migrator

* handle updating login override redirect centrally in the settings migrator

* update unit tests
This commit is contained in:
Peter Braun
2025-08-05 14:49:18 +02:00
committed by thedoubl3j
parent cf0bc16cf7
commit 3e1b9b2c88
4 changed files with 59 additions and 36 deletions

View File

@@ -880,8 +880,9 @@ class TestHandleLoginOverride:
self.command = Mock()
self.migrator = BaseAuthenticatorMigrator(self.gateway_client, self.command)
# Reset the class-level flag before each test
# Reset the class-level variables before each test
BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator = False
BaseAuthenticatorMigrator.login_redirect_override_new_url = None
def test_handle_login_override_no_login_redirect_override(self):
"""Test that method returns early when no login_redirect_override is provided."""
@@ -981,8 +982,10 @@ class TestHandleLoginOverride:
# Verify gateway client methods were called correctly
self.gateway_client.get_base_url.assert_called_once()
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com/sso/auth/login/123/')
# update_gateway_setting should NOT be called - URL is stored in class variable instead
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.example.com/sso/auth/login/123/'
def test_handle_login_override_multiple_valid_urls_first_matches(self):
"""Test that first matching URL in valid_login_urls is used."""
@@ -998,8 +1001,9 @@ class TestHandleLoginOverride:
self.migrator.handle_login_override(config, valid_login_urls)
# Should still work since first URL matches
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com/sso/auth/login/123/')
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.example.com/sso/auth/login/123/'
def test_handle_login_override_multiple_valid_urls_last_matches(self):
"""Test that last matching URL in valid_login_urls is used."""
@@ -1015,8 +1019,9 @@ class TestHandleLoginOverride:
self.migrator.handle_login_override(config, valid_login_urls)
# Should work since last URL matches
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com/sso/auth/login/123/')
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.example.com/sso/auth/login/123/'
def test_handle_login_override_partial_url_match(self):
"""Test that partial URL matching works (using 'in' operator)."""
@@ -1031,10 +1036,9 @@ class TestHandleLoginOverride:
self.migrator.handle_login_override(config, valid_login_urls)
# Should work since valid URL is contained in login_redirect_override
self.gateway_client.update_gateway_setting.assert_called_once_with(
'LOGIN_REDIRECT_OVERRIDE', 'https://gateway.example.com:8080/auth/login/azuread/456/?next=%2Fdashboard'
)
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.example.com:8080/auth/login/azuread/456/?next=%2Fdashboard'
def test_handle_login_override_saml_with_parameters(self):
"""Test LOGIN_REDIRECT_OVERRIDE with SAML IDP parameters."""
@@ -1050,10 +1054,9 @@ class TestHandleLoginOverride:
self.migrator.handle_login_override(config, valid_login_urls)
# Should work with SAML parameter URLs
self.gateway_client.update_gateway_setting.assert_called_once_with(
'LOGIN_REDIRECT_OVERRIDE', 'https://gateway.local/auth/login/saml/789/?idp=mycompany'
)
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.local/auth/login/saml/789/?idp=mycompany'
def test_handle_login_override_github_with_trailing_slash(self):
"""Test LOGIN_REDIRECT_OVERRIDE with trailing slash."""
@@ -1069,8 +1072,9 @@ class TestHandleLoginOverride:
self.migrator.handle_login_override(config, valid_login_urls)
# Should work with trailing slash URLs
self.gateway_client.update_gateway_setting.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE', 'https://gateway.internal/auth/login/github/999/')
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.internal/auth/login/github/999/'
def test_handle_login_override_empty_valid_urls_list(self):
"""Test that method returns early when valid_login_urls is empty."""
@@ -1087,8 +1091,8 @@ class TestHandleLoginOverride:
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
def test_handle_login_override_preserves_existing_flag_state(self):
"""Test that method preserves flag state if it was already set."""
def test_handle_login_override_already_handled_raises_error(self):
"""Test that calling handle_login_override when already handled raises RuntimeError."""
# Set flag to True initially
BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator = True
@@ -1098,13 +1102,9 @@ class TestHandleLoginOverride:
}
valid_login_urls = ['/sso/login/github']
# Mock gateway client methods
self.gateway_client.get_base_url.return_value = 'https://gateway.example.com'
self.migrator.handle_login_override(config, valid_login_urls)
# Flag should still be True
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
# Should raise RuntimeError when trying to call again
with pytest.raises(RuntimeError, match="LOGIN_REDIRECT_OVERRIDE has already been handled by another migrator"):
self.migrator.handle_login_override(config, valid_login_urls)
def test_handle_login_override_writes_output_message(self):
"""Test that method writes output message when updating."""
@@ -1122,7 +1122,10 @@ class TestHandleLoginOverride:
self.migrator.handle_login_override(config, valid_login_urls)
# Verify output message was written
mock_write_output.assert_called_once_with('Updating LOGIN_REDIRECT_OVERRIDE to: https://gateway.test/auth/login/google/555/')
mock_write_output.assert_called_once_with('LOGIN_REDIRECT_OVERRIDE will be updated to: https://gateway.test/auth/login/google/555/')
# Verify class variables were set correctly
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.test/auth/login/google/555/'
@pytest.mark.parametrize(
"login_redirect_override,valid_urls,expected_match",
@@ -1170,15 +1173,17 @@ class TestHandleLoginOverride:
self.migrator.handle_login_override(config, valid_urls)
if expected_match:
# Should call gateway methods when URL matches
# Should call get_base_url when URL matches but NOT update_gateway_setting
self.gateway_client.get_base_url.assert_called_once()
self.gateway_client.update_gateway_setting.assert_called_once()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url is not None
else:
# Should not call gateway methods when URL doesn't match
self.gateway_client.get_base_url.assert_not_called()
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is False
assert BaseAuthenticatorMigrator.login_redirect_override_new_url is None
def test_handle_login_override_improved_url_parsing(self):
"""Test that improved URL parsing with proper path boundary detection prevents false positive matches."""
@@ -1216,10 +1221,9 @@ class TestHandleLoginOverride:
# Should match because the query parameter is properly contained with boundaries
self.gateway_client.get_base_url.assert_called_once()
self.gateway_client.update_gateway_setting.assert_called_once_with(
'LOGIN_REDIRECT_OVERRIDE', 'https://gateway.test/auth/login/saml/456/?idp=IdP&next=%2Fdashboard'
)
self.gateway_client.update_gateway_setting.assert_not_called()
assert BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator is True
assert BaseAuthenticatorMigrator.login_redirect_override_new_url == 'https://gateway.test/auth/login/saml/456/?idp=IdP&next=%2Fdashboard'
def test_handle_login_override_different_query_parameters(self):
"""Test that different query parameters don't match."""

View File

@@ -18,6 +18,8 @@ class BaseAuthenticatorMigrator:
KEYS_TO_PRESERVE = ['idp']
# Class-level flag to track if LOGIN_REDIRECT_OVERRIDE was set by any migrator
login_redirect_override_set_by_migrator = False
# Class-level variable to store the new LOGIN_REDIRECT_OVERRIDE URL computed by migrators
login_redirect_override_new_url = None
def __init__(self, gateway_client=None, command=None, force=False):
"""
@@ -578,6 +580,10 @@ class BaseAuthenticatorMigrator:
- gateway_authenticator: The created/updated authenticator info
valid_login_urls: List of URL patterns to match against
"""
# Check if another migrator has already handled login redirect override
if BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator:
raise RuntimeError("LOGIN_REDIRECT_OVERRIDE has already been handled by another migrator")
login_redirect_override = config.get('login_redirect_override')
if not login_redirect_override:
return
@@ -628,14 +634,16 @@ class BaseAuthenticatorMigrator:
if not sso_login_url:
return
# Update LOGIN_REDIRECT_OVERRIDE with the new Gateway URL
# Compute the new LOGIN_REDIRECT_OVERRIDE URL with the Gateway URL
gateway_base_url = self.gateway_client.get_base_url()
parsed_sso = urlparse(sso_login_url)
parsed_gw = urlparse(gateway_base_url)
updated_query = self._updated_query_string(parsed_sso)
complete_url = parsed_redirect._replace(scheme=parsed_gw.scheme, path=parsed_sso.path, netloc=parsed_gw.netloc, query=updated_query).geturl()
self._write_output(f'Updating LOGIN_REDIRECT_OVERRIDE to: {complete_url}')
self.gateway_client.update_gateway_setting('LOGIN_REDIRECT_OVERRIDE', complete_url)
self._write_output(f'LOGIN_REDIRECT_OVERRIDE will be updated to: {complete_url}')
# Store the new URL in class variable for settings migrator to use
BaseAuthenticatorMigrator.login_redirect_override_new_url = complete_url
# Set the class-level flag to indicate LOGIN_REDIRECT_OVERRIDE was handled by a migrator
BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator = True

View File

@@ -104,4 +104,10 @@ class OIDCMigrator(BaseAuthenticatorMigrator):
ignore_keys = ['CALLBACK_URL', 'SCOPE']
# Submit the authenticator (create or update as needed)
return self.submit_authenticator(gateway_config, ignore_keys, config)
result = self.submit_authenticator(gateway_config, ignore_keys, config)
# Handle LOGIN_REDIRECT_OVERRIDE if applicable
valid_login_urls = ['/sso/login/oidc']
self.handle_login_override(config, valid_login_urls)
return result

View File

@@ -48,16 +48,21 @@ class SettingsMigrator(BaseAuthenticatorMigrator):
list: List of configured settings that need to be migrated
"""
# Define settings that should be migrated from AWX to Gateway
settings_to_migrate = ['SESSION_COOKIE_AGE', 'SOCIAL_AUTH_USERNAME_IS_FULL_EMAIL', 'ALLOW_OAUTH2_FOR_EXTERNAL_USERS']
# Add LOGIN_REDIRECT_OVERRIDE to the list if no authenticator migrator has handled it
if not BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator:
settings_to_migrate.append("LOGIN_REDIRECT_OVERRIDE")
settings_to_migrate = ['SESSION_COOKIE_AGE', 'SOCIAL_AUTH_USERNAME_IS_FULL_EMAIL', 'ALLOW_OAUTH2_FOR_EXTERNAL_USERS', 'LOGIN_REDIRECT_OVERRIDE']
found_configs = []
for setting_name in settings_to_migrate:
setting_value = getattr(settings, setting_name, None)
# Handle LOGIN_REDIRECT_OVERRIDE specially
if setting_name == 'LOGIN_REDIRECT_OVERRIDE':
if BaseAuthenticatorMigrator.login_redirect_override_set_by_migrator:
# Use the URL computed by the authenticator migrator
setting_value = BaseAuthenticatorMigrator.login_redirect_override_new_url
else:
# Use the original controller setting value
setting_value = getattr(settings, setting_name, None)
else:
setting_value = getattr(settings, setting_name, None)
# Only include settings that have non-None and non-empty values
if setting_value is not None and setting_value != "":