Add basic auth to subscription management API (#16103)

Allow users to do subscription management using
Red Hat username and password.

In basic auth case, the candlepin API
at subscriptions.rhsm.redhat.com will be used instead
of console.redhat.com.

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
This commit is contained in:
Seth Foster
2025-10-02 01:06:47 -04:00
committed by GitHub
parent 6db08bfa4e
commit 2729076f7f
7 changed files with 569 additions and 75 deletions

View File

@@ -0,0 +1,237 @@
from unittest.mock import patch, MagicMock
import pytest
from awx.api.versioning import reverse
from rest_framework import status
@pytest.mark.django_db
class TestApiV2SubscriptionView:
"""Test cases for the /api/v2/config/subscriptions/ endpoint"""
def test_basic_auth(self, post, admin):
"""Test POST with subscriptions_username and subscriptions_password calls validate_rh with basic_auth=True"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_user', 'test_password', True)
def test_service_account(self, post, admin):
"""Test POST with subscriptions_client_id and subscriptions_client_secret calls validate_rh with basic_auth=False"""
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': 'test_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False)
def test_encrypted_password_basic_auth(self, post, admin, settings):
"""Test POST with $encrypted$ password uses settings value for basic auth"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': '$encrypted$'}
settings.SUBSCRIPTIONS_PASSWORD = 'actual_password_from_settings'
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_user', 'actual_password_from_settings', True)
def test_encrypted_client_secret_service_account(self, post, admin, settings):
"""Test POST with $encrypted$ client_secret uses settings value for service_account"""
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': '$encrypted$'}
settings.SUBSCRIPTIONS_CLIENT_SECRET = 'actual_secret_from_settings'
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'actual_secret_from_settings', False)
def test_missing_username_returns_error(self, post, admin):
"""Test POST with missing username returns 400 error"""
data = {'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_password_returns_error(self, post, admin, settings):
"""Test POST with missing password returns 400 error"""
data = {'subscriptions_username': 'test_user'}
settings.SUBSCRIPTIONS_PASSWORD = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_client_id_returns_error(self, post, admin):
"""Test POST with missing client_id returns 400 error"""
data = {'subscriptions_client_secret': 'test_secret'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_client_secret_returns_error(self, post, admin, settings):
"""Test POST with missing client_secret returns 400 error"""
data = {'subscriptions_client_id': 'test_client_id'}
settings.SUBSCRIPTIONS_CLIENT_SECRET = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_empty_username_returns_error(self, post, admin):
"""Test POST with empty username returns 400 error"""
data = {'subscriptions_username': '', 'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_empty_password_returns_error(self, post, admin, settings):
"""Test POST with empty password returns 400 error"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': ''}
settings.SUBSCRIPTIONS_PASSWORD = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_non_superuser_permission_denied(self, post, rando):
"""Test that non-superuser cannot access the endpoint"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, rando)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_settings_updated_on_successful_basic_auth(self, post, admin, settings):
"""Test that settings are updated when basic auth validation succeeds"""
data = {'subscriptions_username': 'new_username', 'subscriptions_password': 'new_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
assert settings.SUBSCRIPTIONS_USERNAME == 'new_username'
assert settings.SUBSCRIPTIONS_PASSWORD == 'new_password'
def test_settings_updated_on_successful_service_account(self, post, admin, settings):
"""Test that settings are updated when service account validation succeeds"""
data = {'subscriptions_client_id': 'new_client_id', 'subscriptions_client_secret': 'new_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
assert settings.SUBSCRIPTIONS_CLIENT_ID == 'new_client_id'
assert settings.SUBSCRIPTIONS_CLIENT_SECRET == 'new_client_secret'
def test_analytics_credentials_plumbed_when_missing(self, post, admin, settings):
"""Test that service account credentials are plumbed to analytics when REDHAT_* settings are missing"""
data = {'subscriptions_client_id': 'analytics_client_id', 'subscriptions_client_secret': 'analytics_client_secret'}
# Ensure REDHAT_* settings are not set
settings.REDHAT_USERNAME = None
settings.REDHAT_PASSWORD = None
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
assert settings.REDHAT_USERNAME == 'analytics_client_id'
assert settings.REDHAT_PASSWORD == 'analytics_client_secret'
def test_analytics_credentials_not_overwritten_when_present(self, post, admin, settings):
"""Test that existing REDHAT_* settings are not overwritten"""
data = {'subscriptions_client_id': 'new_client_id', 'subscriptions_client_secret': 'new_client_secret'}
# Set existing REDHAT_* settings
settings.REDHAT_USERNAME = 'existing_username'
settings.REDHAT_PASSWORD = 'existing_password'
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# REDHAT_* settings should remain unchanged
assert settings.REDHAT_USERNAME == 'existing_username'
assert settings.REDHAT_PASSWORD == 'existing_password'
def test_validate_rh_exception_handling(self, post, admin):
"""Test that exceptions from validate_rh are properly handled"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.side_effect = Exception("Connection error")
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_mixed_credentials_prioritizes_client_id(self, post, admin):
"""Test that when both username and client_id are provided, client_id takes precedence"""
data = {
'subscriptions_username': 'test_user',
'subscriptions_password': 'test_password',
'subscriptions_client_id': 'test_client_id',
'subscriptions_client_secret': 'test_client_secret',
}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# Should use service account (basic_auth=False) since client_id is present
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False)

View File

@@ -1,37 +0,0 @@
import json
from http import HTTPStatus
from unittest.mock import patch
from requests import Response
from awx.main.utils.licensing import Licenser
def test_rhsm_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'body': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('awx.main.utils.analytics_proxy.OIDCClient.make_request', new=mocked_requests_get):
subs = licenser.get_rhsm_subs('localhost', 'admin', 'admin')
assert subs == []
def test_satellite_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'results': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('requests.get', new=mocked_requests_get):
subs = licenser.get_satellite_subs('localhost', 'admin', 'admin')
assert subs == []

View File

@@ -0,0 +1,154 @@
from unittest.mock import patch
from awx.main.utils.licensing import Licenser
def test_validate_rh_basic_auth_rhsm():
"""
Assert get_rhsm_subs is called when
- basic_auth=True
- host is subscription.rhsm.redhat.com
"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs', return_value=[]
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_called_once_with('https://subscription.rhsm.redhat.com', 'testuser', 'testpass')
mock_get_satellite.assert_not_called()
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_basic_auth_satellite():
"""
Assert get_satellite_subs is called when
- basic_auth=True
- custom satellite host
"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://satellite.example.com') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs'
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs', return_value=[]) as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_called_once_with('https://satellite.example.com', 'testuser', 'testpass')
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_service_account_crc():
"""
Assert get_crc_subs is called when
- basic_auth=False
"""
licenser = Licenser()
with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object(licenser, 'get_host_from_rhsm_config') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs'
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object(
licenser, 'get_crc_subs', return_value=[]
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
mock_settings.SUBSCRIPTIONS_RHSM_URL = 'https://console.redhat.com/api/rhsm/v1/subscriptions'
licenser.validate_rh('client_id', 'client_secret', basic_auth=False)
# Assert the correct methods were called
mock_get_host.assert_not_called()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_not_called()
mock_get_crc.assert_called_once_with('https://console.redhat.com/api/rhsm/v1/subscriptions', 'client_id', 'client_secret')
mock_generate.assert_called_once_with([], is_candlepin=False)
def test_validate_rh_missing_user_raises_error():
"""Test validate_rh raises ValueError when user is missing"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
try:
licenser.validate_rh(None, 'testpass', basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_id or subscriptions_username is required' in str(e)
def test_validate_rh_missing_password_raises_error():
"""Test validate_rh raises ValueError when password is missing"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
try:
licenser.validate_rh('testuser', None, basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_secret or subscriptions_password is required' in str(e)
def test_validate_rh_no_host_fallback_to_candlepin():
"""Test validate_rh falls back to REDHAT_CANDLEPIN_HOST when no host from config
- basic_auth=True
- no host from config
- REDHAT_CANDLEPIN_HOST is set
"""
licenser = Licenser()
with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object(
licenser, 'get_host_from_rhsm_config', return_value=None
) as mock_get_host, patch.object(licenser, 'get_rhsm_subs', return_value=[]) as mock_get_rhsm, patch.object(
licenser, 'get_satellite_subs', return_value=[]
) as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
mock_settings.REDHAT_CANDLEPIN_HOST = 'https://candlepin.example.com'
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_called_once_with('https://candlepin.example.com', 'testuser', 'testpass')
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_empty_credentials_basic_auth():
"""Test validate_rh with empty string credentials raises ValueError"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
# Test empty user
try:
licenser.validate_rh(None, 'testpass', basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_id or subscriptions_username is required' in str(e)
# Test empty password
try:
licenser.validate_rh('testuser', None, basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_secret or subscriptions_password is required' in str(e)