Fix Subscriptions credentials fallback (#15980)

Ensure service account authentication is being used
when falling back to using SUBSCRIPTIONS_CLIENT_ID.

Additional change:
Subscription data can return two types of capacities:
Sockets and Nodes

For determining overall capacity
if capacity name is Nodes:
  capacity quantity x subscription quantity
if capacity name is Sockets:
  capacity quantity / 2 (minimum of 1) x subscription quantity

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
This commit is contained in:
Seth Foster 2025-05-14 11:59:19 -04:00 committed by GitHub
parent 537850c650
commit 6377824af5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 97 additions and 15 deletions

View File

@ -202,9 +202,15 @@ class AnalyticsGenericView(APIView):
if method not in ["GET", "POST", "OPTIONS"]:
return self._error_response(ERROR_UNSUPPORTED_METHOD, method, remote=False, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
url = self._get_analytics_url(request.path)
using_subscriptions_credentials = False
try:
rh_user = self._get_setting('REDHAT_USERNAME', None, ERROR_MISSING_USER)
rh_password = self._get_setting('REDHAT_PASSWORD', None, ERROR_MISSING_PASSWORD)
rh_user = getattr(settings, 'REDHAT_USERNAME', None)
rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
if not (rh_user and rh_password):
rh_user = self._get_setting('SUBSCRIPTIONS_CLIENT_ID', None, ERROR_MISSING_USER)
rh_password = self._get_setting('SUBSCRIPTIONS_CLIENT_SECRET', None, ERROR_MISSING_PASSWORD)
using_subscriptions_credentials = True
client = OIDCClient(rh_user, rh_password)
response = client.make_request(
method,
@ -216,17 +222,17 @@ class AnalyticsGenericView(APIView):
timeout=(31, 31),
)
except requests.RequestException:
logger.error("Automation Analytics API request failed, trying base auth method")
response = self._base_auth_request(request, method, url, rh_user, rh_password, headers)
except MissingSettings:
rh_user = self._get_setting('SUBSCRIPTIONS_CLIENT_ID', None, ERROR_MISSING_USER)
rh_password = self._get_setting('SUBSCRIPTIONS_CLIENT_SECRET', None, ERROR_MISSING_PASSWORD)
response = self._base_auth_request(request, method, url, rh_user, rh_password, headers)
# subscriptions credentials are not valid for basic auth, so just return 401
if using_subscriptions_credentials:
response = Response(status=status.HTTP_401_UNAUTHORIZED)
else:
logger.error("Automation Analytics API request failed, trying base auth method")
response = self._base_auth_request(request, method, url, rh_user, rh_password, headers)
#
# Missing or wrong user/pass
#
if response.status_code == status.HTTP_401_UNAUTHORIZED:
text = (response.text or '').rstrip("\n")
text = response.get('text', '').rstrip("\n")
return self._error_response(ERROR_UNAUTHORIZED, text, remote=True, remote_status_code=response.status_code)
#
# Not found, No entitlement or No data in Analytics

View File

@ -155,26 +155,36 @@ class TestAnalyticsGenericView:
)
@pytest.mark.django_db
def test__send_to_analytics_credentials(self, settings_map, expected_auth, expected_error_keyword):
"""
Test _send_to_analytics with various combinations of credentials.
"""
with override_settings(**settings_map):
request = RequestFactory().post('/some/path')
view = AnalyticsGenericView()
if expected_auth:
with mock.patch('requests.request') as mock_request:
mock_request.return_value = mock.Mock(status_code=200)
with mock.patch('awx.api.views.analytics.OIDCClient') as mock_oidc_client:
# Configure the mock OIDCClient instance and its make_request method
mock_client_instance = mock.Mock()
mock_oidc_client.return_value = mock_client_instance
mock_client_instance.make_request.return_value = mock.Mock(status_code=200)
analytic_url = view._get_analytics_url(request.path)
response = view._send_to_analytics(request, 'POST')
# Assertions
mock_request.assert_called_once_with(
# Assert OIDCClient instantiation
expected_client_id, expected_client_secret = expected_auth
mock_oidc_client.assert_called_once_with(expected_client_id, expected_client_secret)
# Assert make_request call
mock_client_instance.make_request.assert_called_once_with(
'POST',
analytic_url,
auth=expected_auth,
verify=mock.ANY,
headers=mock.ANY,
json=mock.ANY,
verify=mock.ANY,
params=mock.ANY,
json=mock.ANY,
timeout=mock.ANY,
)
assert response.status_code == 200
@ -186,3 +196,64 @@ class TestAnalyticsGenericView:
# mock_error_response.assert_called_once_with(expected_error_keyword, remote=False)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.data['error']['keyword'] == expected_error_keyword
@pytest.mark.django_db
@pytest.mark.parametrize(
"settings_map, expected_auth",
[
# Test case 1: Username and password should be used for basic auth
(
{
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_CLIENT_SECRET': '',
},
('redhat_user', 'redhat_pass'),
),
# Test case 2: Client ID and secret should be used for basic auth
(
{
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
},
None,
),
],
)
def test__send_to_analytics_fallback_to_basic_auth(self, settings_map, expected_auth):
"""
Test _send_to_analytics with basic auth fallback.
"""
with override_settings(**settings_map):
request = RequestFactory().post('/some/path')
view = AnalyticsGenericView()
with mock.patch('awx.api.views.analytics.OIDCClient') as mock_oidc_client, mock.patch(
'awx.api.views.analytics.AnalyticsGenericView._base_auth_request'
) as mock_base_auth_request:
# Configure the mock OIDCClient instance and its make_request method
mock_client_instance = mock.Mock()
mock_oidc_client.return_value = mock_client_instance
mock_client_instance.make_request.side_effect = requests.RequestException("Incorrect credentials")
analytic_url = view._get_analytics_url(request.path)
view._send_to_analytics(request, 'POST')
if expected_auth:
# assert mock_base_auth_request called with expected_auth
mock_base_auth_request.assert_called_once_with(
request,
'POST',
analytic_url,
expected_auth[0],
expected_auth[1],
mock.ANY,
)
else:
# assert mock_base_auth_request not called
mock_base_auth_request.assert_not_called()

View File

@ -360,8 +360,13 @@ class Licenser(object):
account_number = sub['accountNumber']
else:
try:
# Determine total quantity based on capacity name
# if capacity name is Nodes, capacity quantity x subscription quantity
# if capacity name is Sockets, capacity quantity / 2 (minimum of 1) x subscription quantity
if sub['capacity']['name'] == "Nodes":
quantity = int(sub['capacity']['quantity']) * int(sub['subscriptions']['quantity'])
elif sub['capacity']['name'] == "Sockets":
quantity = max(int(sub['capacity']['quantity']) / 2, 1) * int(sub['subscriptions']['quantity'])
else:
continue
except Exception: