diff --git a/awx/api/views/analytics.py b/awx/api/views/analytics.py index 6a855eab70..890307dc7c 100644 --- a/awx/api/views/analytics.py +++ b/awx/api/views/analytics.py @@ -49,7 +49,6 @@ class GetNotAllowedMixin(object): class AnalyticsRootView(APIView): permission_classes = (AnalyticsPermission,) name = _('Automation Analytics') - swagger_topic = 'Automation Analytics' resource_purpose = 'automation analytics endpoints' @extend_schema_if_available(extensions={"x-ai-description": "A list of additional API endpoints related to analytics"}) @@ -306,7 +305,6 @@ class AnalyticsAuthorizedView(AnalyticsGenericListView): class AnalyticsReportsList(GetNotAllowedMixin, AnalyticsGenericListView): name = _("Reports") - swagger_topic = "Automation Analytics" resource_purpose = 'automation analytics reports' diff --git a/awx/main/analytics/core.py b/awx/main/analytics/core.py index c5ecddd8b5..4407ab6257 100644 --- a/awx/main/analytics/core.py +++ b/awx/main/analytics/core.py @@ -8,6 +8,7 @@ import pathlib import shutil import tarfile import tempfile +from urllib.parse import urlparse, urlunparse from django.conf import settings from django.core.serializers.json import DjangoJSONEncoder @@ -23,6 +24,8 @@ from awx.main.models import Job from awx.main.access import access_registry from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook from awx.main.utils.analytics_proxy import OIDCClient +from awx.main.utils.candlepin import get_or_generate_candlepin_certificate +from awx.main.utils.candlepin.client import _temp_cert_files __all__ = ['register', 'gather', 'ship'] @@ -41,6 +44,76 @@ def _valid_license(): return True +def _get_cert_upload_url(url): + """ + Convert analytics URL to use 'cert.' subdomain for mTLS uploads. + + Some analytics services use different hostnames for different auth methods: + - cert.example.com - for mTLS (certificate-based) uploads + - example.com - for OIDC (token-based) uploads + + Args: + url: Original analytics URL + + Returns: + URL with 'cert.' prepended to hostname if not already present + """ + try: + parsed = urlparse(url) + hostname = parsed.hostname + + # Only modify if hostname doesn't already start with 'cert.' + if hostname and not hostname.startswith('cert.'): + new_hostname = f'cert.{hostname}' + # Reconstruct URL with new hostname + netloc = new_hostname + if parsed.port: + netloc = f'{new_hostname}:{parsed.port}' + + new_parsed = parsed._replace(netloc=netloc) + return urlunparse(new_parsed) + + return url + except Exception as e: + logger.warning(f'Could not modify URL for cert upload: {e}, using original URL') + return url + + +def _get_analytics_credentials(): + """ + Get Red Hat Insights credentials from settings. + + Attempts to retrieve credentials in the following priority order: + 1. REDHAT_USERNAME / REDHAT_PASSWORD + 2. SUBSCRIPTIONS_USERNAME / SUBSCRIPTIONS_PASSWORD + 3. SUBSCRIPTIONS_CLIENT_ID / SUBSCRIPTIONS_CLIENT_SECRET + + Returns: + tuple: (username, password) if credentials are found, (None, None) otherwise + """ + rh_id = getattr(settings, 'REDHAT_USERNAME', None) + rh_secret = getattr(settings, 'REDHAT_PASSWORD', None) + + if rh_id and rh_secret: + return rh_id, rh_secret + + # Try SUBSCRIPTIONS_USERNAME / SUBSCRIPTIONS_PASSWORD + rh_id = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None) + rh_secret = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None) + + if rh_id and rh_secret: + return rh_id, rh_secret + + # Try SUBSCRIPTIONS_CLIENT_ID / SUBSCRIPTIONS_CLIENT_SECRET + rh_id = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None) + rh_secret = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None) + + if rh_id and rh_secret: + return rh_id, rh_secret + + return None, None + + def all_collectors(): from awx.main.analytics import collectors @@ -184,10 +257,8 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti logger.log(log_level, "Automation Analytics not enabled. Use --dry-run to gather locally without sending.") return None - if not ( - settings.AUTOMATION_ANALYTICS_URL - and ((settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD) or (settings.SUBSCRIPTIONS_CLIENT_ID and settings.SUBSCRIPTIONS_CLIENT_SECRET)) - ): + rh_id, rh_secret = _get_analytics_credentials() + if not (settings.AUTOMATION_ANALYTICS_URL and rh_id and rh_secret): logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.") return None @@ -368,19 +439,14 @@ def ship(path): logger.error('AUTOMATION_ANALYTICS_URL is not set') return False - rh_id = getattr(settings, 'REDHAT_USERNAME', None) - rh_secret = getattr(settings, 'REDHAT_PASSWORD', None) - - if not (rh_id and rh_secret): - rh_id = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None) - rh_secret = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None) + rh_id, rh_secret = _get_analytics_credentials() if not rh_id: - logger.error('Neither REDHAT_USERNAME nor SUBSCRIPTIONS_CLIENT_ID are set') + logger.error('No valid username found. Tried: REDHAT_USERNAME, SUBSCRIPTIONS_USERNAME, SUBSCRIPTIONS_CLIENT_ID') return False if not rh_secret: - logger.error('Neither REDHAT_PASSWORD nor SUBSCRIPTIONS_CLIENT_SECRET are set') + logger.error('No valid password found. Tried: REDHAT_PASSWORD, SUBSCRIPTIONS_PASSWORD, SUBSCRIPTIONS_CLIENT_SECRET') return False with open(path, 'rb') as f: @@ -388,17 +454,40 @@ def ship(path): s = requests.Session() s.headers = get_awx_http_client_headers() s.headers.pop('Content-Type') + with set_environ(**settings.AWX_TASK_ENV): + # Try Certificate-based mTLS authentication (zero-touch) + cert_pem, key_pem = get_or_generate_candlepin_certificate() + if cert_pem and key_pem: + # Use cert. subdomain for mTLS uploads + cert_url = _get_cert_upload_url(url) + logger.debug("Attempting certificate-based authentication for analytics upload") + try: + with _temp_cert_files(cert_pem, key_pem) as (cert_path, key_path): + response = s.post( + cert_url, files=files, cert=(cert_path, key_path), verify=settings.INSIGHTS_CERT_PATH, headers=s.headers, timeout=(31, 31) + ) + if response.status_code < 300: + return True + else: + logger.warning( + f'Certificate-based authentication failed with status {response.status_code}, {response.text}. Falling back to OIDC auth' + ) + except Exception as e: + logger.warning(f"Certificate-based authentication failed: {e}, falling back to OIDC auth") + + # Try OIDC authentication + logger.debug("Attempting OIDC authentication for analytics upload") + f.seek(0) # requests POST may read from the handler, so seek to beginning of file for the next POST attempt try: client = OIDCClient(rh_id, rh_secret) response = client.make_request("POST", url, headers=s.headers, files=files, verify=settings.INSIGHTS_CERT_PATH, timeout=(31, 31)) - except requests.RequestException: - logger.error("Automation Analytics API request failed, trying base auth method") - response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_id, rh_secret), headers=s.headers, timeout=(31, 31)) - # Accept 2XX status_codes - if response.status_code >= 300: - logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text)) - return False - - return True + if response.status_code < 300: + return True + else: + logger.error(f'OIDC authentication failed with status {response.status_code}, {response.text}') + return False + except requests.RequestException as e: + logger.error(f"OIDC authentication failed: {e}") + return False diff --git a/awx/main/conf.py b/awx/main/conf.py index f26e9f53a6..0ef98cc854 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -213,6 +213,40 @@ register( category_slug='system', ) +register( + 'AWX_ANALYTICS_CANDLEPIN_CA', + field_class=fields.CharField, + default='/etc/rhsm/ca/redhat-uep.pem', + allow_blank=True, + label=_('Candlepin CA Certificate Path'), + help_text=_('Path to the CA certificate file for verifying TLS connections to Candlepin. Leave blank to use system certificates.'), + category=_('System'), + category_slug='system', +) + +register( + 'AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS', + field_class=fields.IntegerField, + default=90, + min_value=1, + label=_('Candlepin Certificate Renewal Threshold'), + help_text=_('Number of days before certificate expiry to trigger automatic renewal of Candlepin identity certificates.'), + category=_('System'), + category_slug='system', + unit=_('days'), +) + +register( + 'AWX_ANALYTICS_CANDLEPIN_PROXY_URL', + field_class=fields.CharField, + default='', + allow_blank=True, + label=_('Candlepin Proxy URL'), + help_text=_('HTTP/HTTPS proxy URL for Candlepin API requests (e.g., http://proxy.example.com:8080). Leave blank for no proxy.'), + category=_('System'), + category_slug='system', +) + register( 'INSTALL_UUID', field_class=fields.CharField, @@ -824,6 +858,58 @@ register( unit=_('seconds'), ) +register( + 'CANDLEPIN_CONSUMER_UUID', + field_class=fields.CharField, + default='', + allow_blank=True, + encrypted=False, + label=_('Candlepin Consumer UUID'), + help_text=_('UUID of the registered Candlepin consumer for this AAP instance.'), + category=_('System'), + category_slug='system', + hidden=True, +) + +register( + 'CANDLEPIN_CERT_PEM', + field_class=fields.CharField, + default='', + allow_blank=True, + encrypted=True, + label=_('Candlepin Identity Certificate'), + help_text=_('PEM-encoded Candlepin identity certificate for mTLS authentication.'), + category=_('System'), + category_slug='system', + hidden=True, +) + +register( + 'CANDLEPIN_KEY_PEM', + field_class=fields.CharField, + default='', + allow_blank=True, + encrypted=True, + label=_('Candlepin Identity Key'), + help_text=_('PEM-encoded private key for Candlepin identity certificate.'), + category=_('System'), + category_slug='system', + hidden=True, +) + +register( + 'CANDLEPIN_SERIAL_NUMBER', + field_class=fields.CharField, + default='', + allow_blank=True, + encrypted=False, + label=_('Candlepin Certificate Serial Number'), + help_text=_('Serial number of the Candlepin identity certificate for tracking.'), + category=_('System'), + category_slug='system', + hidden=True, +) + register( 'IS_K8S', field_class=fields.BooleanField, diff --git a/awx/main/management/commands/candlepin_cert.py b/awx/main/management/commands/candlepin_cert.py new file mode 100644 index 0000000000..7b09d66bea --- /dev/null +++ b/awx/main/management/commands/candlepin_cert.py @@ -0,0 +1,330 @@ +import sys + +from argparse import RawDescriptionHelpFormatter + +from django.core.management.base import BaseCommand + +from awx.main.utils.candlepin.client import CandlepinClient +from awx.main.utils.candlepin.lifecycle import ( + get_candlepin_ca, + get_candlepin_url, + get_proxy_url, + get_renewal_days, + needs_renewal, + parse_cert, +) +from awx.main.utils.candlepin import ( + _fetch_candlepin_cert_from_db, + _save_candlepin_cert_to_db, + _save_candlepin_registration_to_db, + resolve_registration_credentials, +) + + +class Command(BaseCommand): + """ + Manage Candlepin consumer registration and certificate lifecycle. + + Subcommands: + register Register this AAP instance as a Candlepin consumer and obtain an + identity certificate for mTLS analytics uploads. + renew Perform a manual check-in and, if needed, renew the stored identity + certificate. + """ + + help = 'Manage Candlepin consumer registration and certificate lifecycle' + + def create_parser(self, prog_name, subcommand, **kwargs): + return super().create_parser( + prog_name, + subcommand, + formatter_class=RawDescriptionHelpFormatter, + epilog='\n'.join( + [ + 'SUBCOMMANDS', + '', + ' register Register this instance as a Candlepin consumer.', + ' Credentials are read from AWX database by default', + ' (REDHAT_USERNAME, REDHAT_PASSWORD). The organization is', + ' discovered automatically from the Candlepin account.', + ' Pass --username / --password-stdin / --org to override.', + ' Example: echo "password" | awx-manage candlepin_cert register --username user --password-stdin', + '', + ' renew Perform a manual check-in and proactive cert renewal.', + ' Reads the stored cert/key/UUID from database.', + ' Use --force to renew even if the cert is not near expiry.', + '', + 'CONFIGURATION', + '', + ' Settings can be configured via Django settings (awx/settings/defaults.py):', + '', + ' AWX_ANALYTICS_CANDLEPIN_URL Candlepin base URL', + ' (default: https://subscription.example.com/candlepin)', + ' AWX_ANALYTICS_CANDLEPIN_CA Path to Candlepin CA cert for TLS verification', + ' AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS Days before expiry to trigger renewal (default: 90)', + ' AWX_ANALYTICS_CANDLEPIN_PROXY_URL HTTP/HTTPS proxy for Candlepin API calls', + ] + ), + **kwargs, + ) + + def add_arguments(self, parser): + subparsers = parser.add_subparsers(dest='subcommand', metavar='subcommand') + subparsers.required = True + + # --- register --- + reg = subparsers.add_parser( + 'register', + help='Register this instance as a Candlepin consumer', + formatter_class=RawDescriptionHelpFormatter, + ) + reg.add_argument('--username', help='Red Hat subscription username (overrides REDHAT_USERNAME from database)') + reg.add_argument( + '--password-stdin', dest='password_stdin', action='store_true', help='Read password from stdin (overrides REDHAT_PASSWORD from database)' + ) + reg.add_argument('--org', help='Candlepin owner/org key (overrides auto-discovered organization)') + reg.add_argument('--candlepin-url', dest='candlepin_url', help='Candlepin base URL (overrides AWX_ANALYTICS_CANDLEPIN_URL setting)') + reg.add_argument( + '--candlepin-ca', dest='candlepin_ca', help='Path to Candlepin CA cert for TLS verification (overrides AWX_ANALYTICS_CANDLEPIN_CA setting)' + ) + reg.add_argument('--proxy', help='HTTP/HTTPS proxy URL (overrides AWX_ANALYTICS_CANDLEPIN_PROXY_URL setting)') + reg.add_argument('--no-verify-tls', dest='no_verify_tls', action='store_true', help='Disable TLS certificate verification for Candlepin API calls') + reg.add_argument('--force', action='store_true', help='Re-register even if a certificate already exists in database') + reg.add_argument('--dry-run', dest='dry_run', action='store_true', help='Perform registration but do not save the result to database') + + # --- renew --- + ren = subparsers.add_parser( + 'renew', + help='Check in and renew the Candlepin identity certificate', + formatter_class=RawDescriptionHelpFormatter, + ) + ren.add_argument('--candlepin-url', dest='candlepin_url', help='Candlepin base URL (overrides AWX_ANALYTICS_CANDLEPIN_URL setting)') + ren.add_argument( + '--candlepin-ca', dest='candlepin_ca', help='Path to Candlepin CA cert for TLS verification (overrides AWX_ANALYTICS_CANDLEPIN_CA setting)' + ) + ren.add_argument('--proxy', help='HTTP/HTTPS proxy URL (overrides AWX_ANALYTICS_CANDLEPIN_PROXY_URL setting)') + ren.add_argument('--no-verify-tls', dest='no_verify_tls', action='store_true', help='Disable TLS certificate verification for Candlepin API calls') + ren.add_argument('--force', action='store_true', help='Renew the certificate even if it is not near expiry') + ren.add_argument('--dry-run', dest='dry_run', action='store_true', help='Perform check-in and renewal but do not save the result to database') + + def handle(self, *args, **options): + subcommand = options['subcommand'] + if subcommand == 'register': + ok = self._handle_register(options) + elif subcommand == 'renew': + ok = self._handle_renew(options) + else: + self.stderr.write(f'Unknown subcommand: {subcommand}') + sys.exit(1) + + if not ok: + sys.exit(1) + + # ------------------------------------------------------------------ + # register + # ------------------------------------------------------------------ + + def _resolve_and_validate_credentials(self, options): + """Merge CLI options with DB values and validate all required fields are present. + + Returns ``(username, password, org, db_install_uuid)`` on success, or ``None`` + if any required field is missing (errors are written to ``self.stderr``). + """ + username_override = options.get('username') + org_override = options.get('org') + verify_tls = not options.get('no_verify_tls', False) + + # Read password from stdin if --password-stdin is set + if options.get('password_stdin'): + password_override = sys.stdin.read().strip() + if not password_override: + self.stderr.write('--password-stdin specified but no password provided on stdin') + return None + else: + password_override = None + + # Use shared resolution and validation function + username, password, org, install_uuid, errors = resolve_registration_credentials( + username_override=username_override, password_override=password_override, org_override=org_override, verify_tls=verify_tls + ) + + if errors: + for error in errors: + self.stderr.write(f'Missing required value: {error}') + return None + + return username, password, org, install_uuid + + def _handle_register(self, options): + dry_run = options['dry_run'] + force = options['force'] + + # Check whether a cert is already stored unless --force. + existing_cert, existing_key, _ = _fetch_candlepin_cert_from_db() + if existing_cert and existing_key and not force: + self.stdout.write('A Candlepin identity certificate is already stored in database. Use --force to re-register and replace it.') + return True + + # Resolve credentials: CLI flags take precedence over database. + resolved = self._resolve_and_validate_credentials(options) + if resolved is None: + return False + username, password, org, db_install_uuid = resolved + + candlepin_url = options.get('candlepin_url') or get_candlepin_url() + candlepin_ca = options.get('candlepin_ca') or get_candlepin_ca() + proxy = options.get('proxy') or get_proxy_url() + verify_tls = not options.get('no_verify_tls', False) + + # If dry-run, display what would happen and exit early before any Candlepin operations + if dry_run: + self.stdout.write('[dry-run] Would register with Candlepin:') + self.stdout.write(f' URL : {candlepin_url}') + self.stdout.write(f' Organization : {org}') + self.stdout.write(f' Username : {username}') + self.stdout.write(f' Install UUID : {db_install_uuid}') + if candlepin_ca: + self.stdout.write(f' CA cert : {candlepin_ca}') + if proxy: + self.stdout.write(f' Proxy : {proxy}') + self.stdout.write(f' Verify TLS : {verify_tls}') + self.stdout.write('[dry-run] No Candlepin operations performed.') + return True + + client = CandlepinClient(base_url=candlepin_url, candlepin_ca=candlepin_ca, proxy=proxy, verify_tls=verify_tls) + + self.stdout.write(f'Registering with Candlepin at {candlepin_url} (org={org}) ...') + try: + cert_pem, key_pem, consumer_uuid = client.register_consumer(username, password, org, install_uuid=db_install_uuid) + except Exception as e: + self.stderr.write(f'Registration failed: {e}') + return False + + self.stdout.write('Registered successfully.') + self.stdout.write(f' Consumer UUID : {consumer_uuid}') + + # Save to database + if _save_candlepin_registration_to_db(cert_pem, key_pem, consumer_uuid): + self.stdout.write('Certificate, key, and consumer UUID saved to database.') + else: + self.stderr.write('Failed to save registration to database.') + return False + + # Best-effort certificate metadata display + try: + info = parse_cert(cert_pem) + self.stdout.write(f' Cert serial : {info["serial"]}') + self.stdout.write(f' Cert CN : {info["cn"]}') + self.stdout.write(f' Valid until : {info["not_after"]} ({info["days_remaining"]} days remaining)') + except ValueError as e: + self.stdout.write(f'Certificate metadata unavailable: {e}') + + return True + + # ------------------------------------------------------------------ + # renew + # ------------------------------------------------------------------ + + def _handle_renew(self, options): + dry_run = options['dry_run'] + force = options['force'] + + cert_pem, key_pem, consumer_uuid = _fetch_candlepin_cert_from_db() + + if not cert_pem or not key_pem: + self.stderr.write('No Candlepin identity certificate found in database. Run the register subcommand first.') + return False + + if not consumer_uuid: + self.stderr.write('CANDLEPIN_CONSUMER_UUID is not set. Run the register subcommand first.') + return False + + try: + info = parse_cert(cert_pem) + self.stdout.write('Current certificate:') + self.stdout.write(f' Serial : {info["serial"]}') + self.stdout.write(f' CN : {info["cn"]}') + self.stdout.write(f' Valid until : {info["not_after"]} ({info["days_remaining"]} days remaining)') + except ValueError as e: + self.stdout.write('Current certificate:') + self.stdout.write(f' Certificate metadata unavailable: {e}') + info = None + + candlepin_url = options.get('candlepin_url') or get_candlepin_url() + candlepin_ca = options.get('candlepin_ca') or get_candlepin_ca() + proxy = options.get('proxy') or get_proxy_url() + verify_tls = not options.get('no_verify_tls', False) + renewal_days = get_renewal_days() + + # Check if renewal is needed (without force, just check cert expiry locally) + renewal_needed = force or needs_renewal(cert_pem, renewal_days) + + # If dry-run, display what would happen and exit early before any Candlepin operations + if dry_run: + self.stdout.write('[dry-run] Would perform the following operations:') + self.stdout.write(f' URL : {candlepin_url}') + self.stdout.write(f' Consumer UUID : {consumer_uuid}') + if candlepin_ca: + self.stdout.write(f' CA cert : {candlepin_ca}') + if proxy: + self.stdout.write(f' Proxy : {proxy}') + self.stdout.write(f' Verify TLS : {verify_tls}') + self.stdout.write(' 1. Check in with Candlepin') + if renewal_needed: + reason = 'forced via --force' if force else f'expiry within {renewal_days} days' + self.stdout.write(f' 2. Renew certificate ({reason})') + else: + if info: + self.stdout.write(f' 2. No renewal needed ({info["days_remaining"]} days remaining, threshold: {renewal_days} days)') + else: + self.stdout.write(f' 2. No renewal needed (threshold: {renewal_days} days)') + self.stdout.write('[dry-run] No Candlepin operations performed.') + return True + + client = CandlepinClient(base_url=candlepin_url, candlepin_ca=candlepin_ca, proxy=proxy, verify_tls=verify_tls) + + self.stdout.write(f'Checking in with Candlepin at {candlepin_url} (consumer={consumer_uuid}) ...') + checkin_success = client.checkin(consumer_uuid, cert_pem, key_pem) + + if not checkin_success: + self.stderr.write('Check-in with Candlepin failed. Unable to verify certificate status.') + self.stderr.write('Certificate renewal may still be needed. Use --force to renew anyway, or check logs for details.') + return False + + self.stdout.write('Check-in successful.') + + if not renewal_needed: + if info: + self.stdout.write(f'Certificate has {info["days_remaining"]} days remaining (renewal threshold: {renewal_days} days). No renewal needed.') + else: + self.stdout.write(f'Certificate renewal threshold is {renewal_days} days. No renewal needed.') + return True + + reason = 'forced via --force' if force else f'expiry within {renewal_days} days' + self.stdout.write(f'Renewing certificate ({reason}) ...') + try: + new_cert_pem, new_key_pem = client.regenerate_cert(consumer_uuid, cert_pem, key_pem) + except Exception as e: + self.stderr.write(f'Certificate renewal failed: {e}') + return False + + self.stdout.write('Certificate renewed successfully.') + + # Save to database + if _save_candlepin_cert_to_db(new_cert_pem, new_key_pem): + self.stdout.write('Renewed certificate and key saved to database.') + else: + self.stderr.write('Failed to save renewed certificate to database.') + return False + + # Best-effort certificate metadata display + try: + new_info = parse_cert(new_cert_pem) + if info: + self.stdout.write(f' Old serial : {info["serial"]}') + self.stdout.write(f' New serial : {new_info["serial"]}') + self.stdout.write(f' Valid until : {new_info["not_after"]} ({new_info["days_remaining"]} days remaining)') + except ValueError as e: + self.stdout.write(f'Certificate metadata unavailable: {e}') + + return True diff --git a/awx/main/tests/functional/analytics/test_core.py b/awx/main/tests/functional/analytics/test_core.py index a2c525c836..22565fe82f 100644 --- a/awx/main/tests/functional/analytics/test_core.py +++ b/awx/main/tests/functional/analytics/test_core.py @@ -74,9 +74,9 @@ def temp_analytic_tar(): @pytest.fixture def mock_analytic_post(): - # Patch the Session.post method to return a mock response with status_code 200 - with mock.patch('awx.main.analytics.core.requests.Session.post', return_value=mock.Mock(status_code=200)) as mock_post: - yield mock_post + # Patch get_or_generate_candlepin_certificate to skip mTLS path + with mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate', return_value=(None, None)): + yield @pytest.mark.parametrize( @@ -141,15 +141,22 @@ def mock_analytic_post(): ) @pytest.mark.django_db def test_ship_credential(setting_map, expected_result, expected_auth, temp_analytic_tar, mock_analytic_post): - with override_settings(**setting_map): - result = ship(temp_analytic_tar) + with override_settings(**setting_map, AUTOMATION_ANALYTICS_URL='https://example.com/api'): + with mock.patch('awx.main.analytics.core.OIDCClient') as mock_oidc: + mock_oidc_instance = mock.Mock() + mock_oidc_instance.make_request.return_value = mock.Mock(status_code=200) + mock_oidc.return_value = mock_oidc_instance - assert result == expected_result - if expected_auth: - mock_analytic_post.assert_called_once() - assert mock_analytic_post.call_args[1]['auth'] == expected_auth - else: - mock_analytic_post.assert_not_called() + result = ship(temp_analytic_tar) + + assert result == expected_result + if expected_auth: + # Verify OIDC client was instantiated with correct credentials + mock_oidc.assert_called_once_with(expected_auth[0], expected_auth[1]) + mock_oidc_instance.make_request.assert_called_once() + else: + # When credentials are missing, OIDCClient should not be called + mock_oidc.assert_not_called() @pytest.mark.django_db diff --git a/awx/main/tests/unit/analytics/test_core_ship.py b/awx/main/tests/unit/analytics/test_core_ship.py new file mode 100644 index 0000000000..a544860b35 --- /dev/null +++ b/awx/main/tests/unit/analytics/test_core_ship.py @@ -0,0 +1,271 @@ +# Copyright (c) 2026 Ansible, Inc. +# All Rights Reserved. + +"""Tests for analytics ship() function with mTLS authentication.""" + +import os +import tempfile +from unittest import mock + +from django.test.utils import override_settings + +from awx.main.analytics.core import ship, _get_cert_upload_url + + +class TestGetCertUploadUrl: + """Test _get_cert_upload_url() helper function.""" + + def test_adds_cert_subdomain(self): + """Test that 'cert.' is added to hostname.""" + url = 'https://analytics.example.com/api/ingress/v1/upload' + result = _get_cert_upload_url(url) + assert result == 'https://cert.analytics.example.com/api/ingress/v1/upload' + + def test_preserves_existing_cert_subdomain(self): + """Test that existing 'cert.' subdomain is preserved.""" + url = 'https://cert.analytics.example.com/api/ingress/v1/upload' + result = _get_cert_upload_url(url) + assert result == 'https://cert.analytics.example.com/api/ingress/v1/upload' + + +class TestShipMTLS: + """Test ship() function's mTLS authentication path.""" + + def setup_method(self): + """Create a temporary tarball for testing.""" + self.temp_file = tempfile.NamedTemporaryFile(mode='wb', suffix='.tar.gz', delete=False) + self.temp_file.write(b'test tarball content') + self.temp_file.close() + self.tarball_path = self.temp_file.name + + def teardown_method(self): + """Clean up temporary tarball.""" + if os.path.exists(self.tarball_path): + os.unlink(self.tarball_path) + + @override_settings( + AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload', + INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz', + INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt', + REDHAT_USERNAME='test_user', + REDHAT_PASSWORD='test_pass', # NOSONAR + AWX_TASK_ENV={}, + ) + @mock.patch('awx.main.analytics.core.get_awx_http_client_headers') + @mock.patch('awx.main.analytics.core._temp_cert_files') + @mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate') + @mock.patch('awx.main.analytics.core.requests.Session') + def test_ship_with_mtls_success(self, mock_session_class, mock_get_cert, mock_temp_files, mock_headers): + """Test successful upload with mTLS certificate authentication.""" + # Mock headers to avoid database access + mock_headers.return_value = {'Content-Type': 'application/json'} + + # Mock certificate retrieval + mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data') + + # Mock temp files context manager + mock_temp_files.return_value.__enter__.return_value = ('/tmp/cert.pem', '/tmp/key.pem') + mock_temp_files.return_value.__exit__.return_value = None + + # Mock successful mTLS response + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_session = mock.Mock() + mock_session.headers = {} + mock_session.post.return_value = mock_response + mock_session_class.return_value = mock_session + + result = ship(self.tarball_path) + + assert result is True + mock_get_cert.assert_called_once() + mock_temp_files.assert_called_once_with('cert-pem-data', 'key-pem-data') + mock_session.post.assert_called_once() + + # Verify cert URL is used (cert. subdomain added) + call_args = mock_session.post.call_args + assert call_args[0][0] == 'https://cert.analytics.example.com/api/ingress/v1/upload' + + # Verify mTLS cert was used + call_kwargs = call_args[1] + assert call_kwargs['cert'] == ('/tmp/cert.pem', '/tmp/key.pem') + + @override_settings( + AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload', + INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz', + INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt', + REDHAT_USERNAME='test_user', + REDHAT_PASSWORD='test_pass', # NOSONAR + AWX_TASK_ENV={}, + ) + @mock.patch('awx.main.analytics.core.get_awx_http_client_headers') + @mock.patch('awx.main.analytics.core.OIDCClient') + @mock.patch('awx.main.analytics.core._temp_cert_files') + @mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate') + @mock.patch('awx.main.analytics.core.requests.Session') + def test_ship_mtls_fallback_to_oidc_on_cert_failure(self, mock_session_class, mock_get_cert, mock_temp_files, mock_oidc_client, mock_headers): + """Test fallback to OIDC auth when mTLS cert authentication fails.""" + # Mock headers to avoid database access + mock_headers.return_value = {'Content-Type': 'application/json'} + + # Mock certificate retrieval + mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data') + + # Mock temp files context manager + mock_temp_files.return_value.__enter__.return_value = ('/tmp/cert.pem', '/tmp/key.pem') + mock_temp_files.return_value.__exit__.return_value = None + + # Mock failed mTLS response (401 Unauthorized) + mock_mtls_response = mock.Mock() + mock_mtls_response.status_code = 401 + mock_session = mock.Mock() + mock_session.headers = {} + mock_session.post.return_value = mock_mtls_response + mock_session_class.return_value = mock_session + + # Mock successful OIDC response + mock_oidc_response = mock.Mock() + mock_oidc_response.status_code = 200 + mock_oidc_instance = mock.Mock() + mock_oidc_instance.make_request.return_value = mock_oidc_response + mock_oidc_client.return_value = mock_oidc_instance + + result = ship(self.tarball_path) + + assert result is True + # Both mTLS and OIDC should be attempted + assert mock_session.post.call_count == 1 + mock_oidc_instance.make_request.assert_called_once() + + # Verify mTLS used cert URL + mtls_call_args = mock_session.post.call_args + assert mtls_call_args[0][0] == 'https://cert.analytics.example.com/api/ingress/v1/upload' + + # Verify OIDC used original URL + oidc_call_args = mock_oidc_instance.make_request.call_args + assert oidc_call_args[0][1] == 'https://analytics.example.com/api/ingress/v1/upload' + + @override_settings( + AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload', + INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz', + INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt', + REDHAT_USERNAME='test_user', + REDHAT_PASSWORD='test_pass', # NOSONAR + AWX_TASK_ENV={}, + ) + @mock.patch('awx.main.analytics.core.get_awx_http_client_headers') + @mock.patch('awx.main.analytics.core._temp_cert_files') + @mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate') + @mock.patch('awx.main.analytics.core.OIDCClient') + @mock.patch('awx.main.analytics.core.requests.Session') + def test_ship_mtls_exception_fallback_to_oidc(self, mock_session_class, mock_oidc_client, mock_get_cert, mock_temp_files, mock_headers): + """Test fallback to OIDC auth when mTLS raises an exception.""" + # Mock headers to avoid database access + mock_headers.return_value = {'Content-Type': 'application/json'} + + # Mock certificate retrieval + mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data') + + # Mock temp files context manager raising an exception + mock_temp_files.return_value.__enter__.side_effect = OSError('Temp file creation failed') + + # Mock successful OIDC response + mock_oidc_response = mock.Mock() + mock_oidc_response.status_code = 200 + mock_oidc_instance = mock.Mock() + mock_oidc_instance.make_request.return_value = mock_oidc_response + mock_oidc_client.return_value = mock_oidc_instance + + mock_session = mock.Mock() + mock_session.headers = {} + mock_session_class.return_value = mock_session + + result = ship(self.tarball_path) + + assert result is True + # mTLS should fail, OIDC should succeed + mock_oidc_instance.make_request.assert_called_once() + + @override_settings( + AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload', + INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz', + INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt', + REDHAT_USERNAME='test_user', + REDHAT_PASSWORD='test_pass', # NOSONAR + AWX_TASK_ENV={}, + ) + @mock.patch('awx.main.analytics.core.get_awx_http_client_headers') + @mock.patch('awx.main.analytics.core.OIDCClient') + @mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate') + @mock.patch('awx.main.analytics.core.requests.Session') + def test_ship_no_certificate_available(self, mock_session_class, mock_get_cert, mock_oidc_client, mock_headers): + """Test ship() when no Candlepin certificate is available.""" + # Mock headers to avoid database access + mock_headers.return_value = {'Content-Type': 'application/json'} + + # Mock no certificate available + mock_get_cert.return_value = (None, None) + + # Mock successful OIDC response + mock_oidc_response = mock.Mock() + mock_oidc_response.status_code = 200 + mock_oidc_instance = mock.Mock() + mock_oidc_instance.make_request.return_value = mock_oidc_response + mock_oidc_client.return_value = mock_oidc_instance + + mock_session = mock.Mock() + mock_session.headers = {} + mock_session_class.return_value = mock_session + + result = ship(self.tarball_path) + + assert result is True + # Should skip mTLS and go straight to OIDC + mock_oidc_instance.make_request.assert_called_once() + + @override_settings( + AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload', + INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz', + INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt', + REDHAT_USERNAME='test_user', + REDHAT_PASSWORD='test_pass', # NOSONAR + AWX_TASK_ENV={}, + ) + @mock.patch('awx.main.analytics.core.get_awx_http_client_headers') + @mock.patch('awx.main.analytics.core.OIDCClient') + @mock.patch('awx.main.analytics.core._temp_cert_files') + @mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate') + @mock.patch('awx.main.analytics.core.requests.Session') + def test_ship_both_auth_methods_fail(self, mock_session_class, mock_get_cert, mock_temp_files, mock_oidc_client, mock_headers): + """Test ship() when both mTLS and OIDC authentication fail.""" + # Mock headers to avoid database access + mock_headers.return_value = {'Content-Type': 'application/json'} + + # Mock certificate retrieval + mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data') + + # Mock temp files context manager + mock_temp_files.return_value.__enter__.return_value = ('/tmp/cert.pem', '/tmp/key.pem') + mock_temp_files.return_value.__exit__.return_value = None + + # Mock failed mTLS response + mock_mtls_response = mock.Mock() + mock_mtls_response.status_code = 401 + mock_session = mock.Mock() + mock_session.headers = {} + mock_session.post.return_value = mock_mtls_response + mock_session_class.return_value = mock_session + + # Mock failed OIDC response + mock_oidc_response = mock.Mock() + mock_oidc_response.status_code = 403 + mock_oidc_response.text = 'Forbidden' + mock_oidc_instance = mock.Mock() + mock_oidc_instance.make_request.return_value = mock_oidc_response + mock_oidc_client.return_value = mock_oidc_instance + + result = ship(self.tarball_path) + + assert result is False + mock_session.post.assert_called_once() + mock_oidc_instance.make_request.assert_called_once() diff --git a/awx/main/tests/unit/management/commands/test_candlepin_cert.py b/awx/main/tests/unit/management/commands/test_candlepin_cert.py new file mode 100644 index 0000000000..e9f93a89b4 --- /dev/null +++ b/awx/main/tests/unit/management/commands/test_candlepin_cert.py @@ -0,0 +1,310 @@ +# Copyright (c) 2026 Ansible, Inc. +# All Rights Reserved. + +"""Tests for candlepin_cert management command.""" + +from io import StringIO +from unittest import mock + +import pytest +from django.core.management import call_command +from django.test.utils import override_settings + + +class TestCandlepinCertCommand: + """Tests for candlepin_cert management command.""" + + @mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_registration_to_db') + @mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient') + @mock.patch('awx.main.management.commands.candlepin_cert.resolve_registration_credentials') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + @override_settings( + AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com', + AWX_ANALYTICS_CANDLEPIN_CA=None, + AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None, + ) + def test_register_success(self, mock_fetch_cert, mock_resolve_creds, mock_client_class, mock_save_reg): + """Test successful registration.""" + # No existing cert + mock_fetch_cert.return_value = (None, None, None) + + # Valid credentials + mock_resolve_creds.return_value = ('test_user', 'test_pass', 'test_org', 'install-uuid', None) + + # Mock successful registration + mock_client = mock.Mock() + mock_client.register_consumer.return_value = ('cert-pem', 'key-pem', 'consumer-uuid') + mock_client_class.return_value = mock_client + + # Mock successful save + mock_save_reg.return_value = True + + out = StringIO() + call_command('candlepin_cert', 'register', stdout=out, stderr=StringIO()) + + output = out.getvalue() + assert 'Registered successfully' in output + assert 'consumer-uuid' in output + + mock_client.register_consumer.assert_called_once_with('test_user', 'test_pass', 'test_org', install_uuid='install-uuid') + mock_save_reg.assert_called_once_with('cert-pem', 'key-pem', 'consumer-uuid') + + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + def test_register_already_registered_without_force(self, mock_fetch_cert): + """Test registration fails when cert already exists and --force not provided.""" + # Existing cert + mock_fetch_cert.return_value = ('existing-cert', 'existing-key', 'existing-uuid') + + out = StringIO() + call_command('candlepin_cert', 'register', stdout=out, stderr=StringIO()) + + output = out.getvalue() + assert 'already stored' in output + assert '--force' in output + + @mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_registration_to_db') + @mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient') + @mock.patch('awx.main.management.commands.candlepin_cert.resolve_registration_credentials') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + @override_settings( + AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com', + AWX_ANALYTICS_CANDLEPIN_CA=None, + AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None, + ) + def test_register_with_force_flag(self, mock_fetch_cert, mock_resolve_creds, mock_client_class, mock_save_reg): + """Test registration succeeds with --force even when cert exists.""" + # Existing cert + mock_fetch_cert.return_value = ('existing-cert', 'existing-key', 'existing-uuid') + + # Valid credentials + mock_resolve_creds.return_value = ('test_user', 'test_pass', 'test_org', 'install-uuid', None) + + # Mock successful registration + mock_client = mock.Mock() + mock_client.register_consumer.return_value = ('new-cert-pem', 'new-key-pem', 'new-consumer-uuid') + mock_client_class.return_value = mock_client + + # Mock successful save + mock_save_reg.return_value = True + + out = StringIO() + call_command('candlepin_cert', 'register', '--force', stdout=out, stderr=StringIO()) + + output = out.getvalue() + assert 'Registered successfully' in output + + mock_client.register_consumer.assert_called_once() + mock_save_reg.assert_called_once_with('new-cert-pem', 'new-key-pem', 'new-consumer-uuid') + + @mock.patch('awx.main.management.commands.candlepin_cert.resolve_registration_credentials') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + def test_register_missing_credentials(self, mock_fetch_cert, mock_resolve_creds): + """Test registration fails when credentials are missing.""" + mock_fetch_cert.return_value = (None, None, None) + + # Missing credentials + mock_resolve_creds.return_value = (None, None, None, None, ['username', 'password']) + + err = StringIO() + with pytest.raises(SystemExit) as exc_info: + call_command('candlepin_cert', 'register', stderr=err) + + assert exc_info.value.code == 1 + error_output = err.getvalue() + assert 'Missing required value' in error_output + + @mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_cert_to_db') + @mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient') + @mock.patch('awx.main.management.commands.candlepin_cert.parse_cert') + @mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + @override_settings( + AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com', + AWX_ANALYTICS_CANDLEPIN_CA=None, + AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None, + AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90, + ) + def test_renew_success(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class, mock_save_cert): + """Test successful certificate renewal.""" + # Existing cert + mock_fetch_cert.return_value = ('old-cert', 'old-key', 'consumer-uuid') + + # Parse cert returns metadata + mock_parse_cert.side_effect = [ + {'serial': '123', 'cn': 'test', 'not_after': '2026-06-01', 'days_remaining': 10}, # Current cert + {'serial': '456', 'cn': 'test', 'not_after': '2027-06-01', 'days_remaining': 365}, # Renewed cert + ] + + # Renewal needed + mock_needs_renewal.return_value = True + + # Mock successful check-in and renewal + mock_client = mock.Mock() + mock_client.checkin.return_value = True + mock_client.regenerate_cert.return_value = ('new-cert', 'new-key') + mock_client_class.return_value = mock_client + + mock_save_cert.return_value = True + + out = StringIO() + call_command('candlepin_cert', 'renew', stdout=out, stderr=StringIO()) + + output = out.getvalue() + assert 'Check-in successful' in output + assert 'Certificate renewed successfully' in output + assert 'saved to database' in output + + mock_client.checkin.assert_called_once_with('consumer-uuid', 'old-cert', 'old-key') + mock_client.regenerate_cert.assert_called_once() + mock_save_cert.assert_called_once_with('new-cert', 'new-key') + + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + def test_renew_no_cert_in_db(self, mock_fetch_cert): + """Test renew fails when no certificate exists in database.""" + mock_fetch_cert.return_value = (None, None, None) + + err = StringIO() + with pytest.raises(SystemExit) as exc_info: + call_command('candlepin_cert', 'renew', stderr=err) + + assert exc_info.value.code == 1 + error_output = err.getvalue() + assert 'No Candlepin identity certificate found' in error_output + assert 'Run the register subcommand first' in error_output + + @mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient') + @mock.patch('awx.main.management.commands.candlepin_cert.parse_cert') + @mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + @override_settings( + AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com', + AWX_ANALYTICS_CANDLEPIN_CA=None, + AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None, + AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90, + ) + def test_renew_not_needed(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class): + """Test renew when certificate is still valid and renewal not needed.""" + mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid') + + # Parse cert returns healthy cert + mock_parse_cert.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 200} + + # Renewal not needed + mock_needs_renewal.return_value = False + + # Mock successful check-in + mock_client = mock.Mock() + mock_client.checkin.return_value = True + mock_client_class.return_value = mock_client + + out = StringIO() + call_command('candlepin_cert', 'renew', stdout=out, stderr=StringIO()) + + output = out.getvalue() + assert 'Check-in successful' in output + assert 'No renewal needed' in output + + mock_client.checkin.assert_called_once() + mock_client.regenerate_cert.assert_not_called() + + @mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_cert_to_db') + @mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient') + @mock.patch('awx.main.management.commands.candlepin_cert.parse_cert') + @mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + @override_settings( + AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com', + AWX_ANALYTICS_CANDLEPIN_CA=None, + AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None, + AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90, + ) + def test_renew_with_force_flag(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class, mock_save_cert): + """Test renew --force renews even when not needed.""" + mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid') + + # Parse cert + mock_parse_cert.side_effect = [ + {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 200}, # Current cert (healthy) + {'serial': '456', 'cn': 'test', 'not_after': '2027-06-01', 'days_remaining': 365}, # New cert + ] + + # Would not need renewal without --force + mock_needs_renewal.return_value = False + + # Mock successful operations + mock_client = mock.Mock() + mock_client.checkin.return_value = True + mock_client.regenerate_cert.return_value = ('new-cert', 'new-key') + mock_client_class.return_value = mock_client + + mock_save_cert.return_value = True + + out = StringIO() + call_command('candlepin_cert', 'renew', '--force', stdout=out, stderr=StringIO()) + + output = out.getvalue() + assert 'forced via --force' in output + assert 'Certificate renewed successfully' in output + + mock_client.regenerate_cert.assert_called_once() + + @mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient') + @mock.patch('awx.main.management.commands.candlepin_cert.parse_cert') + @mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + @override_settings( + AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com', + AWX_ANALYTICS_CANDLEPIN_CA=None, + AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None, + AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90, + ) + def test_renew_checkin_failure(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class): + """Test renew handles check-in failure gracefully.""" + mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid') + + mock_parse_cert.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 100} + mock_needs_renewal.return_value = False # Not needed for renewal, just testing check-in failure + + # Mock failed check-in + mock_client = mock.Mock() + mock_client.checkin.return_value = False + mock_client_class.return_value = mock_client + + err = StringIO() + with pytest.raises(SystemExit) as exc_info: + call_command('candlepin_cert', 'renew', stderr=err) + + assert exc_info.value.code == 1 + error_output = err.getvalue() + assert 'Check-in with Candlepin failed' in error_output + + @mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient') + @mock.patch('awx.main.management.commands.candlepin_cert.parse_cert') + @mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal') + @mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db') + @override_settings( + AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com', + AWX_ANALYTICS_CANDLEPIN_CA=None, + AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None, + AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90, + ) + def test_renew_regenerate_cert_failure(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class): + """Test renew handles certificate regeneration failure.""" + mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid') + + mock_parse_cert.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2026-06-01', 'days_remaining': 10} + mock_needs_renewal.return_value = True + + # Mock successful check-in but failed regeneration + mock_client = mock.Mock() + mock_client.checkin.return_value = True + mock_client.regenerate_cert.side_effect = Exception('Certificate regeneration failed') + mock_client_class.return_value = mock_client + + err = StringIO() + with pytest.raises(SystemExit) as exc_info: + call_command('candlepin_cert', 'renew', stderr=err) + + assert exc_info.value.code == 1 + error_output = err.getvalue() + assert 'Certificate renewal failed' in error_output diff --git a/awx/main/tests/unit/utils/test_candlepin_certificate_registration.py b/awx/main/tests/unit/utils/test_candlepin_certificate_registration.py new file mode 100644 index 0000000000..e8ada9bf6d --- /dev/null +++ b/awx/main/tests/unit/utils/test_candlepin_certificate_registration.py @@ -0,0 +1,383 @@ +# Copyright (c) 2026 Ansible, Inc. +# All Rights Reserved. + +from unittest import mock + +from awx.main.utils.candlepin import ( + _discover_org, + _fetch_candlepin_cert_from_db, + _fetch_registration_credentials_from_db, + _save_candlepin_cert_to_db, + _save_candlepin_registration_to_db, + _register_candlepin_consumer, + _run_candlepin_lifecycle, + get_or_generate_candlepin_certificate, + resolve_registration_credentials, +) + + +class TestCandlepinCertificateRegistration: + """Tests for Candlepin integration in certificate registration module.""" + + @mock.patch('awx.main.utils.candlepin.requests.get') + @mock.patch('awx.main.utils.candlepin.get_candlepin_ca') + def test_discover_org_success(self, mock_get_ca, mock_requests_get): + """Test successful organization discovery.""" + mock_get_ca.return_value = '/path/to/ca.pem' + mock_response = mock.Mock() + mock_response.json.return_value = [ + {'key': 'test_org', 'displayName': 'Test Organization'}, + {'key': 'other_org', 'displayName': 'Other Organization'}, + ] + mock_requests_get.return_value = mock_response + + org = _discover_org('https://candlepin.example.com', 'test_user', 'test_pass') + + assert org == 'test_org' + mock_requests_get.assert_called_once_with( + 'https://candlepin.example.com/users/test_user/owners', + auth=('test_user', 'test_pass'), + verify='/path/to/ca.pem', + timeout=30, + ) + + @mock.patch('awx.main.utils.candlepin.requests.get') + @mock.patch('awx.main.utils.candlepin.get_candlepin_ca') + def test_discover_org_no_ca(self, mock_get_ca, mock_requests_get): + """Test organization discovery without custom CA (uses system certs).""" + mock_get_ca.return_value = None + mock_response = mock.Mock() + mock_response.json.return_value = [{'key': 'test_org', 'displayName': 'Test Organization'}] + mock_requests_get.return_value = mock_response + + org = _discover_org('https://candlepin.example.com', 'test_user', 'test_pass') + + assert org == 'test_org' + # Should use True for verify when no CA is configured + mock_requests_get.assert_called_once_with( + 'https://candlepin.example.com/users/test_user/owners', + auth=('test_user', 'test_pass'), + verify=True, + timeout=30, + ) + + @mock.patch('awx.main.utils.candlepin.requests.get') + def test_discover_org_no_verify_tls(self, mock_requests_get): + """Test organization discovery with TLS verification disabled.""" + mock_response = mock.Mock() + mock_response.json.return_value = [{'key': 'test_org', 'displayName': 'Test Organization'}] + mock_requests_get.return_value = mock_response + + org = _discover_org('https://candlepin.example.com', 'test_user', 'test_pass', verify_tls=False) + + assert org == 'test_org' + # Should use False for verify when verify_tls=False + mock_requests_get.assert_called_once_with( + 'https://candlepin.example.com/users/test_user/owners', + auth=('test_user', 'test_pass'), + verify=False, + timeout=30, + ) + + @mock.patch('awx.main.utils.candlepin.settings') + def test_fetch_candlepin_cert_from_db(self, mock_settings): + """Test fetching Candlepin cert from conf_settings.""" + mock_settings.CANDLEPIN_CONSUMER_UUID = 'test-uuid' + mock_settings.CANDLEPIN_CERT_PEM = 'cert-pem-data' + mock_settings.CANDLEPIN_KEY_PEM = 'key-pem-data' + + cert, key, uuid = _fetch_candlepin_cert_from_db() + + assert cert == 'cert-pem-data' + assert key == 'key-pem-data' + assert uuid == 'test-uuid' + + @mock.patch('awx.main.utils.candlepin._discover_org') + @mock.patch('awx.main.utils.candlepin.settings') + def test_fetch_registration_credentials_from_db(self, mock_settings, mock_discover_org): + """Test fetching registration credentials from settings. + + When both REDHAT and SUBSCRIPTIONS credentials exist, REDHAT takes priority + for both authentication and org discovery. + """ + mock_settings.REDHAT_USERNAME = 'test_user' + mock_settings.REDHAT_PASSWORD = 'test_pass' + mock_settings.INSTALL_UUID = 'test-install-uuid' + mock_settings.SUBSCRIPTIONS_USERNAME = 'subs_user' + mock_settings.SUBSCRIPTIONS_PASSWORD = 'subs_pass' + mock_discover_org.return_value = 'test_org' + + username, password, org, install_uuid = _fetch_registration_credentials_from_db() + + assert username == 'test_user' + assert password == 'test_pass' + assert org == 'test_org' + assert install_uuid == 'test-install-uuid' + # Verify _discover_org was called with REDHAT credentials (takes priority) + assert mock_discover_org.call_count == 1 + args = mock_discover_org.call_args[0] + assert args[1] == 'test_user' # REDHAT_USERNAME (selected) + assert args[2] == 'test_pass' # REDHAT_PASSWORD (selected) + + @mock.patch('awx.main.utils.candlepin._discover_org') + @mock.patch('awx.main.utils.candlepin.settings') + def test_fetch_registration_credentials_no_verify_tls(self, mock_settings, mock_discover_org): + """Test fetching credentials passes verify_tls=False to _discover_org. + + Also verifies that selected credentials (REDHAT in this case) are used for org discovery. + """ + mock_settings.REDHAT_USERNAME = 'test_user' + mock_settings.REDHAT_PASSWORD = 'test_pass' + mock_settings.INSTALL_UUID = 'test-install-uuid' + mock_settings.SUBSCRIPTIONS_USERNAME = 'subs_user' + mock_settings.SUBSCRIPTIONS_PASSWORD = 'subs_pass' + mock_discover_org.return_value = 'test_org' + + username, password, org, install_uuid = _fetch_registration_credentials_from_db(verify_tls=False) + + assert username == 'test_user' + assert password == 'test_pass' + assert org == 'test_org' + assert install_uuid == 'test-install-uuid' + # Verify _discover_org was called with verify_tls=False and REDHAT credentials + mock_discover_org.assert_called_once() + call_args = mock_discover_org.call_args + assert call_args[0][1] == 'test_user' # REDHAT_USERNAME (selected) + assert call_args[0][2] == 'test_pass' # REDHAT_PASSWORD (selected) + call_kwargs = call_args[1] + assert call_kwargs['verify_tls'] is False + + @mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db') + def test_resolve_registration_credentials_no_overrides(self, mock_fetch): + """Test resolve_registration_credentials with no overrides.""" + mock_fetch.return_value = ('db_user', 'db_pass', 'db_org', 'install-uuid') + + username, password, org, install_uuid, errors = resolve_registration_credentials() + + assert username == 'db_user' + assert password == 'db_pass' + assert org == 'db_org' + assert install_uuid == 'install-uuid' + assert errors is None + + @mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db') + def test_resolve_registration_credentials_with_overrides(self, mock_fetch): + """Test resolve_registration_credentials with CLI overrides.""" + mock_fetch.return_value = ('db_user', 'db_pass', 'db_org', 'install-uuid') + + username, password, org, install_uuid, errors = resolve_registration_credentials( + username_override='cli_user', password_override='cli_pass', org_override='cli_org' + ) + + assert username == 'cli_user' + assert password == 'cli_pass' + assert org == 'cli_org' + assert install_uuid == 'install-uuid' + assert errors is None + + @mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db') + def test_resolve_registration_credentials_verify_tls_false(self, mock_fetch): + """Test resolve_registration_credentials passes verify_tls=False to fetch function.""" + mock_fetch.return_value = ('db_user', 'db_pass', 'db_org', 'install-uuid') + + username, password, org, install_uuid, errors = resolve_registration_credentials(verify_tls=False) + + # Verify _fetch_registration_credentials_from_db was called with verify_tls=False + mock_fetch.assert_called_once_with(verify_tls=False) + assert username == 'db_user' + assert password == 'db_pass' + assert org == 'db_org' + assert install_uuid == 'install-uuid' + assert errors is None + + @mock.patch('awx.main.utils.candlepin.parse_cert') + @mock.patch('awx.main.utils.candlepin.settings') + def test_save_candlepin_cert_to_db(self, mock_settings, mock_parse_cert): + """Test saving Candlepin cert to conf_settings.""" + mock_parse_cert.return_value = { + 'serial': '123456', + 'cn': 'test-consumer', + 'not_before': '2026-01-01T00:00:00+00:00', + 'not_after': '2027-01-01T00:00:00+00:00', + 'days_remaining': 365, + } + + result = _save_candlepin_cert_to_db('new-cert', 'new-key') + + assert result is True + # Verify settings were assigned + assert mock_settings.CANDLEPIN_CERT_PEM == 'new-cert' + assert mock_settings.CANDLEPIN_KEY_PEM == 'new-key' + assert mock_settings.CANDLEPIN_SERIAL_NUMBER == '123456' + + @mock.patch('awx.main.utils.candlepin.parse_cert') + @mock.patch('awx.main.utils.candlepin.settings') + def test_save_candlepin_registration_to_db(self, mock_settings, mock_parse_cert): + """Test saving Candlepin registration to conf_settings.""" + mock_parse_cert.return_value = { + 'serial': '789012', + 'cn': 'test-consumer', + 'not_before': '2026-01-01T00:00:00+00:00', + 'not_after': '2027-01-01T00:00:00+00:00', + 'days_remaining': 365, + } + + result = _save_candlepin_registration_to_db('cert', 'key', 'uuid') + + assert result is True + # Verify all registration data was saved + assert mock_settings.CANDLEPIN_CONSUMER_UUID == 'uuid' + assert mock_settings.CANDLEPIN_CERT_PEM == 'cert' + assert mock_settings.CANDLEPIN_KEY_PEM == 'key' + assert mock_settings.CANDLEPIN_SERIAL_NUMBER == '789012' + + @mock.patch('awx.main.utils.candlepin._save_candlepin_registration_to_db') + @mock.patch('awx.main.utils.candlepin.CandlepinClient') + @mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db') + @mock.patch('awx.main.utils.candlepin.get_proxy_url') + @mock.patch('awx.main.utils.candlepin.get_candlepin_ca') + @mock.patch('awx.main.utils.candlepin.get_candlepin_url') + def test_register_candlepin_consumer_success(self, mock_get_url, mock_get_ca, mock_get_proxy, mock_fetch_creds, mock_client_class, mock_save): + """Test successful Candlepin consumer registration.""" + mock_get_url.return_value = 'https://candlepin.example.com' + mock_get_ca.return_value = '/path/to/ca.pem' + mock_get_proxy.return_value = None + mock_fetch_creds.return_value = ('user', 'pass', 'org', 'install-uuid') + mock_save.return_value = True + + mock_client = mock.Mock() + mock_client.register_consumer.return_value = ('cert', 'key', 'uuid') + mock_client_class.return_value = mock_client + + cert, key, uuid = _register_candlepin_consumer() + + assert cert == 'cert' + assert key == 'key' + assert uuid == 'uuid' + mock_save.assert_called_once_with('cert', 'key', 'uuid') + + @mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db') + def test_register_candlepin_consumer_missing_credentials(self, mock_fetch_creds): + """Test registration fails when credentials are missing.""" + mock_fetch_creds.return_value = (None, None, None, None) + + cert, key, uuid = _register_candlepin_consumer() + + assert cert is None + assert key is None + assert uuid is None + + @mock.patch('awx.main.utils.candlepin._save_candlepin_cert_to_db') + @mock.patch('awx.main.utils.candlepin.run_candlepin_lifecycle') + @mock.patch('awx.main.utils.candlepin.get_proxy_url') + @mock.patch('awx.main.utils.candlepin.get_candlepin_ca') + @mock.patch('awx.main.utils.candlepin.get_renewal_days') + @mock.patch('awx.main.utils.candlepin.get_candlepin_url') + def test_run_candlepin_lifecycle_with_renewal(self, mock_get_url, mock_get_days, mock_get_ca, mock_get_proxy, mock_lifecycle, mock_save): + """Test lifecycle with certificate renewal.""" + mock_get_url.return_value = 'https://candlepin.example.com' + mock_get_days.return_value = 90 + mock_get_ca.return_value = '/path/to/ca.pem' + mock_get_proxy.return_value = None + mock_lifecycle.return_value = ('new-cert', 'new-key') + mock_save.return_value = True + + cert, key = _run_candlepin_lifecycle('old-cert', 'old-key', 'real-uuid') + + assert cert == 'new-cert' + assert key == 'new-key' + mock_lifecycle.assert_called_once() + mock_save.assert_called_once_with('new-cert', 'new-key') + + @mock.patch('awx.main.utils.candlepin.is_cert_valid') + @mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle') + @mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db') + def test_get_or_generate_candlepin_certificate_existing_valid(self, mock_fetch, mock_lifecycle, mock_is_valid): + """Test get_or_generate with existing valid certificate.""" + mock_fetch.return_value = ('cert-pem', 'key-pem', 'consumer-uuid') + mock_lifecycle.return_value = ('cert-pem', 'key-pem') + mock_is_valid.return_value = True + + cert, key = get_or_generate_candlepin_certificate() + + assert cert == 'cert-pem' + assert key == 'key-pem' + mock_lifecycle.assert_called_once_with('cert-pem', 'key-pem', 'consumer-uuid') + + @mock.patch('awx.main.utils.candlepin.is_cert_valid') + @mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle') + @mock.patch('awx.main.utils.candlepin._register_candlepin_consumer') + @mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db') + def test_get_or_generate_candlepin_certificate_register_new(self, mock_fetch, mock_register, mock_lifecycle, mock_is_valid): + """Test get_or_generate when no certificate exists - registers new.""" + mock_fetch.return_value = (None, None, None) + mock_register.return_value = ('new-cert', 'new-key', 'new-uuid') + mock_lifecycle.return_value = ('new-cert', 'new-key') + mock_is_valid.return_value = True + + cert, key = get_or_generate_candlepin_certificate() + + assert cert == 'new-cert' + assert key == 'new-key' + mock_register.assert_called_once() + mock_lifecycle.assert_called_once_with('new-cert', 'new-key', 'new-uuid') + + @mock.patch('awx.main.utils.candlepin._register_candlepin_consumer') + @mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db') + def test_get_or_generate_candlepin_certificate_registration_fails(self, mock_fetch, mock_register): + """Test get_or_generate when registration fails.""" + mock_fetch.return_value = (None, None, None) + mock_register.return_value = (None, None, None) + + cert, key = get_or_generate_candlepin_certificate() + + assert cert is None + assert key is None + + @mock.patch('awx.main.utils.candlepin.is_cert_valid') + @mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle') + @mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db') + def test_get_or_generate_candlepin_certificate_invalid_cert(self, mock_fetch, mock_lifecycle, mock_is_valid): + """Test get_or_generate when certificate is invalid.""" + mock_fetch.return_value = ('cert-pem', 'key-pem', 'consumer-uuid') + mock_lifecycle.return_value = ('cert-pem', 'key-pem') + mock_is_valid.return_value = False + + cert, key = get_or_generate_candlepin_certificate() + + assert cert is None + assert key is None + + @mock.patch('awx.main.utils.candlepin.is_cert_valid') + @mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle') + @mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db') + def test_get_or_generate_candlepin_certificate_expired_cert_renewed_successfully(self, mock_fetch, mock_lifecycle, mock_is_valid): + """Test get_or_generate with expired certificate that is successfully renewed.""" + mock_fetch.return_value = ('expired-cert', 'old-key', 'consumer-uuid') + # Lifecycle successfully renews + mock_lifecycle.return_value = ('new-cert', 'new-key') + # New certificate is valid + mock_is_valid.return_value = True + + cert, key = get_or_generate_candlepin_certificate() + + assert cert == 'new-cert' + assert key == 'new-key' + mock_lifecycle.assert_called_once_with('expired-cert', 'old-key', 'consumer-uuid') + + @mock.patch('awx.main.utils.candlepin.parse_cert') + @mock.patch('awx.main.utils.candlepin.settings') + def test_save_candlepin_registration_to_db_cert_parse_failure(self, mock_settings, mock_parse_cert): + """Test _save_candlepin_registration_to_db handles cert parsing failure gracefully.""" + # Cert parsing fails + mock_parse_cert.side_effect = ValueError('Invalid certificate format') + + result = _save_candlepin_registration_to_db('invalid-cert', 'key-pem', 'consumer-uuid') + + # Should still save registration even if parsing fails + assert result is True + # Verify UUID, cert, key, and serial (empty string) were saved + assert mock_settings.CANDLEPIN_CONSUMER_UUID == 'consumer-uuid' + assert mock_settings.CANDLEPIN_CERT_PEM == 'invalid-cert' + assert mock_settings.CANDLEPIN_KEY_PEM == 'key-pem' + assert mock_settings.CANDLEPIN_SERIAL_NUMBER == '' diff --git a/awx/main/tests/unit/utils/test_candlepin_client.py b/awx/main/tests/unit/utils/test_candlepin_client.py new file mode 100644 index 0000000000..13c3197b5e --- /dev/null +++ b/awx/main/tests/unit/utils/test_candlepin_client.py @@ -0,0 +1,124 @@ +# Copyright (c) 2026 Ansible, Inc. +# All Rights Reserved. + +import os +from unittest import mock + +from awx.main.utils.candlepin.client import CandlepinClient, _temp_cert_files + + +class TestCandlepinClient: + """Tests for CandlepinClient.""" + + def test_base_url_required(self): + """Test base_url parameter is required.""" + client = CandlepinClient(base_url='https://subscription.example.com/candlepin') + assert client.base_url == 'https://subscription.example.com/candlepin' + + def test_verify_tls_enabled_by_default(self): + """Test TLS verification is enabled by default.""" + client = CandlepinClient(base_url='https://test.example.com') + assert client.verify is True + + def test_verify_tls_with_ca(self): + """Test TLS verification with custom CA.""" + client = CandlepinClient(base_url='https://test.example.com', candlepin_ca='/path/to/ca.pem') + assert client.verify == '/path/to/ca.pem' + + def test_proxy_configuration(self): + """Test proxy configuration.""" + client = CandlepinClient(base_url='https://test.example.com', proxy='http://proxy.example.com:8080') + assert client.proxies == {'https': 'http://proxy.example.com:8080', 'http': 'http://proxy.example.com:8080'} + + def test_temp_cert_files_cleanup(self): + """Test temporary certificate files are created and cleaned up.""" + cert_pem = '-----BEGIN CERTIFICATE-----\ntest_cert\n-----END CERTIFICATE-----' + key_pem = '-----BEGIN PRIVATE KEY-----\ntest_key\n-----END PRIVATE KEY-----' + + with _temp_cert_files(cert_pem, key_pem) as (cert_path, key_path): + assert os.path.exists(cert_path) + assert os.path.exists(key_path) + # Verify file permissions + cert_stat = os.stat(cert_path) + assert oct(cert_stat.st_mode)[-3:] == '600' + + # Verify cleanup + assert not os.path.exists(cert_path) + assert not os.path.exists(key_path) + + @mock.patch('awx.main.utils.candlepin.client.requests.post') + def test_register_consumer_success(self, mock_post): + """Test successful consumer registration.""" + mock_response = mock.Mock() + mock_response.ok = True + mock_response.json.return_value = { + 'uuid': 'test-consumer-uuid', + 'idCert': { + 'cert': '-----BEGIN CERTIFICATE-----\ncert_data\n-----END CERTIFICATE-----', + 'key': '-----BEGIN PRIVATE KEY-----\nkey_data\n-----END PRIVATE KEY-----', + }, + } + mock_post.return_value = mock_response + + client = CandlepinClient(base_url='https://test.example.com') + cert_pem, key_pem, consumer_uuid = client.register_consumer('test_user', 'test_pass', 'test_org', install_uuid='test-install-uuid') + + assert consumer_uuid == 'test-consumer-uuid' + assert '-----BEGIN CERTIFICATE-----' in cert_pem + assert '-----BEGIN PRIVATE KEY-----' in key_pem + + @mock.patch('awx.main.utils.candlepin.client.requests.put') + def test_checkin_success(self, mock_put): + """Test successful check-in.""" + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_put.return_value = mock_response + + client = CandlepinClient(base_url='https://test.example.com') + cert_pem = '-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----' + key_pem = '-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----' + + result = client.checkin('test-uuid', cert_pem, key_pem) + assert result is True + + @mock.patch('awx.main.utils.candlepin.client.requests.get') + def test_get_consumer_success(self, mock_get): + """Test successful consumer retrieval.""" + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'uuid': 'test-consumer-uuid', + 'name': 'aap-12345678', + 'idCert': {'cert': '-----BEGIN CERTIFICATE-----\nserver_cert\n-----END CERTIFICATE-----', 'serial': {'serial': 123456789}}, + } + mock_get.return_value = mock_response + + client = CandlepinClient(base_url='https://test.example.com') + cert_pem = '-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----' + key_pem = '-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----' + + result = client.get_consumer('test-uuid', cert_pem, key_pem) + assert result is not None + assert result['uuid'] == 'test-consumer-uuid' + assert 'idCert' in result + + @mock.patch('awx.main.utils.candlepin.client.requests.post') + def test_regenerate_cert_success(self, mock_post): + """Test successful certificate regeneration.""" + mock_response = mock.Mock() + mock_response.ok = True + mock_response.json.return_value = { + 'idCert': { + 'cert': '-----BEGIN CERTIFICATE-----\nnew_cert\n-----END CERTIFICATE-----', + 'key': '-----BEGIN PRIVATE KEY-----\nnew_key\n-----END PRIVATE KEY-----', + } + } + mock_post.return_value = mock_response + + client = CandlepinClient(base_url='https://test.example.com') + old_cert = '-----BEGIN CERTIFICATE-----\nold\n-----END CERTIFICATE-----' + old_key = '-----BEGIN PRIVATE KEY-----\nold\n-----END PRIVATE KEY-----' + + new_cert, new_key = client.regenerate_cert('test-uuid', old_cert, old_key) + assert 'new_cert' in new_cert + assert 'new_key' in new_key diff --git a/awx/main/tests/unit/utils/test_candlepin_lifecycle.py b/awx/main/tests/unit/utils/test_candlepin_lifecycle.py new file mode 100644 index 0000000000..f9762ed2a8 --- /dev/null +++ b/awx/main/tests/unit/utils/test_candlepin_lifecycle.py @@ -0,0 +1,222 @@ +# Copyright (c) 2026 Ansible, Inc. +# All Rights Reserved. + +from datetime import datetime, timezone +from unittest import mock + +from awx.main.utils.candlepin.lifecycle import ( + parse_cert, + needs_renewal, + run_candlepin_lifecycle, + get_candlepin_url, + get_renewal_days, + get_candlepin_ca, + get_proxy_url, +) + +# Sample test certificate (expires far in the future for testing) +SAMPLE_CERT_PEM = """-----BEGIN CERTIFICATE----- +MIIDXTCCAkWgAwIBAgIJAKJ5VZ2cPQE5MA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMjYwMTAxMDAwMDAwWhcNMjcwMTAxMDAwMDAwWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEA0a7Y3l3X4L7pKq3xDl8vCRrRK6qU5dF7r3xQH5YRz4hZJN9wE3xW0qDT +-----END CERTIFICATE-----""" + + +class TestCandlepinLifecycle: + """Tests for Candlepin lifecycle functions.""" + + @mock.patch('awx.main.utils.candlepin.lifecycle.settings') + def test_get_candlepin_url_default(self, mock_settings): + """Test default Candlepin URL from defaults.py.""" + mock_settings.AWX_ANALYTICS_CANDLEPIN_URL = 'https://subscription.example.com/candlepin/' + url = get_candlepin_url() + assert url == 'https://subscription.example.com/candlepin/' + + @mock.patch('awx.main.utils.candlepin.lifecycle.settings') + def test_get_renewal_days_from_settings(self, mock_settings): + """Test renewal days from Django settings.""" + mock_settings.AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS = 45 + days = get_renewal_days() + assert days == 45 + + @mock.patch('awx.main.utils.candlepin.lifecycle.os.path.isfile') + @mock.patch('awx.main.utils.candlepin.lifecycle.settings') + def test_get_candlepin_ca_from_settings(self, mock_settings, mock_isfile): + """Test Candlepin CA from Django settings when file exists.""" + mock_settings.AWX_ANALYTICS_CANDLEPIN_CA = '/path/to/ca.pem' + mock_isfile.return_value = True + ca = get_candlepin_ca() + assert ca == '/path/to/ca.pem' + + @mock.patch('awx.main.utils.candlepin.lifecycle.os.path.isfile') + @mock.patch('awx.main.utils.candlepin.lifecycle.settings') + def test_get_candlepin_ca_file_not_found(self, mock_settings, mock_isfile): + """Test Candlepin CA returns None when configured path doesn't exist.""" + mock_settings.AWX_ANALYTICS_CANDLEPIN_CA = '/path/to/missing.pem' + mock_isfile.return_value = False + ca = get_candlepin_ca() + assert ca is None + + @mock.patch('awx.main.utils.candlepin.lifecycle.settings') + def test_get_proxy_url_from_settings(self, mock_settings): + """Test proxy URL from Django settings.""" + mock_settings.AWX_ANALYTICS_CANDLEPIN_PROXY_URL = 'http://proxy.example.com:8080' + proxy = get_proxy_url() + assert proxy == 'http://proxy.example.com:8080' + + @mock.patch('awx.main.utils.candlepin.lifecycle.x509.load_pem_x509_certificate') + def test_parse_cert(self, mock_load_cert): + """Test certificate parsing.""" + # Mock a certificate object + mock_cert = mock.Mock() + mock_cert.serial_number = 123456 + mock_cert.not_valid_before_utc = datetime(2026, 1, 1, tzinfo=timezone.utc) + mock_cert.not_valid_after_utc = datetime(2027, 1, 1, tzinfo=timezone.utc) + + # Mock subject and issuer + mock_attr = mock.Mock() + mock_attr.oid._name = 'commonName' + mock_attr.value = 'test-cn' + mock_cert.subject = [mock_attr] + mock_cert.issuer = [mock_attr] + + mock_load_cert.return_value = mock_cert + + result = parse_cert('fake-pem') + + assert result['serial'] == '123456' + assert result['cn'] == 'test-cn' + assert 'not_before' in result + assert 'not_after' in result + assert 'days_remaining' in result + + @mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert') + def test_needs_renewal_true(self, mock_parse): + """Test needs_renewal returns True when cert is expiring soon.""" + mock_parse.return_value = {'days_remaining': 10} + + result = needs_renewal('fake-cert', days_before_expiry=30) + assert result is True + + @mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert') + def test_needs_renewal_false(self, mock_parse): + """Test needs_renewal returns False when cert has time remaining.""" + mock_parse.return_value = {'days_remaining': 100} + + result = needs_renewal('fake-cert', days_before_expiry=30) + assert result is False + + @mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient') + @mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert') + def test_run_candlepin_lifecycle_no_renewal_needed(self, mock_parse, mock_client_class): + """Test lifecycle when no renewal is needed.""" + mock_parse.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01T00:00:00+00:00', 'days_remaining': 100} + + mock_client = mock.Mock() + mock_client.checkin.return_value = True + mock_client.get_consumer.return_value = None # Skip serial comparison + mock_client_class.return_value = mock_client + + cert_pem, key_pem = run_candlepin_lifecycle('cert-pem', 'key-pem', 'consumer-uuid', candlepin_url='https://test.example.com', renewal_days=30) + + assert cert_pem == 'cert-pem' + assert key_pem == 'key-pem' + mock_client.checkin.assert_called_once() + mock_client.regenerate_cert.assert_not_called() + + @mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient') + @mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert') + def test_run_candlepin_lifecycle_with_renewal(self, mock_parse, mock_client_class): + """Test lifecycle when renewal is needed.""" + # parse_cert is called multiple times: + # 1. Parse original cert + # 2. In needs_renewal() to check expiry + # 3. Parse new cert after renewal for logging + mock_parse.side_effect = [ + {'serial': '123', 'cn': 'test', 'not_after': '2026-02-01', 'days_remaining': 10}, # Original cert + {'serial': '123', 'cn': 'test', 'not_after': '2026-02-01', 'days_remaining': 10}, # needs_renewal check + {'serial': '456', 'cn': 'test', 'not_after': '2027-02-01', 'days_remaining': 365}, # New cert + ] + + mock_client = mock.Mock() + mock_client.checkin.return_value = True + mock_client.get_consumer.return_value = None # Skip serial comparison + mock_client.regenerate_cert.return_value = ('new-cert', 'new-key') + mock_client_class.return_value = mock_client + + cert_pem, key_pem = run_candlepin_lifecycle('old-cert', 'old-key', 'consumer-uuid', renewal_days=90) + + assert cert_pem == 'new-cert' + assert key_pem == 'new-key' + mock_client.regenerate_cert.assert_called_once() + + @mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient') + @mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert') + def test_run_candlepin_lifecycle_expired_cert_renewal(self, mock_parse, mock_client_class): + """Test lifecycle renews an expired certificate.""" + # parse_cert called for: + # 1. Parse original expired cert + # 2. needs_renewal check (expired, so returns True) + # 3. Parse new cert after renewal + mock_parse.side_effect = [ + {'serial': '123', 'cn': 'test', 'not_after': '2025-12-31', 'days_remaining': -120}, # Expired cert + {'serial': '123', 'cn': 'test', 'not_after': '2025-12-31', 'days_remaining': -120}, # needs_renewal + {'serial': '456', 'cn': 'test', 'not_after': '2027-06-01', 'days_remaining': 365}, # New cert + ] + + mock_client = mock.Mock() + mock_client.checkin.return_value = True + mock_client.get_consumer.return_value = None + mock_client.regenerate_cert.return_value = ('new-cert', 'new-key') + mock_client_class.return_value = mock_client + + cert_pem, key_pem = run_candlepin_lifecycle('expired-cert', 'old-key', 'consumer-uuid', renewal_days=90) + + assert cert_pem == 'new-cert' + assert key_pem == 'new-key' + mock_client.regenerate_cert.assert_called_once() + + @mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient') + @mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert') + def test_run_candlepin_lifecycle_checkin_failure_revoked_cert(self, mock_parse, mock_client_class): + """Test lifecycle handles check-in failure (e.g., revoked certificate).""" + mock_parse.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 100} + + # Check-in fails (could indicate revoked cert or deleted consumer) + mock_client = mock.Mock() + mock_client.checkin.return_value = False + mock_client.get_consumer.return_value = None # get_consumer also fails + mock_client_class.return_value = mock_client + + # Lifecycle should continue and return original cert + cert_pem, key_pem = run_candlepin_lifecycle('cert-pem', 'key-pem', 'consumer-uuid', renewal_days=30) + + assert cert_pem == 'cert-pem' + assert key_pem == 'key-pem' + mock_client.checkin.assert_called_once() + # Regeneration should not be attempted since get_consumer indicates consumer doesn't exist + mock_client.regenerate_cert.assert_not_called() + + @mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient') + @mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert') + def test_run_candlepin_lifecycle_consumer_deleted_server_side(self, mock_parse, mock_client_class): + """Test lifecycle detects when consumer was deleted from Candlepin server.""" + mock_parse.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 100} + + # Both check-in and get_consumer fail (consumer deleted) + mock_client = mock.Mock() + mock_client.checkin.return_value = False + mock_client.get_consumer.return_value = None + mock_client_class.return_value = mock_client + + cert_pem, key_pem = run_candlepin_lifecycle('cert-pem', 'key-pem', 'consumer-uuid', renewal_days=30) + + # Should return original cert (caller can attempt mTLS, which will fail and fall back to service account) + assert cert_pem == 'cert-pem' + assert key_pem == 'key-pem' + mock_client.checkin.assert_called_once() + mock_client.get_consumer.assert_called_once() + mock_client.regenerate_cert.assert_not_called() diff --git a/awx/main/utils/candlepin/__init__.py b/awx/main/utils/candlepin/__init__.py new file mode 100644 index 0000000000..191c3d9640 --- /dev/null +++ b/awx/main/utils/candlepin/__init__.py @@ -0,0 +1,349 @@ +# Copyright (c) 2026 Ansible, Inc. +# All Rights Reserved. + +""" +Candlepin integration for mTLS-based authentication. + +This package provides Candlepin consumer identity certificate support, +enabling AAP controller instances to authenticate analytics uploads using +mTLS instead of service account credentials. +""" + +import logging +import requests + +from django.conf import settings + +from .client import CandlepinClient +from .lifecycle import ( + get_candlepin_ca, + get_candlepin_url, + get_proxy_url, + get_renewal_days, + is_cert_valid, + parse_cert, + run_candlepin_lifecycle, +) + +logger = logging.getLogger('awx.main.utils.candlepin') + + +def _fetch_candlepin_cert_from_db(): + """Read cert PEM, key PEM, and consumer UUID from AWX conf_settings. + + Returns (cert_pem, key_pem, consumer_uuid) if valid certificate data exists, + or (None, None, None) if placeholder/unregistered data. + Best-effort: failures are logged as warnings and never propagate. + """ + try: + consumer_uuid = getattr(settings, 'CANDLEPIN_CONSUMER_UUID', '') + cert_pem = getattr(settings, 'CANDLEPIN_CERT_PEM', '') + key_pem = getattr(settings, 'CANDLEPIN_KEY_PEM', '') + + # Check if we have valid data + if not consumer_uuid or not cert_pem or not key_pem: + return None, None, None + + return cert_pem, key_pem, consumer_uuid + except Exception as e: + logger.warning(f'Could not fetch Candlepin lifecycle data from settings: {e}') + return None, None, None + + +def _save_candlepin_cert_to_db(cert_pem, key_pem): + """Persist a renewed Candlepin identity cert and key to AWX conf_settings. + + Returns: + bool: True if save succeeded, False on any error. + """ + try: + # Parse certificate to extract metadata + try: + cert_info = parse_cert(cert_pem) + serial_number = cert_info.get('serial', '') + except Exception as e: + logger.warning(f'Could not parse certificate metadata: {e}') + serial_number = '' + + # Update conf_settings via settings wrapper + settings.CANDLEPIN_CERT_PEM = cert_pem + settings.CANDLEPIN_KEY_PEM = key_pem + settings.CANDLEPIN_SERIAL_NUMBER = serial_number + + logger.info('Renewed Candlepin cert and key saved to conf_settings.') + return True + except Exception as e: + logger.error(f'Could not save renewed Candlepin cert to conf_settings: {e}') + return False + + +def _discover_org(candlepin_url, username, password, verify_tls=True): + """Discover org key via GET /users/{username}/owners. + + Args: + candlepin_url: Candlepin base URL + username: Username for authentication + password: Password for authentication + verify_tls: Whether to verify TLS certificates (default: True) + + Returns: + str: Organization key if found, None on any failure. + """ + try: + url = f"{candlepin_url}/users/{username}/owners" + if verify_tls: + candlepin_ca = get_candlepin_ca() + verify = candlepin_ca if candlepin_ca else True + else: + verify = False + + resp = requests.get(url, auth=(username, password), verify=verify, timeout=30) + resp.raise_for_status() + + owners = resp.json() + if not owners: + logger.warning(f'No organizations found for user {username}') + return None + + # Pick the first org, but warn if multiple exist + if len(owners) > 1: + logger.warning(f'User {username} has access to {len(owners)} organizations. Using first: {owners[0]}') + first_org = owners[0] + org = first_org.get('key') + if not org: + logger.warning(f'Organization key missing in first org entry for user {username}') + return None + + return org + except requests.exceptions.RequestException as e: + logger.warning(f'Failed to discover organization for user {username}: {e}') + return None + except Exception as e: + logger.warning(f'Unexpected error discovering organization for user {username}: {e}') + return None + + +def _fetch_registration_credentials_from_db(verify_tls=True): + """Read Candlepin registration credentials from AWX settings. + + Tries several options to retrieve the Candlepin credentials (set by AWX when the + customer configures their Red Hat subscription), and to discover the org (org + key for the Candlepin /consumers endpoint), and INSTALL_UUID (used as the + consumer's aap.instance_uuid fact). + + Priority for authentication credentials: + - If both REDHAT_USERNAME and SUBSCRIPTIONS_USERNAME exist: use REDHAT_USERNAME + - If only SUBSCRIPTIONS_USERNAME exists: use SUBSCRIPTIONS_USERNAME + + Args: + verify_tls: Whether to verify TLS certificates during org discovery (default: True) + + Returns (username, password, org, install_uuid), any of which may be None + if the corresponding setting is not configured. + """ + candlepin_url = get_candlepin_url() + try: + username = getattr(settings, 'REDHAT_USERNAME', None) + password = getattr(settings, 'REDHAT_PASSWORD', None) + + if not (username and password): + username = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None) + password = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None) + + install_uuid = getattr(settings, 'INSTALL_UUID', None) + + org = _discover_org(candlepin_url, username, password, verify_tls=verify_tls) if username and password else None + + return username, password, org, install_uuid + except Exception as e: + logger.warning(f'Could not fetch Candlepin registration credentials from settings: {e}') + return None, None, None, None + + +def resolve_registration_credentials(username_override=None, password_override=None, org_override=None, verify_tls=True): + """Resolve Candlepin registration credentials with optional overrides. + + Fetches credentials from database settings and merges with any provided overrides. + Validates that all required fields are present. + + Args: + username_override: Optional username to use instead of database value + password_override: Optional password to use instead of database value + org_override: Optional org to use instead of auto-discovered value + verify_tls: Whether to verify TLS certificates during org discovery (default: True) + + Returns: + Tuple (username, password, org, install_uuid) if all required fields present, + or (None, None, None, None, error_messages) if validation fails. + error_messages is a list of strings describing missing values. + """ + db_username, db_password, db_org, db_install_uuid = _fetch_registration_credentials_from_db(verify_tls=verify_tls) + + username = username_override or db_username + password = password_override or db_password + org = org_override or db_org + + # Validate all required fields are present + missing = [] + if not username: + missing.append('username (provide --username or set REDHAT_USERNAME in database)') + if not password: + missing.append('password (provide password or set REDHAT_PASSWORD in database)') + if not org: + missing.append('org (provide --org or ensure SUBSCRIPTIONS_USERNAME/PASSWORD are configured for auto-discovery)') + + if missing: + return None, None, None, None, missing + + return username, password, org, db_install_uuid, None + + +def _save_candlepin_registration_to_db(cert_pem, key_pem, consumer_uuid): + """Persist a new Candlepin consumer registration (cert, key, UUID) to AWX conf_settings. + + Returns: + bool: True if save succeeded, False on any error. + """ + try: + # Parse certificate to extract metadata + try: + cert_info = parse_cert(cert_pem) + serial_number = cert_info.get('serial', '') + except Exception as e: + logger.warning(f'Could not parse certificate metadata: {e}') + serial_number = '' + + # Update conf_settings with all registration data via settings wrapper + settings.CANDLEPIN_CONSUMER_UUID = consumer_uuid + settings.CANDLEPIN_CERT_PEM = cert_pem + settings.CANDLEPIN_KEY_PEM = key_pem + settings.CANDLEPIN_SERIAL_NUMBER = serial_number + + logger.info(f'Candlepin consumer registration saved to conf_settings (uuid={consumer_uuid}).') + return True + except Exception as e: + logger.error(f'Could not save Candlepin registration to conf_settings: {e}') + return False + + +def _register_candlepin_consumer(): + """Register a new Candlepin consumer using credentials from AWX settings. + + Called when no identity cert exists in the DB. + + Reads the Candlepin credentials and the org key and then calls + POST /consumers on Candlepin to obtain an identity certificate. + On success the cert, key, and consumer UUID are persisted to conf_settings. + + Returns (cert_pem, key_pem, consumer_uuid) on success, (None, None, None) on + any failure. Best-effort: logs errors but never propagates. + """ + username, password, org, install_uuid = _fetch_registration_credentials_from_db() + + if not username or not password: + logger.warning('Candlepin registration is enabled but credentials are not set; skipping registration.') + return None, None, None + + if not org: + logger.warning('Candlepin registration is enabled but subscription org is not available; skipping registration.') + return None, None, None + + candlepin_url = get_candlepin_url() + candlepin_ca = get_candlepin_ca() + proxy = get_proxy_url() + client = CandlepinClient(base_url=candlepin_url, candlepin_ca=candlepin_ca, proxy=proxy) + + try: + cert_pem, key_pem, consumer_uuid = client.register_consumer(username, password, org, install_uuid) + except Exception as e: + logger.error(f'Candlepin consumer registration failed: {e}') + return None, None, None + + if not _save_candlepin_registration_to_db(cert_pem, key_pem, consumer_uuid): + logger.error('Candlepin consumer registration succeeded but failed to save to database.') + return None, None, None + return cert_pem, key_pem, consumer_uuid + + +def _run_candlepin_lifecycle(cert_pem, key_pem, consumer_uuid): + """Orchestrate Candlepin check-in and proactive cert renewal. + + Returns the (possibly renewed) (cert_pem, key_pem) tuple. If renewal fails, the + original cert is returned and the caller will validate it with is_cert_valid(). + If invalid, the caller skips mTLS and falls back directly to OIDC authentication. + """ + if not consumer_uuid: + logger.warning('Candlepin lifecycle is enabled but consumer UUID is not set; skipping check-in and renewal.') + return cert_pem, key_pem + + candlepin_url = get_candlepin_url() + renewal_days = get_renewal_days() + candlepin_ca = get_candlepin_ca() + proxy = get_proxy_url() + + try: + new_cert_pem, new_key_pem = run_candlepin_lifecycle( + cert_pem, + key_pem, + consumer_uuid, + candlepin_url=candlepin_url, + renewal_days=renewal_days, + candlepin_ca=candlepin_ca, + proxy=proxy, + ) + if (new_cert_pem, new_key_pem) != (cert_pem, key_pem): + if not _save_candlepin_cert_to_db(new_cert_pem, new_key_pem): + logger.warning('Renewed certificate will be used for this request, but failed to persist to database for future use.') + return new_cert_pem, new_key_pem + except Exception as e: + logger.error(f'Candlepin lifecycle (check-in / renewal) failed: {e}; will attempt mTLS with existing cert') + return cert_pem, key_pem + + +def get_or_generate_candlepin_certificate(): + """ + Get or generate Candlepin certificate for analytics authentication. + + This function provides certificate-based authentication for analytics uploads. + It will: + 1. Check for existing certificate in conf_settings + 2. If missing, attempt to register with Candlepin (credentials from settings) + 3. If exists, check for renewal needs and refresh if needed + 4. Return the certificate and key as PEM strings + + Returns: + Tuple (cert_pem, key_pem) as strings if certificate is available, (None, None) otherwise. + + Note: + Credentials for registration are retrieved from Django settings internally + (REDHAT_USERNAME/PASSWORD, SUBSCRIPTIONS_USERNAME/PASSWORD, or + SUBSCRIPTIONS_CLIENT_ID/CLIENT_SECRET in priority order). + """ + cert_pem, key_pem, consumer_uuid = _fetch_candlepin_cert_from_db() + + # If no certificate exists, attempt registration + if not cert_pem or not key_pem: + logger.info('No Candlepin certificate found, attempting registration') + cert_pem, key_pem, consumer_uuid = _register_candlepin_consumer() + + if not cert_pem or not key_pem: + logger.debug('Candlepin certificate registration failed or not configured') + return None, None + + # Run lifecycle (check-in and renewal if needed) + if consumer_uuid: + cert_pem, key_pem = _run_candlepin_lifecycle(cert_pem, key_pem, consumer_uuid) + + # Validate certificate is still usable + if not is_cert_valid(cert_pem): + logger.warning('Candlepin certificate is not valid (expired or not yet valid)') + return None, None + + # Return raw PEM strings - caller will create temp files if needed + return cert_pem, key_pem + + +__all__ = [ + 'get_or_generate_candlepin_certificate', + 'resolve_registration_credentials', +] diff --git a/awx/main/utils/candlepin/client.py b/awx/main/utils/candlepin/client.py new file mode 100644 index 0000000000..5a036977fd --- /dev/null +++ b/awx/main/utils/candlepin/client.py @@ -0,0 +1,258 @@ +import os +import tempfile +import uuid as _uuid_mod +from datetime import datetime, timezone +import requests +import logging + +logger = logging.getLogger('awx.main.utils.candlepin') + + +class _temp_cert_files: + """ + Context manager: writes cert + key to secure temp files, auto-deletes on exit. + + Uses NamedTemporaryFile with delete=True for better cleanup on process termination. + Files are unlinked immediately on Unix systems, providing better security against + orphaned private keys in /tmp. + """ + + def __init__(self, cert_pem, key_pem): + self._cert_pem = cert_pem + self._key_pem = key_pem + self._cert_file = None + self._key_file = None + + def __enter__(self): + try: + # Create temp file for certificate + self._cert_file = tempfile.NamedTemporaryFile(mode='w', prefix='candlepin_cert_', suffix='.pem', delete=True) + self._cert_file.write(self._cert_pem) + self._cert_file.flush() + os.chmod(self._cert_file.name, 0o600) + + # Create temp file for private key + self._key_file = tempfile.NamedTemporaryFile(mode='w', prefix='candlepin_key_', suffix='.pem', delete=True) + self._key_file.write(self._key_pem) + self._key_file.flush() + os.chmod(self._key_file.name, 0o600) + + return self._cert_file.name, self._key_file.name + except Exception: + # Clean up on error + if self._cert_file: + self._cert_file.close() + if self._key_file: + self._key_file.close() + raise + + def __exit__(self, *_): + # Closing NamedTemporaryFile automatically deletes it + if self._cert_file: + try: + self._cert_file.close() + except Exception as e: + logger.warning(f'Error closing cert temp file: {e}') + if self._key_file: + try: + self._key_file.close() + except Exception as e: + logger.warning(f'Error closing key temp file: {e}') + + +class CandlepinClient: + """ + Minimal Candlepin REST client for certificate lifecycle operations. + + All API calls authenticate with the consumer identity certificate (mTLS), + matching the pattern used by subscription-manager after initial registration. + + TLS server verification is **enabled** by default (``verify_tls=True``). + Pass ``candlepin_ca`` to verify against a specific CA bundle rather than the + system trust store. Verification can only be disabled by explicitly passing + ``verify_tls=False``; this should be used only in controlled test environments + and never in production. + """ + + def __init__(self, base_url, candlepin_ca=None, proxy=None, verify_tls=True): + self.base_url = base_url.rstrip('/') + if candlepin_ca: + self.verify = candlepin_ca + elif verify_tls: + self.verify = True + else: + # Explicit opt-in required to reach this branch — never set by default. + logger.warning('CandlepinClient: TLS verification is DISABLED (verify_tls=False). Do not use in production.') + self.verify = False + if proxy: + # Use the caller-supplied URL as-is for HTTPS targets (preserves the + # intended scheme — usually http:// so requests uses plain HTTP to reach + # the proxy and issues CONNECT for TLS tunneling, but https:// is also + # accepted for the rare case of an HTTPS-fronted proxy). + # The http:// key always uses plain HTTP since non-TLS traffic never + # needs TLS to the proxy itself. + host = proxy.split('://', 1)[-1] + self.proxies = {'https': proxy, 'http': f'http://{host}'} + else: + self.proxies = {} + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def register_consumer(self, username, password, org, install_uuid=None): + """POST /consumers?owner={org} — register a new AAP consumer with basic auth. + + Uses the customer's Red Hat subscription credentials (REDHAT_USERNAME / + REDHAT_PASSWORD from AWX conf_setting) to register this controller + instance as a Candlepin consumer and obtain an identity certificate for mTLS. + + Args: + username: Red Hat subscription username (from REDHAT_USERNAME). + password: Red Hat subscription password (from REDHAT_PASSWORD). + org: Candlepin owner/org key (retrieved with subscription credentials). + install_uuid: AWX INSTALL_UUID used as the consumer's aap.instance_uuid + fact; falls back to a random UUID if not provided. + + Returns: + Tuple ``(cert_pem, key_pem, consumer_uuid)``. + + Raises: + RuntimeError on any network or API failure. + """ + url = f'{self.base_url}/consumers' + instance_uuid = install_uuid or str(_uuid_mod.uuid4()) + payload = { + 'name': f'aap-{instance_uuid[:8]}', + 'type': {'label': 'aap'}, + 'facts': { + 'system.certificate_version': '3.3', + 'system.name': 'aap-controller', + 'aap.instance_uuid': instance_uuid, + }, + } + try: + resp = requests.post( + url, + params={'owner': org}, + auth=(username, password), + json=payload, + headers={'Content-Type': 'application/json'}, + verify=self.verify, + proxies=self.proxies, + timeout=120, + ) + except Exception as e: + raise RuntimeError(f'Candlepin register_consumer network error: {e}') from e + + if not resp.ok: + raise RuntimeError(f'Candlepin register_consumer failed with status {resp.status_code}: {resp.text}') + + try: + body = resp.json() + consumer_uuid = body.get('uuid') + id_cert = body.get('idCert', {}) + cert_pem = id_cert.get('cert') + key_pem = id_cert.get('key') + except Exception as e: + raise RuntimeError(f'Candlepin register_consumer: could not parse response JSON: {e}') from e + + if not consumer_uuid or not cert_pem or not key_pem: + raise RuntimeError('Candlepin register_consumer: response missing uuid, idCert.cert or idCert.key') + + logger.info(f'Candlepin consumer registered successfully (uuid={consumer_uuid})') + return cert_pem, key_pem, consumer_uuid + + def get_consumer(self, consumer_uuid, cert_pem, key_pem): + """GET /consumers/{uuid} — retrieve consumer information from server. + + Best-effort: logs a warning on failure but never raises. + + Returns: + Dict with consumer data (including 'idCert' with serial) on success, + None on any failure. + """ + url = f'{self.base_url}/consumers/{consumer_uuid}' + try: + with _temp_cert_files(cert_pem, key_pem) as (cert_path, key_path): + resp = requests.get( + url, + cert=(cert_path, key_path), + verify=self.verify, + proxies=self.proxies, + timeout=30, + ) + if resp.status_code == 200: + logger.debug(f'Candlepin get_consumer successful for consumer {consumer_uuid}') + return resp.json() + logger.warning(f'Candlepin get_consumer returned unexpected status {resp.status_code} for consumer {consumer_uuid}') + return None + except Exception as e: + logger.warning(f'Candlepin get_consumer failed for consumer {consumer_uuid}: {e}') + return None + + def checkin(self, consumer_uuid, cert_pem, key_pem): + """PUT /consumers/{uuid} — reset inactivity timer. + + Best-effort: logs a warning on failure but never raises so that a + transient Candlepin outage cannot abort a gather run. + + Returns True on success, False on any failure. + """ + url = f'{self.base_url}/consumers/{consumer_uuid}' + try: + with _temp_cert_files(cert_pem, key_pem) as (cert_path, key_path): + resp = requests.put( + url, + cert=(cert_path, key_path), + json={'facts': {'aap.last_checkin': datetime.now(timezone.utc).isoformat()}}, + headers={'Content-Type': 'application/json'}, + verify=self.verify, + proxies=self.proxies, + timeout=30, + ) + if resp.status_code in (200, 204): + logger.info(f'Candlepin check-in successful for consumer {consumer_uuid}') + return True + logger.warning(f'Candlepin check-in returned unexpected status {resp.status_code} for consumer {consumer_uuid}') + return False + except Exception as e: + logger.warning(f'Candlepin check-in failed for consumer {consumer_uuid}: {e}') + return False + + def regenerate_cert(self, consumer_uuid, cert_pem, key_pem): + """POST /consumers/{uuid} — regenerate the identity certificate. + + Returns ``(new_cert_pem, new_key_pem)`` on success. + Raises ``RuntimeError`` on API or parsing failure so the caller can + decide whether to fall back to service-account auth. + """ + url = f'{self.base_url}/consumers/{consumer_uuid}' + with _temp_cert_files(cert_pem, key_pem) as (cert_path, key_path): + try: + resp = requests.post( + url, + cert=(cert_path, key_path), + verify=self.verify, + proxies=self.proxies, + timeout=120, + ) + except Exception as e: + raise RuntimeError(f'Candlepin regenerate_cert network error for consumer {consumer_uuid}: {e}') from e + + if not resp.ok: + raise RuntimeError(f'Candlepin regenerate_cert failed with status {resp.status_code} for consumer {consumer_uuid}: {resp.text}') + + try: + body = resp.json() + id_cert = body.get('idCert', {}) + new_cert_pem = id_cert.get('cert') + new_key_pem = id_cert.get('key') + except Exception as e: + raise RuntimeError(f'Candlepin regenerate_cert: could not parse response JSON: {e}') from e + + if not new_cert_pem or not new_key_pem: + raise RuntimeError(f'Candlepin regenerate_cert: response did not contain idCert.cert / idCert.key for consumer {consumer_uuid}') + + logger.info(f'Candlepin cert regenerated successfully for consumer {consumer_uuid}') + return new_cert_pem, new_key_pem diff --git a/awx/main/utils/candlepin/lifecycle.py b/awx/main/utils/candlepin/lifecycle.py new file mode 100644 index 0000000000..dba476f80d --- /dev/null +++ b/awx/main/utils/candlepin/lifecycle.py @@ -0,0 +1,221 @@ +""" +Candlepin certificate lifecycle helpers. + +is_cert_valid — quick parseable/non-expired guard used at ship time +parse_cert — extract metadata from a PEM cert string +needs_renewal — check whether a cert is within the renewal window +run_candlepin_lifecycle — orchestrate check-in + proactive renewal per gather run +""" + +import os +from datetime import datetime, timezone + +from cryptography import x509 +from django.conf import settings + +import logging + +logger = logging.getLogger('awx.main.utils.candlepin') + +from .client import CandlepinClient + +# --------------------------------------------------------------------------- +# Certificate helpers +# --------------------------------------------------------------------------- + + +def parse_cert(pem_text): + """Parse a PEM certificate and return a metadata dict. + + Returns a dict with keys: serial, cn, issuer_cn, issuer_org, + not_before, not_after, days_remaining, validity_days. + + Raises ``ValueError`` if the PEM cannot be parsed. + """ + data = pem_text.encode('utf-8') if isinstance(pem_text, str) else pem_text + try: + cert = x509.load_pem_x509_certificate(data) + except Exception as e: + raise ValueError(f'Could not parse PEM certificate: {e}') from e + + expiry = cert.not_valid_after_utc + remaining = expiry - datetime.now(timezone.utc) + + subject = {attr.oid._name: attr.value for attr in cert.subject} + issuer = {attr.oid._name: attr.value for attr in cert.issuer} + + return { + 'serial': str(cert.serial_number), + 'cn': subject.get('commonName', 'unknown'), + 'issuer_cn': issuer.get('commonName', 'unknown'), + 'issuer_org': issuer.get('organizationName', 'unknown'), + 'not_before': cert.not_valid_before_utc.isoformat(), + 'not_after': expiry.isoformat(), + 'days_remaining': remaining.days, + 'validity_days': (expiry - cert.not_valid_before_utc).days, + } + + +def is_cert_valid(cert_pem: str) -> bool: + """Return True if cert_pem is parseable, already valid, and not yet expired. + + Logs a warning (suitable for operator visibility) when the cert is not yet + valid, expired, or unparseable, then returns False so the caller can fall + back to service-account authentication. + """ + try: + info = parse_cert(cert_pem) + now = datetime.now(timezone.utc) + not_before = datetime.fromisoformat(info['not_before']) + if now < not_before: + logger.warning(f'Candlepin cert is not yet valid (not_before={info["not_before"]}); falling back to service account auth') + return False + if info['days_remaining'] < 0: + logger.warning(f'Candlepin cert expired at {info["not_after"]}; falling back to service account auth') + return False + return True + except ValueError as e: + logger.warning(f'Could not parse Candlepin cert: {e}') + return False + + +def needs_renewal(pem_text, days_before_expiry): + """Return True if the cert expires within ``days_before_expiry`` days. + + Also returns True if the cert is already expired (days_remaining < 0). + Raises ``ValueError`` if the PEM cannot be parsed. + """ + info = parse_cert(pem_text) + return info['days_remaining'] <= days_before_expiry + + +# --------------------------------------------------------------------------- +# Lifecycle orchestration +# --------------------------------------------------------------------------- + + +def run_candlepin_lifecycle(cert_pem, key_pem, consumer_uuid, *, candlepin_url=None, renewal_days=90, candlepin_ca=None, proxy=None): + """Perform check-in and, if needed, proactive cert renewal. + + Called once per gather run. Returns ``(cert_pem, key_pem)`` — either + the originals (if no renewal was needed) or the freshly regenerated pair. + + Args: + cert_pem: Consumer identity certificate PEM string. + key_pem: Consumer identity key PEM string. + consumer_uuid: Candlepin consumer UUID string. + candlepin_url: Candlepin base URL (defaults to prod). + renewal_days: Renew if expiry is within this many days (default 90). + candlepin_ca: Path to Candlepin CA cert for server verification + (default None → uses system trust store). + proxy: Optional HTTP/HTTPS proxy URL string. + + Returns: + Tuple ``(cert_pem, key_pem)`` — possibly updated after renewal. + + Raises: + RuntimeError if cert regeneration is attempted and fails. + """ + client = CandlepinClient(base_url=candlepin_url, candlepin_ca=candlepin_ca, proxy=proxy) + + # Step 1: Inspect cert metadata for diagnostics and renewal decision. + try: + info = parse_cert(cert_pem) + except ValueError as e: + logger.warning(f'Candlepin lifecycle: could not parse cert, skipping lifecycle: {e}') + return cert_pem, key_pem + + logger.info(f'Candlepin cert: serial={info["serial"]}, CN={info["cn"]}, expires={info["not_after"]}, days_remaining={info["days_remaining"]}') + + # Step 2: Check-in (best-effort, never raises). + checkin_success = client.checkin(consumer_uuid, cert_pem, key_pem) + if not checkin_success: + logger.warning( + f'Candlepin check-in failed for consumer {consumer_uuid}. ' + f'Consumer may have been deleted server-side or certificate is invalid. ' + f'Lifecycle will continue but may fail.' + ) + + # Step 3: Compare local cert serial with server's serial. + # If they differ, the server has issued a new cert (e.g., admin regenerated it). + consumer_data = client.get_consumer(consumer_uuid, cert_pem, key_pem) + if not consumer_data: + if not checkin_success: + logger.error( + f'Both check-in and get_consumer failed for consumer {consumer_uuid}. ' + f'Consumer was likely deleted from Candlepin server. ' + f'Re-registration may be required. Will attempt cert renewal anyway.' + ) + else: + logger.warning(f'Could not retrieve consumer data for {consumer_uuid} but check-in succeeded. Continuing lifecycle.') + else: + server_cert_pem = consumer_data.get('idCert', {}).get('cert') + if server_cert_pem: + try: + server_info = parse_cert(server_cert_pem) + server_serial = server_info['serial'] + local_serial = info['serial'] + + if server_serial != local_serial: + logger.warning( + f'Candlepin cert serial mismatch: local={local_serial}, server={server_serial}. ' + f'Server has issued a new certificate; requesting updated cert.' + ) + # Fetch the new cert from the server + new_cert_pem, new_key_pem = client.regenerate_cert(consumer_uuid, cert_pem, key_pem) + + try: + new_info = parse_cert(new_cert_pem) + logger.info(f'Candlepin cert updated: old serial={local_serial}, new serial={new_info["serial"]}, new expiry={new_info["not_after"]}') + except ValueError: + logger.warning('Candlepin lifecycle: could not parse updated cert for logging') + + return new_cert_pem, new_key_pem + else: + logger.debug(f'Candlepin cert serial matches server: {local_serial}') + except ValueError as e: + logger.warning(f'Candlepin lifecycle: could not parse server cert from get_consumer: {e}') + + # Step 4: Proactive renewal if within the renewal window (or already expired). + if needs_renewal(cert_pem, renewal_days): + logger.info(f'Candlepin cert expires in {info["days_remaining"]} days (threshold: {renewal_days}); requesting renewal for consumer {consumer_uuid}') + new_cert_pem, new_key_pem = client.regenerate_cert(consumer_uuid, cert_pem, key_pem) + + try: + new_info = parse_cert(new_cert_pem) + logger.info(f'Candlepin cert renewed: old serial={info["serial"]}, new serial={new_info["serial"]}, new expiry={new_info["not_after"]}') + except ValueError: + logger.warning('Candlepin lifecycle: could not parse renewed cert for logging') + + return new_cert_pem, new_key_pem + + logger.info(f'Candlepin cert is healthy ({info["days_remaining"]} days remaining); no renewal needed') + return cert_pem, key_pem + + +def get_candlepin_url(): + """Get Candlepin base URL from Django settings.""" + return settings.AWX_ANALYTICS_CANDLEPIN_URL + + +def get_renewal_days(): + """Get certificate renewal threshold in days from Django settings.""" + return settings.AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS + + +def get_candlepin_ca(): + """Get Candlepin CA certificate path from Django settings. + + Returns: + str: Path to CA certificate file if configured and exists, None otherwise. + """ + ca_path = settings.AWX_ANALYTICS_CANDLEPIN_CA + if ca_path and not os.path.isfile(ca_path): + logger.warning(f'Configured Candlepin CA certificate not found at {ca_path}, using system default CA bundle') + return None + return ca_path + + +def get_proxy_url(): + """Get proxy URL from Django settings.""" + return settings.AWX_ANALYTICS_CANDLEPIN_PROXY_URL diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 30874b6d61..8a67da9e31 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -538,6 +538,9 @@ AUTOMATION_ANALYTICS_LAST_GATHER = None # Last gathered entries for expensive Analytics AUTOMATION_ANALYTICS_LAST_ENTRIES = '' +# Candlepin integration settings for analytics authentication +AWX_ANALYTICS_CANDLEPIN_URL = 'https://subscription.rhsm.redhat.com/subscription/' + # Default list of modules allowed for ad hoc commands. # Note: This setting may be overridden by database settings. AD_HOC_COMMANDS = [