Fallback to use subscription cred for analytic upload (#15479) (#6668)

* Fallback to use subscription cred for analytic

Fall back to use SUBSCRIPTION_USERNAME/PASSWORD to upload analytic to if REDHAT_USERNAME/PASSWORD are not set

* Improve error message

* Guard against request with no query or data

* Add test for _send_to_analytics

Focus on credentials

* Supress sonarcloud warning about password

* Add test for analytic ship

Co-authored-by: Hao Liu <44379968+TheRealHaoLiu@users.noreply.github.com>
This commit is contained in:
Peter Braun
2024-08-30 14:18:42 +02:00
committed by GitHub
parent 85bd7c3ca0
commit 64d2e10dc2
4 changed files with 208 additions and 9 deletions

View File

@@ -185,8 +185,12 @@ class AnalyticsGenericView(APIView):
self._get_setting('INSIGHTS_TRACKING_STATE', False, ERROR_UPLOAD_NOT_ENABLED) self._get_setting('INSIGHTS_TRACKING_STATE', False, ERROR_UPLOAD_NOT_ENABLED)
url = self._get_analytics_url(request.path) url = self._get_analytics_url(request.path)
rh_user = self._get_setting('REDHAT_USERNAME', None, ERROR_MISSING_USER) try:
rh_password = self._get_setting('REDHAT_PASSWORD', None, ERROR_MISSING_PASSWORD) rh_user = self._get_setting('REDHAT_USERNAME', None, ERROR_MISSING_USER)
rh_password = self._get_setting('REDHAT_PASSWORD', None, ERROR_MISSING_PASSWORD)
except MissingSettings:
rh_user = self._get_setting('SUBSCRIPTIONS_USERNAME', None, ERROR_MISSING_USER)
rh_password = self._get_setting('SUBSCRIPTIONS_PASSWORD', None, ERROR_MISSING_PASSWORD)
if method not in ["GET", "POST", "OPTIONS"]: if method not in ["GET", "POST", "OPTIONS"]:
return self._error_response(ERROR_UNSUPPORTED_METHOD, method, remote=False, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) return self._error_response(ERROR_UNSUPPORTED_METHOD, method, remote=False, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
@@ -196,9 +200,9 @@ class AnalyticsGenericView(APIView):
url, url,
auth=(rh_user, rh_password), auth=(rh_user, rh_password),
verify=settings.INSIGHTS_CERT_PATH, verify=settings.INSIGHTS_CERT_PATH,
params=request.query_params, params=getattr(request, 'query_params', {}),
headers=headers, headers=headers,
json=request.data, json=getattr(request, 'data', {}),
timeout=(31, 31), timeout=(31, 31),
) )
# #

View File

@@ -181,7 +181,10 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
logger.log(log_level, "Automation Analytics not enabled. Use --dry-run to gather locally without sending.") logger.log(log_level, "Automation Analytics not enabled. Use --dry-run to gather locally without sending.")
return None return None
if not (settings.AUTOMATION_ANALYTICS_URL and settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD): if not (
settings.AUTOMATION_ANALYTICS_URL
and ((settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD) or (settings.SUBSCRIPTION_USERNAME and settings.SUBSCRIPTION_PASSWORD))
):
logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.") logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.")
return None return None
@@ -361,14 +364,22 @@ def ship(path):
if not url: if not url:
logger.error('AUTOMATION_ANALYTICS_URL is not set') logger.error('AUTOMATION_ANALYTICS_URL is not set')
return False return False
rh_user = getattr(settings, 'REDHAT_USERNAME', None) rh_user = getattr(settings, 'REDHAT_USERNAME', None)
rh_password = getattr(settings, 'REDHAT_PASSWORD', None) rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
if rh_user is None or rh_password is None:
logger.info('REDHAT_USERNAME and REDHAT_PASSWORD are not set, using SUBSCRIPTION_USERNAME and SUBSCRIPTION_PASSWORD')
rh_user = getattr(settings, 'SUBSCRIPTION_USERNAME', None)
rh_password = getattr(settings, 'SUBSCRIPTION_PASSWORD', None)
if not rh_user: if not rh_user:
logger.error('REDHAT_USERNAME is not set') logger.error('REDHAT_USERNAME and SUBSCRIPTIONS_USERNAME are not set')
return False return False
if not rh_password: if not rh_password:
logger.error('REDHAT_PASSWORD is not set') logger.error('REDHAT_PASSWORD and SUBSCRIPTIONS_USERNAME are not set')
return False return False
with open(path, 'rb') as f: with open(path, 'rb') as f:
files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)} files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)}
s = requests.Session() s = requests.Session()

View File

@@ -2,11 +2,13 @@ import importlib
import json import json
import os import os
import tarfile import tarfile
import tempfile
from unittest import mock from unittest import mock
import pytest import pytest
from django.conf import settings from django.conf import settings
from awx.main.analytics import gather, register from django.test.utils import override_settings
from awx.main.analytics import gather, register, ship
@register('example', '1.0') @register('example', '1.0')
@@ -57,3 +59,83 @@ def test_gather(mock_valid_license):
os.remove(tgz) os.remove(tgz)
except Exception: except Exception:
pass pass
@pytest.fixture
def temp_analytic_tar():
# Create a temporary file and yield its path
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(b"data")
temp_file_path = temp_file.name
yield temp_file_path
# Clean up the temporary file after the test
os.remove(temp_file_path)
@pytest.fixture
def mock_analytic_post():
# Patch the Session.post method to return a mock response with status_code 200
with mock.patch('awx.main.analytics.core.requests.Session.post', return_value=mock.Mock(status_code=200)) as mock_post:
yield mock_post
@pytest.mark.parametrize(
"setting_map, expected_result, expected_auth",
[
# Test case 1: Valid Red Hat credentials
(
{
'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTION_USERNAME': None,
'SUBSCRIPTION_PASSWORD': None,
},
True,
('redhat_user', 'redhat_pass'),
),
# Test case 2: Valid Subscription credentials
(
{
'REDHAT_USERNAME': None,
'REDHAT_PASSWORD': None,
'SUBSCRIPTION_USERNAME': 'subs_user',
'SUBSCRIPTION_PASSWORD': 'subs_pass', # NOSONAR
},
True,
('subs_user', 'subs_pass'),
),
# Test case 3: No credentials
(
{
'REDHAT_USERNAME': None,
'REDHAT_PASSWORD': None,
'SUBSCRIPTION_USERNAME': None,
'SUBSCRIPTION_PASSWORD': None,
},
False,
None, # No request should be made
),
# Test case 4: Mixed credentials
(
{
'REDHAT_USERNAME': None,
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTION_USERNAME': 'subs_user',
'SUBSCRIPTION_PASSWORD': None,
},
False,
None, # Invalid, no request should be made
),
],
)
@pytest.mark.django_db
def test_ship_credential(setting_map, expected_result, expected_auth, temp_analytic_tar, mock_analytic_post):
with override_settings(**setting_map):
result = ship(temp_analytic_tar)
assert result == expected_result
if expected_auth:
mock_analytic_post.assert_called_once()
assert mock_analytic_post.call_args[1]['auth'] == expected_auth
else:
mock_analytic_post.assert_not_called()

View File

@@ -1,7 +1,10 @@
import pytest import pytest
import requests import requests
from awx.api.views.analytics import AnalyticsGenericView, MissingSettings, AUTOMATION_ANALYTICS_API_URL_PATH from unittest import mock
from awx.api.views.analytics import AnalyticsGenericView, MissingSettings, AUTOMATION_ANALYTICS_API_URL_PATH, ERROR_MISSING_USER, ERROR_MISSING_PASSWORD
from django.test.utils import override_settings from django.test.utils import override_settings
from django.test import RequestFactory
from rest_framework import status
from awx.main.utils import get_awx_version from awx.main.utils import get_awx_version
from django.utils import translation from django.utils import translation
@@ -84,3 +87,102 @@ class TestAnalyticsGenericView:
AnalyticsGenericView._get_setting(setting_name, False, None) AnalyticsGenericView._get_setting(setting_name, False, None)
else: else:
assert AnalyticsGenericView._get_setting(setting_name, False, None) == setting_value assert AnalyticsGenericView._get_setting(setting_name, False, None) == setting_value
@pytest.mark.parametrize(
"settings_map, expected_auth, expected_error_keyword",
[
# Test case 1: Valid Red Hat credentials
(
{
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': '',
'SUBSCRIPTIONS_PASSWORD': '',
},
('redhat_user', 'redhat_pass'),
None,
),
# Test case 2: Valid Subscription credentials
(
{
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR
},
('subs_user', 'subs_pass'),
None,
),
# Test case 3: No credentials
(
{
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': '',
'SUBSCRIPTIONS_PASSWORD': '',
},
None,
ERROR_MISSING_USER,
),
# Test case 4: Both credentials
(
{
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR
},
('redhat_user', 'redhat_pass'),
None,
),
# Test case 5: Missing password
(
{
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': 'subs_user', # NOSONAR
'SUBSCRIPTIONS_PASSWORD': '',
},
None,
ERROR_MISSING_PASSWORD,
),
],
)
@pytest.mark.django_db
def test__send_to_analytics_credentials(self, settings_map, expected_auth, expected_error_keyword):
with override_settings(**settings_map):
request = RequestFactory().post('/some/path')
view = AnalyticsGenericView()
if expected_auth:
with mock.patch('requests.request') as mock_request:
mock_request.return_value = mock.Mock(status_code=200)
analytic_url = view._get_analytics_url(request.path)
response = view._send_to_analytics(request, 'POST')
# Assertions
mock_request.assert_called_once_with(
'POST',
analytic_url,
auth=expected_auth,
verify=mock.ANY,
headers=mock.ANY,
json=mock.ANY,
params=mock.ANY,
timeout=mock.ANY,
)
assert response.status_code == 200
else:
# Test when settings are missing and MissingSettings is raised
response = view._send_to_analytics(request, 'POST')
# # Assert that _error_response is called when MissingSettings is raised
# mock_error_response.assert_called_once_with(expected_error_keyword, remote=False)
assert response.status_code == status.HTTP_403_FORBIDDEN
assert response.data['error']['keyword'] == expected_error_keyword