AAP-58452 Add version fallback for external query files (#16309)

Forward port of 5c4653fbbca4b542fea05f070efea0396b034839 from stable-2.6.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Lila Yasin
2026-02-27 15:08:27 -05:00
committed by GitHub
parent e1e2c60f2e
commit 0f2692b504

View File

@@ -21,8 +21,11 @@ DOCUMENTATION = '''
import os
import json
import re
from importlib.resources import files
from packaging.version import Version, InvalidVersion
from ansible.plugins.callback import CallbackBase
# NOTE: in Ansible 1.2 or later general logging is available without
@@ -52,6 +55,91 @@ def list_collections(artifacts_manager=None):
return collections
# External query path constants
EXTERNAL_QUERY_COLLECTION = 'ansible_collections.redhat.indirect_accounting'
EXTERNAL_QUERY_PATH = 'extensions/audit/external_queries'
def list_external_queries(namespace, name):
"""List all available external query versions for a collection.
Returns a list of Version objects for all available query files
matching the namespace.name pattern.
"""
versions = []
try:
queries_dir = files(EXTERNAL_QUERY_COLLECTION) / 'extensions' / 'audit' / 'external_queries'
except ModuleNotFoundError:
return versions
# Pattern: namespace.name.X.Y.Z.yml where X.Y.Z is the version
pattern = re.compile(rf'^{re.escape(namespace)}\.{re.escape(name)}\.(.+)\.yml$')
for query_file in queries_dir.iterdir():
match = pattern.match(query_file.name)
if match:
version_str = match.group(1)
try:
versions.append(Version(version_str))
except InvalidVersion:
# Skip files with invalid version strings
pass
return versions
def find_external_query_with_fallback(namespace, name, installed_version, display=None):
"""Find external query file with semantic version fallback.
Args:
namespace: Collection namespace (e.g., 'community')
name: Collection name (e.g., 'vmware')
installed_version: Version string of installed collection (e.g., '4.5.0')
display: Ansible display object for logging
Returns:
Tuple of (query_content, fallback_used, fallback_version) or (None, False, None)
- query_content: The query file content if found
- fallback_used: True if a fallback version was used instead of exact match
- fallback_version: The version string used (for logging)
"""
try:
installed_version_object = Version(installed_version)
except InvalidVersion:
# Invalid version string - can't do version comparison
return None, False, None
try:
queries_dir = files(EXTERNAL_QUERY_COLLECTION) / 'extensions' / 'audit' / 'external_queries'
except ModuleNotFoundError:
return None, False, None
# 1. Try exact version match first (AC5.2)
exact_file = queries_dir / f'{namespace}.{name}.{installed_version}.yml'
if exact_file.exists():
with exact_file.open('r') as f:
return f.read(), False, installed_version
# 2. Find compatible fallback (same major version, nearest lower version)
available_versions = list_external_queries(namespace, name)
if not available_versions:
return None, False, None
# Filter to same major version and versions <= installed version (AC5.3, AC5.5)
compatible_versions = [v for v in available_versions if v.major == installed_version_object.major and v <= installed_version_object]
if not compatible_versions:
# No compatible fallback exists (AC5.7)
return None, False, None
# Select nearest lower version - highest compatible version (AC5.4)
fallback_version_object = max(compatible_versions)
fallback_version_str = str(fallback_version_object)
fallback_file = queries_dir / f'{namespace}.{name}.{fallback_version_str}.yml'
if fallback_file.exists():
with fallback_file.open('r') as f:
return f.read(), True, fallback_version_str
return None, False, None
class CallbackModule(CallbackBase):
"""
logs playbook results, per host, in /var/log/ansible/hosts
@@ -81,6 +169,17 @@ class CallbackModule(CallbackBase):
if query_file.exists():
with query_file.open('r') as f:
collection_print['host_query'] = f.read()
self._display.vv(f"Using embedded query for {candidate.fqcn} v{candidate.ver}")
else:
# 2. Check for external query file with version fallback
query_content, fallback_used, version_used = find_external_query_with_fallback(candidate.namespace, candidate.name, candidate.ver)
if query_content:
collection_print['host_query'] = query_content
if fallback_used:
# AC5.6: Log when fallback is used
self._display.v(f"Using external query {version_used} for {candidate.fqcn} v{candidate.ver}.")
else:
self._display.v(f"Using external query for {candidate.fqcn} v{candidate.ver}")
collections_print[candidate.fqcn] = collection_print