diff --git a/awx/api/views/root.py b/awx/api/views/root.py index f3fc85043f..1fa45ad224 100644 --- a/awx/api/views/root.py +++ b/awx/api/views/root.py @@ -180,16 +180,46 @@ class ApiV2SubscriptionView(APIView): def post(self, request): data = request.data.copy() - if data.get('subscriptions_client_secret') == '$encrypted$': - data['subscriptions_client_secret'] = settings.SUBSCRIPTIONS_CLIENT_SECRET + try: - user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret') + user = None + pw = None + basic_auth = False + # determine if the credentials are for basic auth or not + if data.get('subscriptions_client_id'): + user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret') + if pw == '$encrypted$': + pw = settings.SUBSCRIPTIONS_CLIENT_SECRET + elif data.get('subscriptions_username'): + user, pw = data.get('subscriptions_username'), data.get('subscriptions_password') + if pw == '$encrypted$': + pw = settings.SUBSCRIPTIONS_PASSWORD + basic_auth = True + + if not user or not pw: + return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST) + with set_environ(**settings.AWX_TASK_ENV): - validated = get_licenser().validate_rh(user, pw) - if user: - settings.SUBSCRIPTIONS_CLIENT_ID = data['subscriptions_client_id'] - if pw: - settings.SUBSCRIPTIONS_CLIENT_SECRET = data['subscriptions_client_secret'] + validated = get_licenser().validate_rh(user, pw, basic_auth) + + # update settings if the credentials were valid + if basic_auth: + if user: + settings.SUBSCRIPTIONS_USERNAME = user + if pw: + settings.SUBSCRIPTIONS_PASSWORD = pw + else: + if user: + settings.SUBSCRIPTIONS_CLIENT_ID = user + if not settings.REDHAT_USERNAME: + # plumb these to analytics credentials + settings.REDHAT_USERNAME = user + if pw: + settings.SUBSCRIPTIONS_CLIENT_SECRET = pw + if not settings.REDHAT_PASSWORD: + # plumb these to analytics credentials + settings.REDHAT_PASSWORD = pw + except Exception as exc: msg = _("Invalid Subscription") if isinstance(exc, TokenError) or ( @@ -225,16 +255,21 @@ class ApiV2AttachView(APIView): if not subscription_id: return Response({"error": _("No subscription ID provided.")}, status=status.HTTP_400_BAD_REQUEST) # Ensure we always use the latest subscription credentials - cache.delete_many(['SUBSCRIPTIONS_CLIENT_ID', 'SUBSCRIPTIONS_CLIENT_SECRET']) + cache.delete_many(['SUBSCRIPTIONS_CLIENT_ID', 'SUBSCRIPTIONS_CLIENT_SECRET', 'SUBSCRIPTIONS_USERNAME', 'SUBSCRIPTIONS_PASSWORD']) user = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None) pw = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None) + basic_auth = False + if not (user and pw): + user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None) + pw = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None) + basic_auth = True if not (user and pw): return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST) if subscription_id and user and pw: data = request.data.copy() try: with set_environ(**settings.AWX_TASK_ENV): - validated = get_licenser().validate_rh(user, pw) + validated = get_licenser().validate_rh(user, pw, basic_auth) except Exception as exc: msg = _("Invalid Subscription") if isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401: diff --git a/awx/main/conf.py b/awx/main/conf.py index 50eed36666..06852ff3f8 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -144,6 +144,33 @@ register( category_slug='system', ) +register( + 'SUBSCRIPTIONS_USERNAME', + field_class=fields.CharField, + default='', + allow_blank=True, + encrypted=False, + read_only=False, + label=_('Red Hat Username for Subscriptions'), + help_text=_('Username used to retrieve subscription and content information'), # noqa + category=_('System'), + category_slug='system', +) + +register( + 'SUBSCRIPTIONS_PASSWORD', + field_class=fields.CharField, + default='', + allow_blank=True, + encrypted=True, + read_only=False, + label=_('Red Hat Password for Subscriptions'), + help_text=_('Password used to retrieve subscription and content information'), # noqa + category=_('System'), + category_slug='system', +) + + register( 'SUBSCRIPTIONS_CLIENT_ID', field_class=fields.CharField, diff --git a/awx/main/tests/functional/api/test_licensing.py b/awx/main/tests/functional/api/test_licensing.py new file mode 100644 index 0000000000..efec73ae3f --- /dev/null +++ b/awx/main/tests/functional/api/test_licensing.py @@ -0,0 +1,237 @@ +from unittest.mock import patch, MagicMock + +import pytest +from awx.api.versioning import reverse +from rest_framework import status + + +@pytest.mark.django_db +class TestApiV2SubscriptionView: + """Test cases for the /api/v2/config/subscriptions/ endpoint""" + + def test_basic_auth(self, post, admin): + """Test POST with subscriptions_username and subscriptions_password calls validate_rh with basic_auth=True""" + data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'} + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + mock_licenser.validate_rh.assert_called_once_with('test_user', 'test_password', True) + + def test_service_account(self, post, admin): + """Test POST with subscriptions_client_id and subscriptions_client_secret calls validate_rh with basic_auth=False""" + data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': 'test_client_secret'} + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False) + + def test_encrypted_password_basic_auth(self, post, admin, settings): + """Test POST with $encrypted$ password uses settings value for basic auth""" + data = {'subscriptions_username': 'test_user', 'subscriptions_password': '$encrypted$'} + + settings.SUBSCRIPTIONS_PASSWORD = 'actual_password_from_settings' + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + mock_licenser.validate_rh.assert_called_once_with('test_user', 'actual_password_from_settings', True) + + def test_encrypted_client_secret_service_account(self, post, admin, settings): + """Test POST with $encrypted$ client_secret uses settings value for service_account""" + data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': '$encrypted$'} + + settings.SUBSCRIPTIONS_CLIENT_SECRET = 'actual_secret_from_settings' + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'actual_secret_from_settings', False) + + def test_missing_username_returns_error(self, post, admin): + """Test POST with missing username returns 400 error""" + data = {'subscriptions_password': 'test_password'} + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert 'Missing subscription credentials' in response.data['error'] + + def test_missing_password_returns_error(self, post, admin, settings): + """Test POST with missing password returns 400 error""" + data = {'subscriptions_username': 'test_user'} + settings.SUBSCRIPTIONS_PASSWORD = None + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert 'Missing subscription credentials' in response.data['error'] + + def test_missing_client_id_returns_error(self, post, admin): + """Test POST with missing client_id returns 400 error""" + data = {'subscriptions_client_secret': 'test_secret'} + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert 'Missing subscription credentials' in response.data['error'] + + def test_missing_client_secret_returns_error(self, post, admin, settings): + """Test POST with missing client_secret returns 400 error""" + data = {'subscriptions_client_id': 'test_client_id'} + settings.SUBSCRIPTIONS_CLIENT_SECRET = None + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert 'Missing subscription credentials' in response.data['error'] + + def test_empty_username_returns_error(self, post, admin): + """Test POST with empty username returns 400 error""" + data = {'subscriptions_username': '', 'subscriptions_password': 'test_password'} + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert 'Missing subscription credentials' in response.data['error'] + + def test_empty_password_returns_error(self, post, admin, settings): + """Test POST with empty password returns 400 error""" + data = {'subscriptions_username': 'test_user', 'subscriptions_password': ''} + settings.SUBSCRIPTIONS_PASSWORD = None + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert 'Missing subscription credentials' in response.data['error'] + + def test_non_superuser_permission_denied(self, post, rando): + """Test that non-superuser cannot access the endpoint""" + data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'} + + response = post(reverse('api:api_v2_subscription_view'), data, rando) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_settings_updated_on_successful_basic_auth(self, post, admin, settings): + """Test that settings are updated when basic auth validation succeeds""" + data = {'subscriptions_username': 'new_username', 'subscriptions_password': 'new_password'} + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + assert settings.SUBSCRIPTIONS_USERNAME == 'new_username' + assert settings.SUBSCRIPTIONS_PASSWORD == 'new_password' + + def test_settings_updated_on_successful_service_account(self, post, admin, settings): + """Test that settings are updated when service account validation succeeds""" + data = {'subscriptions_client_id': 'new_client_id', 'subscriptions_client_secret': 'new_client_secret'} + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + assert settings.SUBSCRIPTIONS_CLIENT_ID == 'new_client_id' + assert settings.SUBSCRIPTIONS_CLIENT_SECRET == 'new_client_secret' + + def test_analytics_credentials_plumbed_when_missing(self, post, admin, settings): + """Test that service account credentials are plumbed to analytics when REDHAT_* settings are missing""" + data = {'subscriptions_client_id': 'analytics_client_id', 'subscriptions_client_secret': 'analytics_client_secret'} + + # Ensure REDHAT_* settings are not set + settings.REDHAT_USERNAME = None + settings.REDHAT_PASSWORD = None + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + assert settings.REDHAT_USERNAME == 'analytics_client_id' + assert settings.REDHAT_PASSWORD == 'analytics_client_secret' + + def test_analytics_credentials_not_overwritten_when_present(self, post, admin, settings): + """Test that existing REDHAT_* settings are not overwritten""" + data = {'subscriptions_client_id': 'new_client_id', 'subscriptions_client_secret': 'new_client_secret'} + + # Set existing REDHAT_* settings + settings.REDHAT_USERNAME = 'existing_username' + settings.REDHAT_PASSWORD = 'existing_password' + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + # REDHAT_* settings should remain unchanged + assert settings.REDHAT_USERNAME == 'existing_username' + assert settings.REDHAT_PASSWORD == 'existing_password' + + def test_validate_rh_exception_handling(self, post, admin): + """Test that exceptions from validate_rh are properly handled""" + data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'} + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.side_effect = Exception("Connection error") + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + def test_mixed_credentials_prioritizes_client_id(self, post, admin): + """Test that when both username and client_id are provided, client_id takes precedence""" + data = { + 'subscriptions_username': 'test_user', + 'subscriptions_password': 'test_password', + 'subscriptions_client_id': 'test_client_id', + 'subscriptions_client_secret': 'test_client_secret', + } + + with patch('awx.api.views.root.get_licenser') as mock_get_licenser: + mock_licenser = MagicMock() + mock_licenser.validate_rh.return_value = [] + mock_get_licenser.return_value = mock_licenser + + response = post(reverse('api:api_v2_subscription_view'), data, admin) + + assert response.status_code == status.HTTP_200_OK + # Should use service account (basic_auth=False) since client_id is present + mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False) diff --git a/awx/main/tests/unit/utils/test_licensing.py b/awx/main/tests/unit/utils/test_licensing.py deleted file mode 100644 index 0a93aa8612..0000000000 --- a/awx/main/tests/unit/utils/test_licensing.py +++ /dev/null @@ -1,37 +0,0 @@ -import json -from http import HTTPStatus -from unittest.mock import patch - -from requests import Response - -from awx.main.utils.licensing import Licenser - - -def test_rhsm_licensing(): - def mocked_requests_get(*args, **kwargs): - assert kwargs['verify'] == True - response = Response() - subs = json.dumps({'body': []}) - response.status_code = HTTPStatus.OK - response._content = bytes(subs, 'utf-8') - return response - - licenser = Licenser() - with patch('awx.main.utils.analytics_proxy.OIDCClient.make_request', new=mocked_requests_get): - subs = licenser.get_rhsm_subs('localhost', 'admin', 'admin') - assert subs == [] - - -def test_satellite_licensing(): - def mocked_requests_get(*args, **kwargs): - assert kwargs['verify'] == True - response = Response() - subs = json.dumps({'results': []}) - response.status_code = HTTPStatus.OK - response._content = bytes(subs, 'utf-8') - return response - - licenser = Licenser() - with patch('requests.get', new=mocked_requests_get): - subs = licenser.get_satellite_subs('localhost', 'admin', 'admin') - assert subs == [] diff --git a/awx/main/tests/unit/utils/test_validate_rh.py b/awx/main/tests/unit/utils/test_validate_rh.py new file mode 100644 index 0000000000..65052bbdef --- /dev/null +++ b/awx/main/tests/unit/utils/test_validate_rh.py @@ -0,0 +1,154 @@ +from unittest.mock import patch +from awx.main.utils.licensing import Licenser + + +def test_validate_rh_basic_auth_rhsm(): + """ + Assert get_rhsm_subs is called when + - basic_auth=True + - host is subscription.rhsm.redhat.com + """ + licenser = Licenser() + + with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com') as mock_get_host, patch.object( + licenser, 'get_rhsm_subs', return_value=[] + ) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object( + licenser, 'get_crc_subs' + ) as mock_get_crc, patch.object( + licenser, 'generate_license_options_from_entitlements' + ) as mock_generate: + + licenser.validate_rh('testuser', 'testpass', basic_auth=True) + + # Assert the correct methods were called + mock_get_host.assert_called_once() + mock_get_rhsm.assert_called_once_with('https://subscription.rhsm.redhat.com', 'testuser', 'testpass') + mock_get_satellite.assert_not_called() + mock_get_crc.assert_not_called() + mock_generate.assert_called_once_with([], is_candlepin=True) + + +def test_validate_rh_basic_auth_satellite(): + """ + Assert get_satellite_subs is called when + - basic_auth=True + - custom satellite host + """ + licenser = Licenser() + + with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://satellite.example.com') as mock_get_host, patch.object( + licenser, 'get_rhsm_subs' + ) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs', return_value=[]) as mock_get_satellite, patch.object( + licenser, 'get_crc_subs' + ) as mock_get_crc, patch.object( + licenser, 'generate_license_options_from_entitlements' + ) as mock_generate: + + licenser.validate_rh('testuser', 'testpass', basic_auth=True) + + # Assert the correct methods were called + mock_get_host.assert_called_once() + mock_get_rhsm.assert_not_called() + mock_get_satellite.assert_called_once_with('https://satellite.example.com', 'testuser', 'testpass') + mock_get_crc.assert_not_called() + mock_generate.assert_called_once_with([], is_candlepin=True) + + +def test_validate_rh_service_account_crc(): + """ + Assert get_crc_subs is called when + - basic_auth=False + """ + licenser = Licenser() + + with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object(licenser, 'get_host_from_rhsm_config') as mock_get_host, patch.object( + licenser, 'get_rhsm_subs' + ) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object( + licenser, 'get_crc_subs', return_value=[] + ) as mock_get_crc, patch.object( + licenser, 'generate_license_options_from_entitlements' + ) as mock_generate: + + mock_settings.SUBSCRIPTIONS_RHSM_URL = 'https://console.redhat.com/api/rhsm/v1/subscriptions' + + licenser.validate_rh('client_id', 'client_secret', basic_auth=False) + + # Assert the correct methods were called + mock_get_host.assert_not_called() + mock_get_rhsm.assert_not_called() + mock_get_satellite.assert_not_called() + mock_get_crc.assert_called_once_with('https://console.redhat.com/api/rhsm/v1/subscriptions', 'client_id', 'client_secret') + mock_generate.assert_called_once_with([], is_candlepin=False) + + +def test_validate_rh_missing_user_raises_error(): + """Test validate_rh raises ValueError when user is missing""" + licenser = Licenser() + + with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'): + try: + licenser.validate_rh(None, 'testpass', basic_auth=True) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert 'subscriptions_client_id or subscriptions_username is required' in str(e) + + +def test_validate_rh_missing_password_raises_error(): + """Test validate_rh raises ValueError when password is missing""" + licenser = Licenser() + + with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'): + try: + licenser.validate_rh('testuser', None, basic_auth=True) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert 'subscriptions_client_secret or subscriptions_password is required' in str(e) + + +def test_validate_rh_no_host_fallback_to_candlepin(): + """Test validate_rh falls back to REDHAT_CANDLEPIN_HOST when no host from config + - basic_auth=True + - no host from config + - REDHAT_CANDLEPIN_HOST is set + """ + licenser = Licenser() + + with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object( + licenser, 'get_host_from_rhsm_config', return_value=None + ) as mock_get_host, patch.object(licenser, 'get_rhsm_subs', return_value=[]) as mock_get_rhsm, patch.object( + licenser, 'get_satellite_subs', return_value=[] + ) as mock_get_satellite, patch.object( + licenser, 'get_crc_subs' + ) as mock_get_crc, patch.object( + licenser, 'generate_license_options_from_entitlements' + ) as mock_generate: + + mock_settings.REDHAT_CANDLEPIN_HOST = 'https://candlepin.example.com' + licenser.validate_rh('testuser', 'testpass', basic_auth=True) + + # Assert the correct methods were called + mock_get_host.assert_called_once() + mock_get_rhsm.assert_not_called() + mock_get_satellite.assert_called_once_with('https://candlepin.example.com', 'testuser', 'testpass') + mock_get_crc.assert_not_called() + mock_generate.assert_called_once_with([], is_candlepin=True) + + +def test_validate_rh_empty_credentials_basic_auth(): + """Test validate_rh with empty string credentials raises ValueError""" + licenser = Licenser() + + with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'): + # Test empty user + try: + licenser.validate_rh(None, 'testpass', basic_auth=True) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert 'subscriptions_client_id or subscriptions_username is required' in str(e) + + # Test empty password + try: + licenser.validate_rh('testuser', None, basic_auth=True) + assert False, "Expected ValueError to be raised" + except ValueError as e: + assert 'subscriptions_client_secret or subscriptions_password is required' in str(e) diff --git a/awx/main/utils/licensing.py b/awx/main/utils/licensing.py index 780ad97249..80b78757e2 100644 --- a/awx/main/utils/licensing.py +++ b/awx/main/utils/licensing.py @@ -219,30 +219,65 @@ class Licenser(object): kwargs['license_date'] = int(kwargs['license_date']) self._attrs.update(kwargs) - def validate_rh(self, user, pw): + def get_host_from_rhsm_config(self): try: host = 'https://' + str(self.config.get("server", "hostname")) except Exception: logger.exception('Cannot access rhsm.conf, make sure subscription manager is installed and configured.') host = None + return host + + def validate_rh(self, user, pw, basic_auth): + # if basic auth is True, host is read from rhsm.conf (subscription.rhsm.redhat.com) + # if basic auth is False, host is settings.SUBSCRIPTIONS_RHSM_URL (console.redhat.com) + # if rhsm.conf is not found, host is settings.REDHAT_CANDLEPIN_HOST (satellite server) + if basic_auth: + host = self.get_host_from_rhsm_config() + if not host: + host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None) + else: + host = settings.SUBSCRIPTIONS_RHSM_URL + if not host: - host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None) + raise ValueError('Could not get host url for subscriptions') if not user: - raise ValueError('subscriptions_client_id is required') + raise ValueError('subscriptions_client_id or subscriptions_username is required') if not pw: - raise ValueError('subscriptions_client_secret is required') + raise ValueError('subscriptions_client_secret or subscriptions_password is required') if host and user and pw: - if 'subscription.rhsm.redhat.com' in host: - json = self.get_rhsm_subs(settings.SUBSCRIPTIONS_RHSM_URL, user, pw) + if basic_auth: + if 'subscription.rhsm.redhat.com' in host: + json = self.get_rhsm_subs(host, user, pw) + else: + json = self.get_satellite_subs(host, user, pw) else: - json = self.get_satellite_subs(host, user, pw) - return self.generate_license_options_from_entitlements(json) + json = self.get_crc_subs(host, user, pw) + return self.generate_license_options_from_entitlements(json, is_candlepin=basic_auth) return [] - def get_rhsm_subs(self, host, client_id, client_secret): + def get_rhsm_subs(self, host, user, pw): + verify = getattr(settings, 'REDHAT_CANDLEPIN_VERIFY', True) + json = [] + try: + subs = requests.get('/'.join([host, 'subscription/users/{}/owners'.format(user)]), verify=verify, auth=(user, pw)) + except requests.exceptions.ConnectionError as error: + raise error + except OSError as error: + raise OSError( + 'Unable to open certificate bundle {}. Check that the service is running on Red Hat Enterprise Linux.'.format(verify) + ) from error # noqa + subs.raise_for_status() + + for sub in subs.json(): + resp = requests.get('/'.join([host, 'subscription/owners/{}/pools/?match=*tower*'.format(sub['key'])]), verify=verify, auth=(user, pw)) + resp.raise_for_status() + json.extend(resp.json()) + return json + + def get_crc_subs(self, host, client_id, client_secret): try: client = OIDCClient(client_id, client_secret) subs = client.make_request( @@ -320,12 +355,21 @@ class Licenser(object): json.append(license) return json + def is_appropriate_sub(self, sub): + if sub['activeSubscription'] is False: + return False + # Products that contain Ansible Tower + products = sub.get('providedProducts', []) + if any(product.get('productId') == '480' for product in products): + return True + return False + def is_appropriate_sat_sub(self, sub): if 'Red Hat Ansible Automation' not in sub['subscription_name']: return False return True - def generate_license_options_from_entitlements(self, json): + def generate_license_options_from_entitlements(self, json, is_candlepin=False): from dateutil.parser import parse ValidSub = collections.namedtuple( @@ -336,12 +380,14 @@ class Licenser(object): satellite = sub.get('satellite') if satellite: is_valid = self.is_appropriate_sat_sub(sub) + elif is_candlepin: + is_valid = self.is_appropriate_sub(sub) else: - # the list of subs from console.redhat.com are already valid based on the query params we provided + # the list of subs from console.redhat.com and subscriptions.rhsm.redhat.com are already valid based on the query params we provided is_valid = True if is_valid: try: - if satellite: + if is_candlepin: end_date = parse(sub.get('endDate')) else: end_date = parse(sub['subscriptions']['endDate']) @@ -354,10 +400,10 @@ class Licenser(object): continue developer_license = False - support_level = '' + support_level = sub.get('support_level', '') account_number = '' usage = sub.get('usage', '') - if satellite: + if is_candlepin: try: quantity = int(sub['quantity']) except Exception: @@ -365,7 +411,6 @@ class Licenser(object): sku = sub['productId'] subscription_id = sub['subscriptionId'] sub_name = sub['productName'] - support_level = sub['support_level'] account_number = sub['accountNumber'] else: try: @@ -434,6 +479,8 @@ class Licenser(object): license.update(subscription_id=sub.subscription_id) license.update(account_number=sub.account_number) licenses.append(license._attrs.copy()) + # sort by sku + licenses.sort(key=lambda x: x['sku']) return licenses raise ValueError('No valid Red Hat Ansible Automation subscription could be found for this account.') # noqa diff --git a/awx_collection/plugins/modules/subscriptions.py b/awx_collection/plugins/modules/subscriptions.py index 761fb9cc3c..1834d893df 100644 --- a/awx_collection/plugins/modules/subscriptions.py +++ b/awx_collection/plugins/modules/subscriptions.py @@ -19,18 +19,27 @@ short_description: Get subscription list description: - Get subscriptions available to Automation Platform Controller. See U(https://www.ansible.com/tower) for an overview. + - The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions options: + username: + description: + - Red Hat username to get available subscriptions. + required: False + type: str + password: + description: + - Red Hat password to get available subscriptions. + required: False + type: str client_id: description: - - Red Hat service account client ID or Red Hat Satellite username to get available subscriptions. - - The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions - required: True + - Red Hat service account client ID to get available subscriptions. + required: False type: str client_secret: description: - - Red Hat service account client secret or Red Hat Satellite password to get available subscriptions. - - The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions - required: True + - Red Hat service account client secret to get available subscriptions. + required: False type: str filters: description: @@ -72,19 +81,41 @@ def main(): module = ControllerAPIModule( argument_spec=dict( - client_id=dict(type='str', required=True), - client_secret=dict(type='str', no_log=True, required=True), + username=dict(type='str', required=False), + password=dict(type='str', no_log=True, required=False), + client_id=dict(type='str', required=False), + client_secret=dict(type='str', no_log=True, required=False), filters=dict(type='dict', required=False, default={}), ), + mutually_exclusive=[ + ['username', 'client_id'] + ], + required_together=[ + ['username', 'password'], + ['client_id', 'client_secret'] + ], + required_one_of=[ + ['username', 'client_id'] + ], ) json_output = {'changed': False} + username = module.params.get('username') + password = module.params.get('password') + client_id = module.params.get('client_id') + client_secret = module.params.get('client_secret') + + if username and password: + post_data = { + 'subscriptions_username': username, + 'subscriptions_password': password, + } + else: + post_data = { + 'subscriptions_client_id': client_id, + 'subscriptions_client_secret': client_secret, + } - # Check if Tower is already licensed - post_data = { - 'subscriptions_client_secret': module.params.get('client_secret'), - 'subscriptions_client_id': module.params.get('client_id'), - } all_subscriptions = module.post_endpoint('config/subscriptions', data=post_data)['json'] json_output['subscriptions'] = [] for subscription in all_subscriptions: