mirror of
https://github.com/ansible/awx.git
synced 2026-06-23 07:37:50 -02:30
Implement Candlepin certificate integration (#16388)
* AAP-12516 [option 2] Handle nested workflow artifacts via root node `ancestor_artifacts` (#16381) * Add new test for artfact precedence upstream node vs outer workflow * Fix bugs, upstream artifacts come first for precedence * Track nested artifacts path through ancestor_artifacts on root nodes * Fix case where first root node did not get the vars * touchup comment * Prevent conflict with sliced jobs hack * Reorder URLs so that Django debug toolbar can work (#16352) * Reorder URLs so that Django debug toolbar can work * Move comment with URL move * feat: support for oidc credential /test endpoint (#16370) Adds support for testing external credentials that use OIDC workload identity tokens. When FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED is enabled, the /test endpoints return JWT payload details alongside test results. - Add OIDC credential test endpoints with job template selection - Return JWT payload and secret value in test response - Maintain backward compatibility (detail field for errors) - Add comprehensive unit and functional tests - Refactor shared error handling logic Co-authored-by: Daniel Finca <dfinca@redhat.com> Co-authored-by: melissalkelly <melissalkelly1@gmail.com> * Bind the install bundle to the ansible.receptor collection 2.0.8 version (#16396) * [Devel] Config Endpoint Optimization (#16389) * Improved performance of the config endpoint by reducing database queries in GET /api/controller/v2/config/ * Fix OIDC workload identity for inventory sync (#16390) The cloud credential used by inventory updates was not going through the OIDC workload identity token flow because it lives outside the normal _credentials list. This overrides populate_workload_identity_tokens in RunInventoryUpdate to include the cloud credential as an additional_credentials argument to the base implementation, and patches get_cloud_credential on the instance so the injector picks up the credential with OIDC context intact. Co-authored-by: Alan Rominger <arominge@redhat.com> Co-authored-by: Dave Mulford <dmulford@redhat.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: integrate awx-tui to the awx_devel image (#16399) * Aap 45980 (#16395) * support bitbucket_dc webhooks * add test * update docs * fix import for refactored method (#16394) retrieve_workload_identity_jwt_with_claims is now in a separate utility file, not in jobs.py Signed-off-by: Seth Foster <fosterbseth@gmail.com> * AAP-70257 controller collection should retry transient HTTP errors with exponential backoff. (#16415) controller collection should retry transient HTTP errors with exponential backoff * AAP-71844 Fix rrule fast-forward across DST boundaries (#16407) Fix rrule fast-forward producing wrong occurrences across DST boundaries The UTC round-trip in _fast_forward_rrule shifts the dtstart's local hour when the original and fast-forwarded times are in different DST periods. Since dateutil generates HOURLY occurrences by stepping in local time, the shifted hour changes the set of reachable hours. With BYHOUR constraints this causes a ValueError crash; without BYHOUR, occurrences are silently shifted by 1 hour. Fix by performing all arithmetic in the dtstart's original timezone. Python aware-datetime subtraction already computes absolute elapsed time regardless of timezone, so the UTC conversion was unnecessary for correctness and actively harmful during fall-back ambiguity. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Correctly restrict push actions to ownership repos (#16398) * Correctly restrict push actions to ownership repos * Use standard action to see if push actions should run * Run spec job for 2.6 and higher * Be even more restrictve, do not push if on a fork * [Devel] Performance Optimization for Select Hosts Query (#16413) * Fixed black reformating * Make test simulate 500k hosts in real world scenario * feat: improve unauthorized response on aap deployments (#16422) * fix: do not include secret values in the credentials test endpoint an… (#16425) fix: do not include secret values in the credentials test endpoint and add a guard to make sure credentials are testable * [devel backport] AAP-41742: Fix workflow node update failing when JT has unprompted labels (#16426) * AAP-41742: Fix workflow node update failing when JT has unprompted labels PATCH extra_data on a workflow node fails with {"labels":["Field is not configured to prompt on launch."]} when the node has labels associated but the JT has ask_labels_on_launch=False. The serializer was passing all persisted M2M state from prompts_dict() to _accept_or_ignore_job_kwargs() on every PATCH, re-validating unchanged fields. Fix scopes validation to only the fields in the request; full re-validation still occurs when unified_job_template is being changed. * Capture attrs keys before _build_mock_obj mutates them _build_mock_obj() pops pseudo-fields (limit, scm_branch, job_tags, etc.) from attrs. Computing requested_prompt_fields after the pop would miss those fields and skip their ask_on_launch validation. * Include survey_passwords when validating extra_vars prompts prompts_dict() emits survey_passwords alongside extra_vars. _accept_or_ignore_job_kwargs uses it to decrypt encrypted survey values before validation. Without it, encrypted password blobs are validated as-is against the survey spec. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> * feat: add test to ensure credential secret values are not returned (#16434) * AAP-68024 perf: derive last_job_host_summary from query instead of denormalized FK (#16332) * perf: stop eagerly updating Host.last_job_host_summary on every job completion The playbook_on_stats wrapup path bulk-updates last_job_host_summary_id on every host touched by a job. In the Q4CY25 scale lab this query had a median execution time of 75 seconds due to index churn on main_host. Replace all reads of the denormalized FK with a new classmethod JobHostSummary.latest_for_host(host_id) that queries for the most recent summary on demand. This eliminates the write-side bulk_update of last_job_host_summary_id entirely. Changes: - Add JobHostSummary.latest_for_host() classmethod - Serializer: use latest_for_host() instead of obj.last_job_host_summary - Dashboard view: use subquery instead of FK traversal for failed hosts - Inventory.update_computed_fields: use subquery for failed host count - events.py: remove last_job_host_summary_id from bulk_update - signals.py: simplify _update_host_last_jhs to only update last_job - access.py/managers.py: remove select_related/defer through the FK The FK field on Host is left in place for now (removal requires a migration) but is no longer written to. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix .pk AttributeError, add job_template annotations, annotate host sublists - Add 'pk' to AnnotatedSummary dynamic type (fixes AttributeError in get_related) - Add job_template_id and job_template_name to subquery annotations so list views include these fields in summary_fields.last_job (matching detail views) - Traverse job__ FK from JobHostSummary instead of using separate UnifiedJob subquery with OuterRef on another annotation (cleaner SQL, avoids alias issue) - Annotate all host sublist views (InventoryHostsList, GroupHostsList, GroupAllHostsList, InventorySourceHostsList) to prevent N+1 queries Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Update test_events to use JobHostSummary.latest_for_host instead of stale FKs Tests were asserting host.last_job_id and host.last_job_host_summary_id which are no longer updated. Use JobHostSummary.latest_for_host() to derive the same data, matching the new read-time derivation approach. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Remove stale failures_url from deprecated DashboardView The failures_url linked to ?last_job_host_summary__failed=True which filters on the now-stale FK. The dashboard count itself was already fixed to use a subquery annotation. Since DashboardView is deprecated and has_active_failures is a SerializerMethodField (not filterable), remove the failures_url entirely rather than creating a custom filter. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Apply black formatting to changed files Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Refactor: replace 10 subquery annotations with bulk prefetch Instead of annotating every host queryset with 10 correlated subqueries (summary + job + job_template fields), annotate only _latest_summary_id and bulk-fetch the full JobHostSummary objects after pagination via select_related('job', 'job__job_template'). This reduces the SQL from 10 correlated subqueries to 1 subquery + 1 IN query, addressing review feedback about annotation overhead on host list views. - _annotate_host_latest_summary: only annotates _latest_summary_id - _prefetch_latest_summaries: bulk-fetches and attaches to host objects - HostSummaryPrefetchMixin: hooks into list() after pagination - Serializer uses real JobHostSummary objects (no more AnnotatedSummary) - to_representation always overwrites stale FK values Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Refactor: move latest summary to QuerySet._fetch_all + Host.latest_summary Per review feedback, replace the view-level HostSummaryPrefetchMixin with a custom QuerySet that bulk-attaches summaries at evaluation time (like prefetch_related), and a Host.latest_summary property as the single access point. - HostLatestSummaryQuerySet: overrides _fetch_all() to bulk-fetch JobHostSummary objects with select_related after queryset evaluation - HostManager now inherits from the custom queryset via from_queryset() - Host.latest_summary property: uses cache if available, falls back to individual query - Remove _annotate_host_latest_summary, _prefetch_latest_summaries, HostSummaryPrefetchMixin from views — no more list() override needed - Remove last_job/last_job_host_summary from SUMMARIZABLE_FK_FIELDS - Serializer uses obj.latest_summary and DEFAULT_SUMMARY_FIELDS loop Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix: scope annotation to views, restore license_error/canceled_on - Remove with_latest_summary_id() from HostManager.get_queryset() to avoid applying the correlated subquery to every Host query globally (count, exists, internal relations) - Apply with_latest_summary_id() in get_queryset() of the 6 host-serving views only - Restore license_error and canceled_on to last_job summary fields to avoid breaking API change Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Guard _fetch_all() to skip bulk-attach on non-annotated querysets Without this guard, _fetch_all() would set _latest_summary_cache=None on every host in non-annotated querysets (e.g. Host.objects.filter()), masking the per-object fallback query in Host.latest_summary. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Remove name from last_job_host_summary and canceled_on from last_job summary Per reviewer feedback: these fields were not in the original API contract via SUMMARIZABLE_FK_FIELDS and their addition would be an API change. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add functional tests for HostLatestSummaryQuerySet and Host.latest_summary Tests cover: - with_latest_summary_id() annotation and most-recent selection - _fetch_all() bulk-attach behavior on annotated querysets - _fetch_all() skips non-annotated querysets (preserves fallback) - .count() and .exists() do NOT trigger _fetch_all - Host.latest_summary cache hits (zero queries) and fallback - Host.latest_job property - select_related on bulk-attached summaries (no N+1) - Chaining preserves annotation - Multiple jobs / partial host coverage Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Apply black formatting to test_host_queryset.py Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Ben Thomasson <bthomass@redhat.com> * Fix flake8 F841: remove unused job1/job2 variables in tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Ben Thomasson <bthomass@redhat.com> * Add comment explaining why Prefetch was not used for host latest summary Django Prefetch cannot handle latest per group -- [:1] slicing fetches 1 record globally, not per host (Django ticket #26780). The custom _fetch_all override uses the same 2-query pattern as prefetch_related internally, customized for this use case. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Fix null handling to keep old behavior --------- Signed-off-by: Ben Thomasson <bthomass@redhat.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: AlanCoding <arominge@redhat.com> * [AAP-72722] Use url instead of jwt_aud for workload identity audience (#16432) * [AAP-72722] Use url instead of jwt_aud for workload identity audience The OIDC credential plugin's jwt_aud field is being removed. Use the plugin's url field as the audience when requesting workload identity tokens, since the target service URL is the appropriate audience value. Assisted-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * [Devel] Optimize host_list_rbac query (#16408) * Defer ansible_facts in HostManager to avoid fetching large JSON column in host list queries (AAP-68023) The host list endpoint (GET /api/v2/hosts/) fetches the ansible_facts JSON column unnecessarily, contributing to the 7.8s median query time at scale. This column can be very large and is not used by the list serializer. Changes: - HostManager.get_queryset() now defers ansible_facts - finish_fact_cache call site uses .only(*HOST_FACTS_FIELDS) to eagerly load ansible_facts when actually needed, avoiding N+1 queries - Unit test mocks updated to support .only() queryset chaining - Points DAB dependency at the RBAC query optimization branch for combined testing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- * fix: constructed inventories no longer increase the host count (#16433) * Fix version worktree (#16431) * git worktree friendly precomit install * worktrees don't have a .git directory. Before, docker-compose would trigger pre-commit install and fail. * make docker-compose work in git worktree * AWX tries to discover the version via info stored in .git/ dir. setuptools-scm is capable of finding the .git/ dir, starting from a worktree, but is unable because only the worktree is mapped into the container, not the .git/ dir itself. Thus, we have to detect and pass the version into the container from outside. That is why this change landed in the Makefile. * fix: as_user() gateway session cookie fallback (#16437) Add a fallback that checks for `gateway_sessionid` when no cookie matches `session_cookie_name`, mirroring the existing fallback in `Connection.login()`. The finally block now cleans up whichever cookie name was actually used. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> * Pass setting to dispatcherd so it can be configured (#16438) * fix: allow blank password field to fix OpenAPI schema validation (#16440) Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> * first pass porting over metrics * move settings to defaults * add more unit tests * update unit tests * lint fixes * more lint fixes * refactor and address feedback * remove the api views * remove model and move helper functions out of licensing * add settings to API, fix tests, refactoring * fix circular import * update tests * remove duplicate code, handle edge cases, use clearer naming, add test coverage * update test for changes in ship() * remove unneeded setting * _discover_org should account for verify-tls=False * directly assign settings, detect url, update tests * log errors close to occurance * rename function for clarity, focus on critical tests * rename for clarity, lint fixes * fix test params, priority for org discovery * fix test failures and linting --------- Signed-off-by: Seth Foster <fosterbseth@gmail.com> Signed-off-by: Ben Thomasson <bthomass@redhat.com> Co-authored-by: Alan Rominger <arominge@redhat.com> Co-authored-by: Daniel Finca <dfinca@redhat.com> Co-authored-by: melissalkelly <melissalkelly1@gmail.com> Co-authored-by: Tong He <68936428+unnecessary-username@users.noreply.github.com> Co-authored-by: Stevenson Michel <iamstevensonmichel@outlook.com> Co-authored-by: Seth Foster <fosterseth@users.noreply.github.com> Co-authored-by: Dave Mulford <dmulford@redhat.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Adrià Sala <22398818+adrisala@users.noreply.github.com> Co-authored-by: Peter Braun <pbraun@redhat.com> Co-authored-by: Sean Sullivan <ssulliva@redhat.com> Co-authored-by: Dirk Julich <djulich@redhat.com> Co-authored-by: Ben Thomasson <bthomass@redhat.com> Co-authored-by: Dan Leehr <dleehr@users.noreply.github.com> Co-authored-by: Lila Yasin <lyasin@redhat.com> Co-authored-by: Chris Meyers <chrismeyersfsu@users.noreply.github.com>
This commit is contained in:
@@ -74,9 +74,9 @@ def temp_analytic_tar():
|
||||
|
||||
@pytest.fixture
|
||||
def mock_analytic_post():
|
||||
# Patch the Session.post method to return a mock response with status_code 200
|
||||
with mock.patch('awx.main.analytics.core.requests.Session.post', return_value=mock.Mock(status_code=200)) as mock_post:
|
||||
yield mock_post
|
||||
# Patch get_or_generate_candlepin_certificate to skip mTLS path
|
||||
with mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate', return_value=(None, None)):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -141,15 +141,22 @@ def mock_analytic_post():
|
||||
)
|
||||
@pytest.mark.django_db
|
||||
def test_ship_credential(setting_map, expected_result, expected_auth, temp_analytic_tar, mock_analytic_post):
|
||||
with override_settings(**setting_map):
|
||||
result = ship(temp_analytic_tar)
|
||||
with override_settings(**setting_map, AUTOMATION_ANALYTICS_URL='https://example.com/api'):
|
||||
with mock.patch('awx.main.analytics.core.OIDCClient') as mock_oidc:
|
||||
mock_oidc_instance = mock.Mock()
|
||||
mock_oidc_instance.make_request.return_value = mock.Mock(status_code=200)
|
||||
mock_oidc.return_value = mock_oidc_instance
|
||||
|
||||
assert result == expected_result
|
||||
if expected_auth:
|
||||
mock_analytic_post.assert_called_once()
|
||||
assert mock_analytic_post.call_args[1]['auth'] == expected_auth
|
||||
else:
|
||||
mock_analytic_post.assert_not_called()
|
||||
result = ship(temp_analytic_tar)
|
||||
|
||||
assert result == expected_result
|
||||
if expected_auth:
|
||||
# Verify OIDC client was instantiated with correct credentials
|
||||
mock_oidc.assert_called_once_with(expected_auth[0], expected_auth[1])
|
||||
mock_oidc_instance.make_request.assert_called_once()
|
||||
else:
|
||||
# When credentials are missing, OIDCClient should not be called
|
||||
mock_oidc.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
||||
271
awx/main/tests/unit/analytics/test_core_ship.py
Normal file
271
awx/main/tests/unit/analytics/test_core_ship.py
Normal file
@@ -0,0 +1,271 @@
|
||||
# Copyright (c) 2026 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
"""Tests for analytics ship() function with mTLS authentication."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from awx.main.analytics.core import ship, _get_cert_upload_url
|
||||
|
||||
|
||||
class TestGetCertUploadUrl:
|
||||
"""Test _get_cert_upload_url() helper function."""
|
||||
|
||||
def test_adds_cert_subdomain(self):
|
||||
"""Test that 'cert.' is added to hostname."""
|
||||
url = 'https://analytics.example.com/api/ingress/v1/upload'
|
||||
result = _get_cert_upload_url(url)
|
||||
assert result == 'https://cert.analytics.example.com/api/ingress/v1/upload'
|
||||
|
||||
def test_preserves_existing_cert_subdomain(self):
|
||||
"""Test that existing 'cert.' subdomain is preserved."""
|
||||
url = 'https://cert.analytics.example.com/api/ingress/v1/upload'
|
||||
result = _get_cert_upload_url(url)
|
||||
assert result == 'https://cert.analytics.example.com/api/ingress/v1/upload'
|
||||
|
||||
|
||||
class TestShipMTLS:
|
||||
"""Test ship() function's mTLS authentication path."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Create a temporary tarball for testing."""
|
||||
self.temp_file = tempfile.NamedTemporaryFile(mode='wb', suffix='.tar.gz', delete=False)
|
||||
self.temp_file.write(b'test tarball content')
|
||||
self.temp_file.close()
|
||||
self.tarball_path = self.temp_file.name
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up temporary tarball."""
|
||||
if os.path.exists(self.tarball_path):
|
||||
os.unlink(self.tarball_path)
|
||||
|
||||
@override_settings(
|
||||
AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload',
|
||||
INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz',
|
||||
INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt',
|
||||
REDHAT_USERNAME='test_user',
|
||||
REDHAT_PASSWORD='test_pass', # NOSONAR
|
||||
AWX_TASK_ENV={},
|
||||
)
|
||||
@mock.patch('awx.main.analytics.core.get_awx_http_client_headers')
|
||||
@mock.patch('awx.main.analytics.core._temp_cert_files')
|
||||
@mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate')
|
||||
@mock.patch('awx.main.analytics.core.requests.Session')
|
||||
def test_ship_with_mtls_success(self, mock_session_class, mock_get_cert, mock_temp_files, mock_headers):
|
||||
"""Test successful upload with mTLS certificate authentication."""
|
||||
# Mock headers to avoid database access
|
||||
mock_headers.return_value = {'Content-Type': 'application/json'}
|
||||
|
||||
# Mock certificate retrieval
|
||||
mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data')
|
||||
|
||||
# Mock temp files context manager
|
||||
mock_temp_files.return_value.__enter__.return_value = ('/tmp/cert.pem', '/tmp/key.pem')
|
||||
mock_temp_files.return_value.__exit__.return_value = None
|
||||
|
||||
# Mock successful mTLS response
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_session = mock.Mock()
|
||||
mock_session.headers = {}
|
||||
mock_session.post.return_value = mock_response
|
||||
mock_session_class.return_value = mock_session
|
||||
|
||||
result = ship(self.tarball_path)
|
||||
|
||||
assert result is True
|
||||
mock_get_cert.assert_called_once()
|
||||
mock_temp_files.assert_called_once_with('cert-pem-data', 'key-pem-data')
|
||||
mock_session.post.assert_called_once()
|
||||
|
||||
# Verify cert URL is used (cert. subdomain added)
|
||||
call_args = mock_session.post.call_args
|
||||
assert call_args[0][0] == 'https://cert.analytics.example.com/api/ingress/v1/upload'
|
||||
|
||||
# Verify mTLS cert was used
|
||||
call_kwargs = call_args[1]
|
||||
assert call_kwargs['cert'] == ('/tmp/cert.pem', '/tmp/key.pem')
|
||||
|
||||
@override_settings(
|
||||
AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload',
|
||||
INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz',
|
||||
INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt',
|
||||
REDHAT_USERNAME='test_user',
|
||||
REDHAT_PASSWORD='test_pass', # NOSONAR
|
||||
AWX_TASK_ENV={},
|
||||
)
|
||||
@mock.patch('awx.main.analytics.core.get_awx_http_client_headers')
|
||||
@mock.patch('awx.main.analytics.core.OIDCClient')
|
||||
@mock.patch('awx.main.analytics.core._temp_cert_files')
|
||||
@mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate')
|
||||
@mock.patch('awx.main.analytics.core.requests.Session')
|
||||
def test_ship_mtls_fallback_to_oidc_on_cert_failure(self, mock_session_class, mock_get_cert, mock_temp_files, mock_oidc_client, mock_headers):
|
||||
"""Test fallback to OIDC auth when mTLS cert authentication fails."""
|
||||
# Mock headers to avoid database access
|
||||
mock_headers.return_value = {'Content-Type': 'application/json'}
|
||||
|
||||
# Mock certificate retrieval
|
||||
mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data')
|
||||
|
||||
# Mock temp files context manager
|
||||
mock_temp_files.return_value.__enter__.return_value = ('/tmp/cert.pem', '/tmp/key.pem')
|
||||
mock_temp_files.return_value.__exit__.return_value = None
|
||||
|
||||
# Mock failed mTLS response (401 Unauthorized)
|
||||
mock_mtls_response = mock.Mock()
|
||||
mock_mtls_response.status_code = 401
|
||||
mock_session = mock.Mock()
|
||||
mock_session.headers = {}
|
||||
mock_session.post.return_value = mock_mtls_response
|
||||
mock_session_class.return_value = mock_session
|
||||
|
||||
# Mock successful OIDC response
|
||||
mock_oidc_response = mock.Mock()
|
||||
mock_oidc_response.status_code = 200
|
||||
mock_oidc_instance = mock.Mock()
|
||||
mock_oidc_instance.make_request.return_value = mock_oidc_response
|
||||
mock_oidc_client.return_value = mock_oidc_instance
|
||||
|
||||
result = ship(self.tarball_path)
|
||||
|
||||
assert result is True
|
||||
# Both mTLS and OIDC should be attempted
|
||||
assert mock_session.post.call_count == 1
|
||||
mock_oidc_instance.make_request.assert_called_once()
|
||||
|
||||
# Verify mTLS used cert URL
|
||||
mtls_call_args = mock_session.post.call_args
|
||||
assert mtls_call_args[0][0] == 'https://cert.analytics.example.com/api/ingress/v1/upload'
|
||||
|
||||
# Verify OIDC used original URL
|
||||
oidc_call_args = mock_oidc_instance.make_request.call_args
|
||||
assert oidc_call_args[0][1] == 'https://analytics.example.com/api/ingress/v1/upload'
|
||||
|
||||
@override_settings(
|
||||
AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload',
|
||||
INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz',
|
||||
INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt',
|
||||
REDHAT_USERNAME='test_user',
|
||||
REDHAT_PASSWORD='test_pass', # NOSONAR
|
||||
AWX_TASK_ENV={},
|
||||
)
|
||||
@mock.patch('awx.main.analytics.core.get_awx_http_client_headers')
|
||||
@mock.patch('awx.main.analytics.core._temp_cert_files')
|
||||
@mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate')
|
||||
@mock.patch('awx.main.analytics.core.OIDCClient')
|
||||
@mock.patch('awx.main.analytics.core.requests.Session')
|
||||
def test_ship_mtls_exception_fallback_to_oidc(self, mock_session_class, mock_oidc_client, mock_get_cert, mock_temp_files, mock_headers):
|
||||
"""Test fallback to OIDC auth when mTLS raises an exception."""
|
||||
# Mock headers to avoid database access
|
||||
mock_headers.return_value = {'Content-Type': 'application/json'}
|
||||
|
||||
# Mock certificate retrieval
|
||||
mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data')
|
||||
|
||||
# Mock temp files context manager raising an exception
|
||||
mock_temp_files.return_value.__enter__.side_effect = OSError('Temp file creation failed')
|
||||
|
||||
# Mock successful OIDC response
|
||||
mock_oidc_response = mock.Mock()
|
||||
mock_oidc_response.status_code = 200
|
||||
mock_oidc_instance = mock.Mock()
|
||||
mock_oidc_instance.make_request.return_value = mock_oidc_response
|
||||
mock_oidc_client.return_value = mock_oidc_instance
|
||||
|
||||
mock_session = mock.Mock()
|
||||
mock_session.headers = {}
|
||||
mock_session_class.return_value = mock_session
|
||||
|
||||
result = ship(self.tarball_path)
|
||||
|
||||
assert result is True
|
||||
# mTLS should fail, OIDC should succeed
|
||||
mock_oidc_instance.make_request.assert_called_once()
|
||||
|
||||
@override_settings(
|
||||
AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload',
|
||||
INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz',
|
||||
INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt',
|
||||
REDHAT_USERNAME='test_user',
|
||||
REDHAT_PASSWORD='test_pass', # NOSONAR
|
||||
AWX_TASK_ENV={},
|
||||
)
|
||||
@mock.patch('awx.main.analytics.core.get_awx_http_client_headers')
|
||||
@mock.patch('awx.main.analytics.core.OIDCClient')
|
||||
@mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate')
|
||||
@mock.patch('awx.main.analytics.core.requests.Session')
|
||||
def test_ship_no_certificate_available(self, mock_session_class, mock_get_cert, mock_oidc_client, mock_headers):
|
||||
"""Test ship() when no Candlepin certificate is available."""
|
||||
# Mock headers to avoid database access
|
||||
mock_headers.return_value = {'Content-Type': 'application/json'}
|
||||
|
||||
# Mock no certificate available
|
||||
mock_get_cert.return_value = (None, None)
|
||||
|
||||
# Mock successful OIDC response
|
||||
mock_oidc_response = mock.Mock()
|
||||
mock_oidc_response.status_code = 200
|
||||
mock_oidc_instance = mock.Mock()
|
||||
mock_oidc_instance.make_request.return_value = mock_oidc_response
|
||||
mock_oidc_client.return_value = mock_oidc_instance
|
||||
|
||||
mock_session = mock.Mock()
|
||||
mock_session.headers = {}
|
||||
mock_session_class.return_value = mock_session
|
||||
|
||||
result = ship(self.tarball_path)
|
||||
|
||||
assert result is True
|
||||
# Should skip mTLS and go straight to OIDC
|
||||
mock_oidc_instance.make_request.assert_called_once()
|
||||
|
||||
@override_settings(
|
||||
AUTOMATION_ANALYTICS_URL='https://analytics.example.com/api/ingress/v1/upload',
|
||||
INSIGHTS_AGENT_MIME='application/vnd.redhat.tower.analytics+tgz',
|
||||
INSIGHTS_CERT_PATH='/etc/pki/tls/certs/ca-bundle.crt',
|
||||
REDHAT_USERNAME='test_user',
|
||||
REDHAT_PASSWORD='test_pass', # NOSONAR
|
||||
AWX_TASK_ENV={},
|
||||
)
|
||||
@mock.patch('awx.main.analytics.core.get_awx_http_client_headers')
|
||||
@mock.patch('awx.main.analytics.core.OIDCClient')
|
||||
@mock.patch('awx.main.analytics.core._temp_cert_files')
|
||||
@mock.patch('awx.main.analytics.core.get_or_generate_candlepin_certificate')
|
||||
@mock.patch('awx.main.analytics.core.requests.Session')
|
||||
def test_ship_both_auth_methods_fail(self, mock_session_class, mock_get_cert, mock_temp_files, mock_oidc_client, mock_headers):
|
||||
"""Test ship() when both mTLS and OIDC authentication fail."""
|
||||
# Mock headers to avoid database access
|
||||
mock_headers.return_value = {'Content-Type': 'application/json'}
|
||||
|
||||
# Mock certificate retrieval
|
||||
mock_get_cert.return_value = ('cert-pem-data', 'key-pem-data')
|
||||
|
||||
# Mock temp files context manager
|
||||
mock_temp_files.return_value.__enter__.return_value = ('/tmp/cert.pem', '/tmp/key.pem')
|
||||
mock_temp_files.return_value.__exit__.return_value = None
|
||||
|
||||
# Mock failed mTLS response
|
||||
mock_mtls_response = mock.Mock()
|
||||
mock_mtls_response.status_code = 401
|
||||
mock_session = mock.Mock()
|
||||
mock_session.headers = {}
|
||||
mock_session.post.return_value = mock_mtls_response
|
||||
mock_session_class.return_value = mock_session
|
||||
|
||||
# Mock failed OIDC response
|
||||
mock_oidc_response = mock.Mock()
|
||||
mock_oidc_response.status_code = 403
|
||||
mock_oidc_response.text = 'Forbidden'
|
||||
mock_oidc_instance = mock.Mock()
|
||||
mock_oidc_instance.make_request.return_value = mock_oidc_response
|
||||
mock_oidc_client.return_value = mock_oidc_instance
|
||||
|
||||
result = ship(self.tarball_path)
|
||||
|
||||
assert result is False
|
||||
mock_session.post.assert_called_once()
|
||||
mock_oidc_instance.make_request.assert_called_once()
|
||||
310
awx/main/tests/unit/management/commands/test_candlepin_cert.py
Normal file
310
awx/main/tests/unit/management/commands/test_candlepin_cert.py
Normal file
@@ -0,0 +1,310 @@
|
||||
# Copyright (c) 2026 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
"""Tests for candlepin_cert management command."""
|
||||
|
||||
from io import StringIO
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.core.management import call_command
|
||||
from django.test.utils import override_settings
|
||||
|
||||
|
||||
class TestCandlepinCertCommand:
|
||||
"""Tests for candlepin_cert management command."""
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_registration_to_db')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.resolve_registration_credentials')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
@override_settings(
|
||||
AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com',
|
||||
AWX_ANALYTICS_CANDLEPIN_CA=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None,
|
||||
)
|
||||
def test_register_success(self, mock_fetch_cert, mock_resolve_creds, mock_client_class, mock_save_reg):
|
||||
"""Test successful registration."""
|
||||
# No existing cert
|
||||
mock_fetch_cert.return_value = (None, None, None)
|
||||
|
||||
# Valid credentials
|
||||
mock_resolve_creds.return_value = ('test_user', 'test_pass', 'test_org', 'install-uuid', None)
|
||||
|
||||
# Mock successful registration
|
||||
mock_client = mock.Mock()
|
||||
mock_client.register_consumer.return_value = ('cert-pem', 'key-pem', 'consumer-uuid')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
# Mock successful save
|
||||
mock_save_reg.return_value = True
|
||||
|
||||
out = StringIO()
|
||||
call_command('candlepin_cert', 'register', stdout=out, stderr=StringIO())
|
||||
|
||||
output = out.getvalue()
|
||||
assert 'Registered successfully' in output
|
||||
assert 'consumer-uuid' in output
|
||||
|
||||
mock_client.register_consumer.assert_called_once_with('test_user', 'test_pass', 'test_org', install_uuid='install-uuid')
|
||||
mock_save_reg.assert_called_once_with('cert-pem', 'key-pem', 'consumer-uuid')
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
def test_register_already_registered_without_force(self, mock_fetch_cert):
|
||||
"""Test registration fails when cert already exists and --force not provided."""
|
||||
# Existing cert
|
||||
mock_fetch_cert.return_value = ('existing-cert', 'existing-key', 'existing-uuid')
|
||||
|
||||
out = StringIO()
|
||||
call_command('candlepin_cert', 'register', stdout=out, stderr=StringIO())
|
||||
|
||||
output = out.getvalue()
|
||||
assert 'already stored' in output
|
||||
assert '--force' in output
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_registration_to_db')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.resolve_registration_credentials')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
@override_settings(
|
||||
AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com',
|
||||
AWX_ANALYTICS_CANDLEPIN_CA=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None,
|
||||
)
|
||||
def test_register_with_force_flag(self, mock_fetch_cert, mock_resolve_creds, mock_client_class, mock_save_reg):
|
||||
"""Test registration succeeds with --force even when cert exists."""
|
||||
# Existing cert
|
||||
mock_fetch_cert.return_value = ('existing-cert', 'existing-key', 'existing-uuid')
|
||||
|
||||
# Valid credentials
|
||||
mock_resolve_creds.return_value = ('test_user', 'test_pass', 'test_org', 'install-uuid', None)
|
||||
|
||||
# Mock successful registration
|
||||
mock_client = mock.Mock()
|
||||
mock_client.register_consumer.return_value = ('new-cert-pem', 'new-key-pem', 'new-consumer-uuid')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
# Mock successful save
|
||||
mock_save_reg.return_value = True
|
||||
|
||||
out = StringIO()
|
||||
call_command('candlepin_cert', 'register', '--force', stdout=out, stderr=StringIO())
|
||||
|
||||
output = out.getvalue()
|
||||
assert 'Registered successfully' in output
|
||||
|
||||
mock_client.register_consumer.assert_called_once()
|
||||
mock_save_reg.assert_called_once_with('new-cert-pem', 'new-key-pem', 'new-consumer-uuid')
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.resolve_registration_credentials')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
def test_register_missing_credentials(self, mock_fetch_cert, mock_resolve_creds):
|
||||
"""Test registration fails when credentials are missing."""
|
||||
mock_fetch_cert.return_value = (None, None, None)
|
||||
|
||||
# Missing credentials
|
||||
mock_resolve_creds.return_value = (None, None, None, None, ['username', 'password'])
|
||||
|
||||
err = StringIO()
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
call_command('candlepin_cert', 'register', stderr=err)
|
||||
|
||||
assert exc_info.value.code == 1
|
||||
error_output = err.getvalue()
|
||||
assert 'Missing required value' in error_output
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_cert_to_db')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.parse_cert')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
@override_settings(
|
||||
AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com',
|
||||
AWX_ANALYTICS_CANDLEPIN_CA=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90,
|
||||
)
|
||||
def test_renew_success(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class, mock_save_cert):
|
||||
"""Test successful certificate renewal."""
|
||||
# Existing cert
|
||||
mock_fetch_cert.return_value = ('old-cert', 'old-key', 'consumer-uuid')
|
||||
|
||||
# Parse cert returns metadata
|
||||
mock_parse_cert.side_effect = [
|
||||
{'serial': '123', 'cn': 'test', 'not_after': '2026-06-01', 'days_remaining': 10}, # Current cert
|
||||
{'serial': '456', 'cn': 'test', 'not_after': '2027-06-01', 'days_remaining': 365}, # Renewed cert
|
||||
]
|
||||
|
||||
# Renewal needed
|
||||
mock_needs_renewal.return_value = True
|
||||
|
||||
# Mock successful check-in and renewal
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = True
|
||||
mock_client.regenerate_cert.return_value = ('new-cert', 'new-key')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
mock_save_cert.return_value = True
|
||||
|
||||
out = StringIO()
|
||||
call_command('candlepin_cert', 'renew', stdout=out, stderr=StringIO())
|
||||
|
||||
output = out.getvalue()
|
||||
assert 'Check-in successful' in output
|
||||
assert 'Certificate renewed successfully' in output
|
||||
assert 'saved to database' in output
|
||||
|
||||
mock_client.checkin.assert_called_once_with('consumer-uuid', 'old-cert', 'old-key')
|
||||
mock_client.regenerate_cert.assert_called_once()
|
||||
mock_save_cert.assert_called_once_with('new-cert', 'new-key')
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
def test_renew_no_cert_in_db(self, mock_fetch_cert):
|
||||
"""Test renew fails when no certificate exists in database."""
|
||||
mock_fetch_cert.return_value = (None, None, None)
|
||||
|
||||
err = StringIO()
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
call_command('candlepin_cert', 'renew', stderr=err)
|
||||
|
||||
assert exc_info.value.code == 1
|
||||
error_output = err.getvalue()
|
||||
assert 'No Candlepin identity certificate found' in error_output
|
||||
assert 'Run the register subcommand first' in error_output
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.parse_cert')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
@override_settings(
|
||||
AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com',
|
||||
AWX_ANALYTICS_CANDLEPIN_CA=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90,
|
||||
)
|
||||
def test_renew_not_needed(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class):
|
||||
"""Test renew when certificate is still valid and renewal not needed."""
|
||||
mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid')
|
||||
|
||||
# Parse cert returns healthy cert
|
||||
mock_parse_cert.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 200}
|
||||
|
||||
# Renewal not needed
|
||||
mock_needs_renewal.return_value = False
|
||||
|
||||
# Mock successful check-in
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = True
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
out = StringIO()
|
||||
call_command('candlepin_cert', 'renew', stdout=out, stderr=StringIO())
|
||||
|
||||
output = out.getvalue()
|
||||
assert 'Check-in successful' in output
|
||||
assert 'No renewal needed' in output
|
||||
|
||||
mock_client.checkin.assert_called_once()
|
||||
mock_client.regenerate_cert.assert_not_called()
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._save_candlepin_cert_to_db')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.parse_cert')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
@override_settings(
|
||||
AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com',
|
||||
AWX_ANALYTICS_CANDLEPIN_CA=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90,
|
||||
)
|
||||
def test_renew_with_force_flag(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class, mock_save_cert):
|
||||
"""Test renew --force renews even when not needed."""
|
||||
mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid')
|
||||
|
||||
# Parse cert
|
||||
mock_parse_cert.side_effect = [
|
||||
{'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 200}, # Current cert (healthy)
|
||||
{'serial': '456', 'cn': 'test', 'not_after': '2027-06-01', 'days_remaining': 365}, # New cert
|
||||
]
|
||||
|
||||
# Would not need renewal without --force
|
||||
mock_needs_renewal.return_value = False
|
||||
|
||||
# Mock successful operations
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = True
|
||||
mock_client.regenerate_cert.return_value = ('new-cert', 'new-key')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
mock_save_cert.return_value = True
|
||||
|
||||
out = StringIO()
|
||||
call_command('candlepin_cert', 'renew', '--force', stdout=out, stderr=StringIO())
|
||||
|
||||
output = out.getvalue()
|
||||
assert 'forced via --force' in output
|
||||
assert 'Certificate renewed successfully' in output
|
||||
|
||||
mock_client.regenerate_cert.assert_called_once()
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.parse_cert')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
@override_settings(
|
||||
AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com',
|
||||
AWX_ANALYTICS_CANDLEPIN_CA=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90,
|
||||
)
|
||||
def test_renew_checkin_failure(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class):
|
||||
"""Test renew handles check-in failure gracefully."""
|
||||
mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid')
|
||||
|
||||
mock_parse_cert.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 100}
|
||||
mock_needs_renewal.return_value = False # Not needed for renewal, just testing check-in failure
|
||||
|
||||
# Mock failed check-in
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = False
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
err = StringIO()
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
call_command('candlepin_cert', 'renew', stderr=err)
|
||||
|
||||
assert exc_info.value.code == 1
|
||||
error_output = err.getvalue()
|
||||
assert 'Check-in with Candlepin failed' in error_output
|
||||
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.CandlepinClient')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.parse_cert')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert.needs_renewal')
|
||||
@mock.patch('awx.main.management.commands.candlepin_cert._fetch_candlepin_cert_from_db')
|
||||
@override_settings(
|
||||
AWX_ANALYTICS_CANDLEPIN_URL='https://test.example.com',
|
||||
AWX_ANALYTICS_CANDLEPIN_CA=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_PROXY_URL=None,
|
||||
AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS=90,
|
||||
)
|
||||
def test_renew_regenerate_cert_failure(self, mock_fetch_cert, mock_needs_renewal, mock_parse_cert, mock_client_class):
|
||||
"""Test renew handles certificate regeneration failure."""
|
||||
mock_fetch_cert.return_value = ('cert', 'key', 'consumer-uuid')
|
||||
|
||||
mock_parse_cert.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2026-06-01', 'days_remaining': 10}
|
||||
mock_needs_renewal.return_value = True
|
||||
|
||||
# Mock successful check-in but failed regeneration
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = True
|
||||
mock_client.regenerate_cert.side_effect = Exception('Certificate regeneration failed')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
err = StringIO()
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
call_command('candlepin_cert', 'renew', stderr=err)
|
||||
|
||||
assert exc_info.value.code == 1
|
||||
error_output = err.getvalue()
|
||||
assert 'Certificate renewal failed' in error_output
|
||||
@@ -0,0 +1,383 @@
|
||||
# Copyright (c) 2026 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from awx.main.utils.candlepin import (
|
||||
_discover_org,
|
||||
_fetch_candlepin_cert_from_db,
|
||||
_fetch_registration_credentials_from_db,
|
||||
_save_candlepin_cert_to_db,
|
||||
_save_candlepin_registration_to_db,
|
||||
_register_candlepin_consumer,
|
||||
_run_candlepin_lifecycle,
|
||||
get_or_generate_candlepin_certificate,
|
||||
resolve_registration_credentials,
|
||||
)
|
||||
|
||||
|
||||
class TestCandlepinCertificateRegistration:
|
||||
"""Tests for Candlepin integration in certificate registration module."""
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.requests.get')
|
||||
@mock.patch('awx.main.utils.candlepin.get_candlepin_ca')
|
||||
def test_discover_org_success(self, mock_get_ca, mock_requests_get):
|
||||
"""Test successful organization discovery."""
|
||||
mock_get_ca.return_value = '/path/to/ca.pem'
|
||||
mock_response = mock.Mock()
|
||||
mock_response.json.return_value = [
|
||||
{'key': 'test_org', 'displayName': 'Test Organization'},
|
||||
{'key': 'other_org', 'displayName': 'Other Organization'},
|
||||
]
|
||||
mock_requests_get.return_value = mock_response
|
||||
|
||||
org = _discover_org('https://candlepin.example.com', 'test_user', 'test_pass')
|
||||
|
||||
assert org == 'test_org'
|
||||
mock_requests_get.assert_called_once_with(
|
||||
'https://candlepin.example.com/users/test_user/owners',
|
||||
auth=('test_user', 'test_pass'),
|
||||
verify='/path/to/ca.pem',
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.requests.get')
|
||||
@mock.patch('awx.main.utils.candlepin.get_candlepin_ca')
|
||||
def test_discover_org_no_ca(self, mock_get_ca, mock_requests_get):
|
||||
"""Test organization discovery without custom CA (uses system certs)."""
|
||||
mock_get_ca.return_value = None
|
||||
mock_response = mock.Mock()
|
||||
mock_response.json.return_value = [{'key': 'test_org', 'displayName': 'Test Organization'}]
|
||||
mock_requests_get.return_value = mock_response
|
||||
|
||||
org = _discover_org('https://candlepin.example.com', 'test_user', 'test_pass')
|
||||
|
||||
assert org == 'test_org'
|
||||
# Should use True for verify when no CA is configured
|
||||
mock_requests_get.assert_called_once_with(
|
||||
'https://candlepin.example.com/users/test_user/owners',
|
||||
auth=('test_user', 'test_pass'),
|
||||
verify=True,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.requests.get')
|
||||
def test_discover_org_no_verify_tls(self, mock_requests_get):
|
||||
"""Test organization discovery with TLS verification disabled."""
|
||||
mock_response = mock.Mock()
|
||||
mock_response.json.return_value = [{'key': 'test_org', 'displayName': 'Test Organization'}]
|
||||
mock_requests_get.return_value = mock_response
|
||||
|
||||
org = _discover_org('https://candlepin.example.com', 'test_user', 'test_pass', verify_tls=False)
|
||||
|
||||
assert org == 'test_org'
|
||||
# Should use False for verify when verify_tls=False
|
||||
mock_requests_get.assert_called_once_with(
|
||||
'https://candlepin.example.com/users/test_user/owners',
|
||||
auth=('test_user', 'test_pass'),
|
||||
verify=False,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.settings')
|
||||
def test_fetch_candlepin_cert_from_db(self, mock_settings):
|
||||
"""Test fetching Candlepin cert from conf_settings."""
|
||||
mock_settings.CANDLEPIN_CONSUMER_UUID = 'test-uuid'
|
||||
mock_settings.CANDLEPIN_CERT_PEM = 'cert-pem-data'
|
||||
mock_settings.CANDLEPIN_KEY_PEM = 'key-pem-data'
|
||||
|
||||
cert, key, uuid = _fetch_candlepin_cert_from_db()
|
||||
|
||||
assert cert == 'cert-pem-data'
|
||||
assert key == 'key-pem-data'
|
||||
assert uuid == 'test-uuid'
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._discover_org')
|
||||
@mock.patch('awx.main.utils.candlepin.settings')
|
||||
def test_fetch_registration_credentials_from_db(self, mock_settings, mock_discover_org):
|
||||
"""Test fetching registration credentials from settings.
|
||||
|
||||
When both REDHAT and SUBSCRIPTIONS credentials exist, REDHAT takes priority
|
||||
for both authentication and org discovery.
|
||||
"""
|
||||
mock_settings.REDHAT_USERNAME = 'test_user'
|
||||
mock_settings.REDHAT_PASSWORD = 'test_pass'
|
||||
mock_settings.INSTALL_UUID = 'test-install-uuid'
|
||||
mock_settings.SUBSCRIPTIONS_USERNAME = 'subs_user'
|
||||
mock_settings.SUBSCRIPTIONS_PASSWORD = 'subs_pass'
|
||||
mock_discover_org.return_value = 'test_org'
|
||||
|
||||
username, password, org, install_uuid = _fetch_registration_credentials_from_db()
|
||||
|
||||
assert username == 'test_user'
|
||||
assert password == 'test_pass'
|
||||
assert org == 'test_org'
|
||||
assert install_uuid == 'test-install-uuid'
|
||||
# Verify _discover_org was called with REDHAT credentials (takes priority)
|
||||
assert mock_discover_org.call_count == 1
|
||||
args = mock_discover_org.call_args[0]
|
||||
assert args[1] == 'test_user' # REDHAT_USERNAME (selected)
|
||||
assert args[2] == 'test_pass' # REDHAT_PASSWORD (selected)
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._discover_org')
|
||||
@mock.patch('awx.main.utils.candlepin.settings')
|
||||
def test_fetch_registration_credentials_no_verify_tls(self, mock_settings, mock_discover_org):
|
||||
"""Test fetching credentials passes verify_tls=False to _discover_org.
|
||||
|
||||
Also verifies that selected credentials (REDHAT in this case) are used for org discovery.
|
||||
"""
|
||||
mock_settings.REDHAT_USERNAME = 'test_user'
|
||||
mock_settings.REDHAT_PASSWORD = 'test_pass'
|
||||
mock_settings.INSTALL_UUID = 'test-install-uuid'
|
||||
mock_settings.SUBSCRIPTIONS_USERNAME = 'subs_user'
|
||||
mock_settings.SUBSCRIPTIONS_PASSWORD = 'subs_pass'
|
||||
mock_discover_org.return_value = 'test_org'
|
||||
|
||||
username, password, org, install_uuid = _fetch_registration_credentials_from_db(verify_tls=False)
|
||||
|
||||
assert username == 'test_user'
|
||||
assert password == 'test_pass'
|
||||
assert org == 'test_org'
|
||||
assert install_uuid == 'test-install-uuid'
|
||||
# Verify _discover_org was called with verify_tls=False and REDHAT credentials
|
||||
mock_discover_org.assert_called_once()
|
||||
call_args = mock_discover_org.call_args
|
||||
assert call_args[0][1] == 'test_user' # REDHAT_USERNAME (selected)
|
||||
assert call_args[0][2] == 'test_pass' # REDHAT_PASSWORD (selected)
|
||||
call_kwargs = call_args[1]
|
||||
assert call_kwargs['verify_tls'] is False
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db')
|
||||
def test_resolve_registration_credentials_no_overrides(self, mock_fetch):
|
||||
"""Test resolve_registration_credentials with no overrides."""
|
||||
mock_fetch.return_value = ('db_user', 'db_pass', 'db_org', 'install-uuid')
|
||||
|
||||
username, password, org, install_uuid, errors = resolve_registration_credentials()
|
||||
|
||||
assert username == 'db_user'
|
||||
assert password == 'db_pass'
|
||||
assert org == 'db_org'
|
||||
assert install_uuid == 'install-uuid'
|
||||
assert errors is None
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db')
|
||||
def test_resolve_registration_credentials_with_overrides(self, mock_fetch):
|
||||
"""Test resolve_registration_credentials with CLI overrides."""
|
||||
mock_fetch.return_value = ('db_user', 'db_pass', 'db_org', 'install-uuid')
|
||||
|
||||
username, password, org, install_uuid, errors = resolve_registration_credentials(
|
||||
username_override='cli_user', password_override='cli_pass', org_override='cli_org'
|
||||
)
|
||||
|
||||
assert username == 'cli_user'
|
||||
assert password == 'cli_pass'
|
||||
assert org == 'cli_org'
|
||||
assert install_uuid == 'install-uuid'
|
||||
assert errors is None
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db')
|
||||
def test_resolve_registration_credentials_verify_tls_false(self, mock_fetch):
|
||||
"""Test resolve_registration_credentials passes verify_tls=False to fetch function."""
|
||||
mock_fetch.return_value = ('db_user', 'db_pass', 'db_org', 'install-uuid')
|
||||
|
||||
username, password, org, install_uuid, errors = resolve_registration_credentials(verify_tls=False)
|
||||
|
||||
# Verify _fetch_registration_credentials_from_db was called with verify_tls=False
|
||||
mock_fetch.assert_called_once_with(verify_tls=False)
|
||||
assert username == 'db_user'
|
||||
assert password == 'db_pass'
|
||||
assert org == 'db_org'
|
||||
assert install_uuid == 'install-uuid'
|
||||
assert errors is None
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.parse_cert')
|
||||
@mock.patch('awx.main.utils.candlepin.settings')
|
||||
def test_save_candlepin_cert_to_db(self, mock_settings, mock_parse_cert):
|
||||
"""Test saving Candlepin cert to conf_settings."""
|
||||
mock_parse_cert.return_value = {
|
||||
'serial': '123456',
|
||||
'cn': 'test-consumer',
|
||||
'not_before': '2026-01-01T00:00:00+00:00',
|
||||
'not_after': '2027-01-01T00:00:00+00:00',
|
||||
'days_remaining': 365,
|
||||
}
|
||||
|
||||
result = _save_candlepin_cert_to_db('new-cert', 'new-key')
|
||||
|
||||
assert result is True
|
||||
# Verify settings were assigned
|
||||
assert mock_settings.CANDLEPIN_CERT_PEM == 'new-cert'
|
||||
assert mock_settings.CANDLEPIN_KEY_PEM == 'new-key'
|
||||
assert mock_settings.CANDLEPIN_SERIAL_NUMBER == '123456'
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.parse_cert')
|
||||
@mock.patch('awx.main.utils.candlepin.settings')
|
||||
def test_save_candlepin_registration_to_db(self, mock_settings, mock_parse_cert):
|
||||
"""Test saving Candlepin registration to conf_settings."""
|
||||
mock_parse_cert.return_value = {
|
||||
'serial': '789012',
|
||||
'cn': 'test-consumer',
|
||||
'not_before': '2026-01-01T00:00:00+00:00',
|
||||
'not_after': '2027-01-01T00:00:00+00:00',
|
||||
'days_remaining': 365,
|
||||
}
|
||||
|
||||
result = _save_candlepin_registration_to_db('cert', 'key', 'uuid')
|
||||
|
||||
assert result is True
|
||||
# Verify all registration data was saved
|
||||
assert mock_settings.CANDLEPIN_CONSUMER_UUID == 'uuid'
|
||||
assert mock_settings.CANDLEPIN_CERT_PEM == 'cert'
|
||||
assert mock_settings.CANDLEPIN_KEY_PEM == 'key'
|
||||
assert mock_settings.CANDLEPIN_SERIAL_NUMBER == '789012'
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._save_candlepin_registration_to_db')
|
||||
@mock.patch('awx.main.utils.candlepin.CandlepinClient')
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db')
|
||||
@mock.patch('awx.main.utils.candlepin.get_proxy_url')
|
||||
@mock.patch('awx.main.utils.candlepin.get_candlepin_ca')
|
||||
@mock.patch('awx.main.utils.candlepin.get_candlepin_url')
|
||||
def test_register_candlepin_consumer_success(self, mock_get_url, mock_get_ca, mock_get_proxy, mock_fetch_creds, mock_client_class, mock_save):
|
||||
"""Test successful Candlepin consumer registration."""
|
||||
mock_get_url.return_value = 'https://candlepin.example.com'
|
||||
mock_get_ca.return_value = '/path/to/ca.pem'
|
||||
mock_get_proxy.return_value = None
|
||||
mock_fetch_creds.return_value = ('user', 'pass', 'org', 'install-uuid')
|
||||
mock_save.return_value = True
|
||||
|
||||
mock_client = mock.Mock()
|
||||
mock_client.register_consumer.return_value = ('cert', 'key', 'uuid')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
cert, key, uuid = _register_candlepin_consumer()
|
||||
|
||||
assert cert == 'cert'
|
||||
assert key == 'key'
|
||||
assert uuid == 'uuid'
|
||||
mock_save.assert_called_once_with('cert', 'key', 'uuid')
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_registration_credentials_from_db')
|
||||
def test_register_candlepin_consumer_missing_credentials(self, mock_fetch_creds):
|
||||
"""Test registration fails when credentials are missing."""
|
||||
mock_fetch_creds.return_value = (None, None, None, None)
|
||||
|
||||
cert, key, uuid = _register_candlepin_consumer()
|
||||
|
||||
assert cert is None
|
||||
assert key is None
|
||||
assert uuid is None
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._save_candlepin_cert_to_db')
|
||||
@mock.patch('awx.main.utils.candlepin.run_candlepin_lifecycle')
|
||||
@mock.patch('awx.main.utils.candlepin.get_proxy_url')
|
||||
@mock.patch('awx.main.utils.candlepin.get_candlepin_ca')
|
||||
@mock.patch('awx.main.utils.candlepin.get_renewal_days')
|
||||
@mock.patch('awx.main.utils.candlepin.get_candlepin_url')
|
||||
def test_run_candlepin_lifecycle_with_renewal(self, mock_get_url, mock_get_days, mock_get_ca, mock_get_proxy, mock_lifecycle, mock_save):
|
||||
"""Test lifecycle with certificate renewal."""
|
||||
mock_get_url.return_value = 'https://candlepin.example.com'
|
||||
mock_get_days.return_value = 90
|
||||
mock_get_ca.return_value = '/path/to/ca.pem'
|
||||
mock_get_proxy.return_value = None
|
||||
mock_lifecycle.return_value = ('new-cert', 'new-key')
|
||||
mock_save.return_value = True
|
||||
|
||||
cert, key = _run_candlepin_lifecycle('old-cert', 'old-key', 'real-uuid')
|
||||
|
||||
assert cert == 'new-cert'
|
||||
assert key == 'new-key'
|
||||
mock_lifecycle.assert_called_once()
|
||||
mock_save.assert_called_once_with('new-cert', 'new-key')
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.is_cert_valid')
|
||||
@mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle')
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db')
|
||||
def test_get_or_generate_candlepin_certificate_existing_valid(self, mock_fetch, mock_lifecycle, mock_is_valid):
|
||||
"""Test get_or_generate with existing valid certificate."""
|
||||
mock_fetch.return_value = ('cert-pem', 'key-pem', 'consumer-uuid')
|
||||
mock_lifecycle.return_value = ('cert-pem', 'key-pem')
|
||||
mock_is_valid.return_value = True
|
||||
|
||||
cert, key = get_or_generate_candlepin_certificate()
|
||||
|
||||
assert cert == 'cert-pem'
|
||||
assert key == 'key-pem'
|
||||
mock_lifecycle.assert_called_once_with('cert-pem', 'key-pem', 'consumer-uuid')
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.is_cert_valid')
|
||||
@mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle')
|
||||
@mock.patch('awx.main.utils.candlepin._register_candlepin_consumer')
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db')
|
||||
def test_get_or_generate_candlepin_certificate_register_new(self, mock_fetch, mock_register, mock_lifecycle, mock_is_valid):
|
||||
"""Test get_or_generate when no certificate exists - registers new."""
|
||||
mock_fetch.return_value = (None, None, None)
|
||||
mock_register.return_value = ('new-cert', 'new-key', 'new-uuid')
|
||||
mock_lifecycle.return_value = ('new-cert', 'new-key')
|
||||
mock_is_valid.return_value = True
|
||||
|
||||
cert, key = get_or_generate_candlepin_certificate()
|
||||
|
||||
assert cert == 'new-cert'
|
||||
assert key == 'new-key'
|
||||
mock_register.assert_called_once()
|
||||
mock_lifecycle.assert_called_once_with('new-cert', 'new-key', 'new-uuid')
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin._register_candlepin_consumer')
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db')
|
||||
def test_get_or_generate_candlepin_certificate_registration_fails(self, mock_fetch, mock_register):
|
||||
"""Test get_or_generate when registration fails."""
|
||||
mock_fetch.return_value = (None, None, None)
|
||||
mock_register.return_value = (None, None, None)
|
||||
|
||||
cert, key = get_or_generate_candlepin_certificate()
|
||||
|
||||
assert cert is None
|
||||
assert key is None
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.is_cert_valid')
|
||||
@mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle')
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db')
|
||||
def test_get_or_generate_candlepin_certificate_invalid_cert(self, mock_fetch, mock_lifecycle, mock_is_valid):
|
||||
"""Test get_or_generate when certificate is invalid."""
|
||||
mock_fetch.return_value = ('cert-pem', 'key-pem', 'consumer-uuid')
|
||||
mock_lifecycle.return_value = ('cert-pem', 'key-pem')
|
||||
mock_is_valid.return_value = False
|
||||
|
||||
cert, key = get_or_generate_candlepin_certificate()
|
||||
|
||||
assert cert is None
|
||||
assert key is None
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.is_cert_valid')
|
||||
@mock.patch('awx.main.utils.candlepin._run_candlepin_lifecycle')
|
||||
@mock.patch('awx.main.utils.candlepin._fetch_candlepin_cert_from_db')
|
||||
def test_get_or_generate_candlepin_certificate_expired_cert_renewed_successfully(self, mock_fetch, mock_lifecycle, mock_is_valid):
|
||||
"""Test get_or_generate with expired certificate that is successfully renewed."""
|
||||
mock_fetch.return_value = ('expired-cert', 'old-key', 'consumer-uuid')
|
||||
# Lifecycle successfully renews
|
||||
mock_lifecycle.return_value = ('new-cert', 'new-key')
|
||||
# New certificate is valid
|
||||
mock_is_valid.return_value = True
|
||||
|
||||
cert, key = get_or_generate_candlepin_certificate()
|
||||
|
||||
assert cert == 'new-cert'
|
||||
assert key == 'new-key'
|
||||
mock_lifecycle.assert_called_once_with('expired-cert', 'old-key', 'consumer-uuid')
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.parse_cert')
|
||||
@mock.patch('awx.main.utils.candlepin.settings')
|
||||
def test_save_candlepin_registration_to_db_cert_parse_failure(self, mock_settings, mock_parse_cert):
|
||||
"""Test _save_candlepin_registration_to_db handles cert parsing failure gracefully."""
|
||||
# Cert parsing fails
|
||||
mock_parse_cert.side_effect = ValueError('Invalid certificate format')
|
||||
|
||||
result = _save_candlepin_registration_to_db('invalid-cert', 'key-pem', 'consumer-uuid')
|
||||
|
||||
# Should still save registration even if parsing fails
|
||||
assert result is True
|
||||
# Verify UUID, cert, key, and serial (empty string) were saved
|
||||
assert mock_settings.CANDLEPIN_CONSUMER_UUID == 'consumer-uuid'
|
||||
assert mock_settings.CANDLEPIN_CERT_PEM == 'invalid-cert'
|
||||
assert mock_settings.CANDLEPIN_KEY_PEM == 'key-pem'
|
||||
assert mock_settings.CANDLEPIN_SERIAL_NUMBER == ''
|
||||
124
awx/main/tests/unit/utils/test_candlepin_client.py
Normal file
124
awx/main/tests/unit/utils/test_candlepin_client.py
Normal file
@@ -0,0 +1,124 @@
|
||||
# Copyright (c) 2026 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
from awx.main.utils.candlepin.client import CandlepinClient, _temp_cert_files
|
||||
|
||||
|
||||
class TestCandlepinClient:
|
||||
"""Tests for CandlepinClient."""
|
||||
|
||||
def test_base_url_required(self):
|
||||
"""Test base_url parameter is required."""
|
||||
client = CandlepinClient(base_url='https://subscription.example.com/candlepin')
|
||||
assert client.base_url == 'https://subscription.example.com/candlepin'
|
||||
|
||||
def test_verify_tls_enabled_by_default(self):
|
||||
"""Test TLS verification is enabled by default."""
|
||||
client = CandlepinClient(base_url='https://test.example.com')
|
||||
assert client.verify is True
|
||||
|
||||
def test_verify_tls_with_ca(self):
|
||||
"""Test TLS verification with custom CA."""
|
||||
client = CandlepinClient(base_url='https://test.example.com', candlepin_ca='/path/to/ca.pem')
|
||||
assert client.verify == '/path/to/ca.pem'
|
||||
|
||||
def test_proxy_configuration(self):
|
||||
"""Test proxy configuration."""
|
||||
client = CandlepinClient(base_url='https://test.example.com', proxy='http://proxy.example.com:8080')
|
||||
assert client.proxies == {'https': 'http://proxy.example.com:8080', 'http': 'http://proxy.example.com:8080'}
|
||||
|
||||
def test_temp_cert_files_cleanup(self):
|
||||
"""Test temporary certificate files are created and cleaned up."""
|
||||
cert_pem = '-----BEGIN CERTIFICATE-----\ntest_cert\n-----END CERTIFICATE-----'
|
||||
key_pem = '-----BEGIN PRIVATE KEY-----\ntest_key\n-----END PRIVATE KEY-----'
|
||||
|
||||
with _temp_cert_files(cert_pem, key_pem) as (cert_path, key_path):
|
||||
assert os.path.exists(cert_path)
|
||||
assert os.path.exists(key_path)
|
||||
# Verify file permissions
|
||||
cert_stat = os.stat(cert_path)
|
||||
assert oct(cert_stat.st_mode)[-3:] == '600'
|
||||
|
||||
# Verify cleanup
|
||||
assert not os.path.exists(cert_path)
|
||||
assert not os.path.exists(key_path)
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.client.requests.post')
|
||||
def test_register_consumer_success(self, mock_post):
|
||||
"""Test successful consumer registration."""
|
||||
mock_response = mock.Mock()
|
||||
mock_response.ok = True
|
||||
mock_response.json.return_value = {
|
||||
'uuid': 'test-consumer-uuid',
|
||||
'idCert': {
|
||||
'cert': '-----BEGIN CERTIFICATE-----\ncert_data\n-----END CERTIFICATE-----',
|
||||
'key': '-----BEGIN PRIVATE KEY-----\nkey_data\n-----END PRIVATE KEY-----',
|
||||
},
|
||||
}
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
client = CandlepinClient(base_url='https://test.example.com')
|
||||
cert_pem, key_pem, consumer_uuid = client.register_consumer('test_user', 'test_pass', 'test_org', install_uuid='test-install-uuid')
|
||||
|
||||
assert consumer_uuid == 'test-consumer-uuid'
|
||||
assert '-----BEGIN CERTIFICATE-----' in cert_pem
|
||||
assert '-----BEGIN PRIVATE KEY-----' in key_pem
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.client.requests.put')
|
||||
def test_checkin_success(self, mock_put):
|
||||
"""Test successful check-in."""
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_put.return_value = mock_response
|
||||
|
||||
client = CandlepinClient(base_url='https://test.example.com')
|
||||
cert_pem = '-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----'
|
||||
key_pem = '-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----'
|
||||
|
||||
result = client.checkin('test-uuid', cert_pem, key_pem)
|
||||
assert result is True
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.client.requests.get')
|
||||
def test_get_consumer_success(self, mock_get):
|
||||
"""Test successful consumer retrieval."""
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = {
|
||||
'uuid': 'test-consumer-uuid',
|
||||
'name': 'aap-12345678',
|
||||
'idCert': {'cert': '-----BEGIN CERTIFICATE-----\nserver_cert\n-----END CERTIFICATE-----', 'serial': {'serial': 123456789}},
|
||||
}
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
client = CandlepinClient(base_url='https://test.example.com')
|
||||
cert_pem = '-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----'
|
||||
key_pem = '-----BEGIN PRIVATE KEY-----\ntest\n-----END PRIVATE KEY-----'
|
||||
|
||||
result = client.get_consumer('test-uuid', cert_pem, key_pem)
|
||||
assert result is not None
|
||||
assert result['uuid'] == 'test-consumer-uuid'
|
||||
assert 'idCert' in result
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.client.requests.post')
|
||||
def test_regenerate_cert_success(self, mock_post):
|
||||
"""Test successful certificate regeneration."""
|
||||
mock_response = mock.Mock()
|
||||
mock_response.ok = True
|
||||
mock_response.json.return_value = {
|
||||
'idCert': {
|
||||
'cert': '-----BEGIN CERTIFICATE-----\nnew_cert\n-----END CERTIFICATE-----',
|
||||
'key': '-----BEGIN PRIVATE KEY-----\nnew_key\n-----END PRIVATE KEY-----',
|
||||
}
|
||||
}
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
client = CandlepinClient(base_url='https://test.example.com')
|
||||
old_cert = '-----BEGIN CERTIFICATE-----\nold\n-----END CERTIFICATE-----'
|
||||
old_key = '-----BEGIN PRIVATE KEY-----\nold\n-----END PRIVATE KEY-----'
|
||||
|
||||
new_cert, new_key = client.regenerate_cert('test-uuid', old_cert, old_key)
|
||||
assert 'new_cert' in new_cert
|
||||
assert 'new_key' in new_key
|
||||
222
awx/main/tests/unit/utils/test_candlepin_lifecycle.py
Normal file
222
awx/main/tests/unit/utils/test_candlepin_lifecycle.py
Normal file
@@ -0,0 +1,222 @@
|
||||
# Copyright (c) 2026 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from unittest import mock
|
||||
|
||||
from awx.main.utils.candlepin.lifecycle import (
|
||||
parse_cert,
|
||||
needs_renewal,
|
||||
run_candlepin_lifecycle,
|
||||
get_candlepin_url,
|
||||
get_renewal_days,
|
||||
get_candlepin_ca,
|
||||
get_proxy_url,
|
||||
)
|
||||
|
||||
# Sample test certificate (expires far in the future for testing)
|
||||
SAMPLE_CERT_PEM = """-----BEGIN CERTIFICATE-----
|
||||
MIIDXTCCAkWgAwIBAgIJAKJ5VZ2cPQE5MA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
|
||||
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
|
||||
aWRnaXRzIFB0eSBMdGQwHhcNMjYwMTAxMDAwMDAwWhcNMjcwMTAxMDAwMDAwWjBF
|
||||
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
|
||||
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
|
||||
CgKCAQEA0a7Y3l3X4L7pKq3xDl8vCRrRK6qU5dF7r3xQH5YRz4hZJN9wE3xW0qDT
|
||||
-----END CERTIFICATE-----"""
|
||||
|
||||
|
||||
class TestCandlepinLifecycle:
|
||||
"""Tests for Candlepin lifecycle functions."""
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.settings')
|
||||
def test_get_candlepin_url_default(self, mock_settings):
|
||||
"""Test default Candlepin URL from defaults.py."""
|
||||
mock_settings.AWX_ANALYTICS_CANDLEPIN_URL = 'https://subscription.example.com/candlepin/'
|
||||
url = get_candlepin_url()
|
||||
assert url == 'https://subscription.example.com/candlepin/'
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.settings')
|
||||
def test_get_renewal_days_from_settings(self, mock_settings):
|
||||
"""Test renewal days from Django settings."""
|
||||
mock_settings.AWX_ANALYTICS_CANDLEPIN_RENEWAL_THRESHOLD_DAYS = 45
|
||||
days = get_renewal_days()
|
||||
assert days == 45
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.os.path.isfile')
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.settings')
|
||||
def test_get_candlepin_ca_from_settings(self, mock_settings, mock_isfile):
|
||||
"""Test Candlepin CA from Django settings when file exists."""
|
||||
mock_settings.AWX_ANALYTICS_CANDLEPIN_CA = '/path/to/ca.pem'
|
||||
mock_isfile.return_value = True
|
||||
ca = get_candlepin_ca()
|
||||
assert ca == '/path/to/ca.pem'
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.os.path.isfile')
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.settings')
|
||||
def test_get_candlepin_ca_file_not_found(self, mock_settings, mock_isfile):
|
||||
"""Test Candlepin CA returns None when configured path doesn't exist."""
|
||||
mock_settings.AWX_ANALYTICS_CANDLEPIN_CA = '/path/to/missing.pem'
|
||||
mock_isfile.return_value = False
|
||||
ca = get_candlepin_ca()
|
||||
assert ca is None
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.settings')
|
||||
def test_get_proxy_url_from_settings(self, mock_settings):
|
||||
"""Test proxy URL from Django settings."""
|
||||
mock_settings.AWX_ANALYTICS_CANDLEPIN_PROXY_URL = 'http://proxy.example.com:8080'
|
||||
proxy = get_proxy_url()
|
||||
assert proxy == 'http://proxy.example.com:8080'
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.x509.load_pem_x509_certificate')
|
||||
def test_parse_cert(self, mock_load_cert):
|
||||
"""Test certificate parsing."""
|
||||
# Mock a certificate object
|
||||
mock_cert = mock.Mock()
|
||||
mock_cert.serial_number = 123456
|
||||
mock_cert.not_valid_before_utc = datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
mock_cert.not_valid_after_utc = datetime(2027, 1, 1, tzinfo=timezone.utc)
|
||||
|
||||
# Mock subject and issuer
|
||||
mock_attr = mock.Mock()
|
||||
mock_attr.oid._name = 'commonName'
|
||||
mock_attr.value = 'test-cn'
|
||||
mock_cert.subject = [mock_attr]
|
||||
mock_cert.issuer = [mock_attr]
|
||||
|
||||
mock_load_cert.return_value = mock_cert
|
||||
|
||||
result = parse_cert('fake-pem')
|
||||
|
||||
assert result['serial'] == '123456'
|
||||
assert result['cn'] == 'test-cn'
|
||||
assert 'not_before' in result
|
||||
assert 'not_after' in result
|
||||
assert 'days_remaining' in result
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert')
|
||||
def test_needs_renewal_true(self, mock_parse):
|
||||
"""Test needs_renewal returns True when cert is expiring soon."""
|
||||
mock_parse.return_value = {'days_remaining': 10}
|
||||
|
||||
result = needs_renewal('fake-cert', days_before_expiry=30)
|
||||
assert result is True
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert')
|
||||
def test_needs_renewal_false(self, mock_parse):
|
||||
"""Test needs_renewal returns False when cert has time remaining."""
|
||||
mock_parse.return_value = {'days_remaining': 100}
|
||||
|
||||
result = needs_renewal('fake-cert', days_before_expiry=30)
|
||||
assert result is False
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient')
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert')
|
||||
def test_run_candlepin_lifecycle_no_renewal_needed(self, mock_parse, mock_client_class):
|
||||
"""Test lifecycle when no renewal is needed."""
|
||||
mock_parse.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01T00:00:00+00:00', 'days_remaining': 100}
|
||||
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = True
|
||||
mock_client.get_consumer.return_value = None # Skip serial comparison
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
cert_pem, key_pem = run_candlepin_lifecycle('cert-pem', 'key-pem', 'consumer-uuid', candlepin_url='https://test.example.com', renewal_days=30)
|
||||
|
||||
assert cert_pem == 'cert-pem'
|
||||
assert key_pem == 'key-pem'
|
||||
mock_client.checkin.assert_called_once()
|
||||
mock_client.regenerate_cert.assert_not_called()
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient')
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert')
|
||||
def test_run_candlepin_lifecycle_with_renewal(self, mock_parse, mock_client_class):
|
||||
"""Test lifecycle when renewal is needed."""
|
||||
# parse_cert is called multiple times:
|
||||
# 1. Parse original cert
|
||||
# 2. In needs_renewal() to check expiry
|
||||
# 3. Parse new cert after renewal for logging
|
||||
mock_parse.side_effect = [
|
||||
{'serial': '123', 'cn': 'test', 'not_after': '2026-02-01', 'days_remaining': 10}, # Original cert
|
||||
{'serial': '123', 'cn': 'test', 'not_after': '2026-02-01', 'days_remaining': 10}, # needs_renewal check
|
||||
{'serial': '456', 'cn': 'test', 'not_after': '2027-02-01', 'days_remaining': 365}, # New cert
|
||||
]
|
||||
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = True
|
||||
mock_client.get_consumer.return_value = None # Skip serial comparison
|
||||
mock_client.regenerate_cert.return_value = ('new-cert', 'new-key')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
cert_pem, key_pem = run_candlepin_lifecycle('old-cert', 'old-key', 'consumer-uuid', renewal_days=90)
|
||||
|
||||
assert cert_pem == 'new-cert'
|
||||
assert key_pem == 'new-key'
|
||||
mock_client.regenerate_cert.assert_called_once()
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient')
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert')
|
||||
def test_run_candlepin_lifecycle_expired_cert_renewal(self, mock_parse, mock_client_class):
|
||||
"""Test lifecycle renews an expired certificate."""
|
||||
# parse_cert called for:
|
||||
# 1. Parse original expired cert
|
||||
# 2. needs_renewal check (expired, so returns True)
|
||||
# 3. Parse new cert after renewal
|
||||
mock_parse.side_effect = [
|
||||
{'serial': '123', 'cn': 'test', 'not_after': '2025-12-31', 'days_remaining': -120}, # Expired cert
|
||||
{'serial': '123', 'cn': 'test', 'not_after': '2025-12-31', 'days_remaining': -120}, # needs_renewal
|
||||
{'serial': '456', 'cn': 'test', 'not_after': '2027-06-01', 'days_remaining': 365}, # New cert
|
||||
]
|
||||
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = True
|
||||
mock_client.get_consumer.return_value = None
|
||||
mock_client.regenerate_cert.return_value = ('new-cert', 'new-key')
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
cert_pem, key_pem = run_candlepin_lifecycle('expired-cert', 'old-key', 'consumer-uuid', renewal_days=90)
|
||||
|
||||
assert cert_pem == 'new-cert'
|
||||
assert key_pem == 'new-key'
|
||||
mock_client.regenerate_cert.assert_called_once()
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient')
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert')
|
||||
def test_run_candlepin_lifecycle_checkin_failure_revoked_cert(self, mock_parse, mock_client_class):
|
||||
"""Test lifecycle handles check-in failure (e.g., revoked certificate)."""
|
||||
mock_parse.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 100}
|
||||
|
||||
# Check-in fails (could indicate revoked cert or deleted consumer)
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = False
|
||||
mock_client.get_consumer.return_value = None # get_consumer also fails
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
# Lifecycle should continue and return original cert
|
||||
cert_pem, key_pem = run_candlepin_lifecycle('cert-pem', 'key-pem', 'consumer-uuid', renewal_days=30)
|
||||
|
||||
assert cert_pem == 'cert-pem'
|
||||
assert key_pem == 'key-pem'
|
||||
mock_client.checkin.assert_called_once()
|
||||
# Regeneration should not be attempted since get_consumer indicates consumer doesn't exist
|
||||
mock_client.regenerate_cert.assert_not_called()
|
||||
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.CandlepinClient')
|
||||
@mock.patch('awx.main.utils.candlepin.lifecycle.parse_cert')
|
||||
def test_run_candlepin_lifecycle_consumer_deleted_server_side(self, mock_parse, mock_client_class):
|
||||
"""Test lifecycle detects when consumer was deleted from Candlepin server."""
|
||||
mock_parse.return_value = {'serial': '123', 'cn': 'test', 'not_after': '2027-01-01', 'days_remaining': 100}
|
||||
|
||||
# Both check-in and get_consumer fail (consumer deleted)
|
||||
mock_client = mock.Mock()
|
||||
mock_client.checkin.return_value = False
|
||||
mock_client.get_consumer.return_value = None
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
cert_pem, key_pem = run_candlepin_lifecycle('cert-pem', 'key-pem', 'consumer-uuid', renewal_days=30)
|
||||
|
||||
# Should return original cert (caller can attempt mTLS, which will fail and fall back to service account)
|
||||
assert cert_pem == 'cert-pem'
|
||||
assert key_pem == 'key-pem'
|
||||
mock_client.checkin.assert_called_once()
|
||||
mock_client.get_consumer.assert_called_once()
|
||||
mock_client.regenerate_cert.assert_not_called()
|
||||
Reference in New Issue
Block a user