Compare commits

...

4 Commits

Author SHA1 Message Date
Andrea Restle-Lay
8033b7cbe9 Fix Redis broken pipe error in queue.py with reconnection logic and enhanced diagnostics for long-running jobs 2025-10-31 16:12:53 -04:00
Seth Foster
95289ff28c Update subscription API to use service accounts
Update code to pull subscriptions from
console.redhat.com instead of
subscription.rhsm.redhat.com

Uses service account client ID and client secret
instead of username/password, which is being
deprecated in July 2025.

Additional changes:

- In awx.awx.subscriptions module, use new service
account params rather than old basic auth params

- Update awx.awx.license module to use subscription_id
instead of pool_id. This is due to using a different API,
which identifies unique subscriptions by subscriptionID
instead of pool ID.

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
Co-authored-by: Chris Meyers <chris.meyers.fsu@gmail.com>
Co-authored-by: Peter Braun <pbraun@redhat.com>
2025-04-30 15:44:38 -04:00
Chris Meyers
000f6b0708 Allow ipv6 address for TOWER_URL_BASE setting
* Django url validators support ipv6, our custom URLField
  allow_plain_hostname feature was messing with the hostname before it
  was passed to Django validators.
* This change dodges the allow_plan_hostname transformations if the
  value looks like it's an ipv6 one.
2025-04-28 10:14:39 -04:00
Peter Braun
c799d51ec8 fix: keep processing events, even if previous event data cannot be pa… (#15965)
* fix: keep processing events, even if previous event data cannot be parsed

* change log level to warning
2025-04-28 13:21:33 +00:00
17 changed files with 396 additions and 156 deletions

View File

@@ -10,7 +10,7 @@ from awx.api.generics import APIView, Response
from awx.api.permissions import AnalyticsPermission from awx.api.permissions import AnalyticsPermission
from awx.api.versioning import reverse from awx.api.versioning import reverse
from awx.main.utils import get_awx_version from awx.main.utils import get_awx_version
from awx.main.utils.analytics_proxy import OIDCClient, DEFAULT_OIDC_TOKEN_ENDPOINT from awx.main.utils.analytics_proxy import OIDCClient
from rest_framework import status from rest_framework import status
from collections import OrderedDict from collections import OrderedDict
@@ -205,7 +205,7 @@ class AnalyticsGenericView(APIView):
try: try:
rh_user = self._get_setting('REDHAT_USERNAME', None, ERROR_MISSING_USER) rh_user = self._get_setting('REDHAT_USERNAME', None, ERROR_MISSING_USER)
rh_password = self._get_setting('REDHAT_PASSWORD', None, ERROR_MISSING_PASSWORD) rh_password = self._get_setting('REDHAT_PASSWORD', None, ERROR_MISSING_PASSWORD)
client = OIDCClient(rh_user, rh_password, DEFAULT_OIDC_TOKEN_ENDPOINT, ['api.console']) client = OIDCClient(rh_user, rh_password)
response = client.make_request( response = client.make_request(
method, method,
url, url,
@@ -219,8 +219,8 @@ class AnalyticsGenericView(APIView):
logger.error("Automation Analytics API request failed, trying base auth method") logger.error("Automation Analytics API request failed, trying base auth method")
response = self._base_auth_request(request, method, url, rh_user, rh_password, headers) response = self._base_auth_request(request, method, url, rh_user, rh_password, headers)
except MissingSettings: except MissingSettings:
rh_user = self._get_setting('SUBSCRIPTIONS_USERNAME', None, ERROR_MISSING_USER) rh_user = self._get_setting('SUBSCRIPTIONS_CLIENT_ID', None, ERROR_MISSING_USER)
rh_password = self._get_setting('SUBSCRIPTIONS_PASSWORD', None, ERROR_MISSING_PASSWORD) rh_password = self._get_setting('SUBSCRIPTIONS_CLIENT_SECRET', None, ERROR_MISSING_PASSWORD)
response = self._base_auth_request(request, method, url, rh_user, rh_password, headers) response = self._base_auth_request(request, method, url, rh_user, rh_password, headers)
# #
# Missing or wrong user/pass # Missing or wrong user/pass

View File

@@ -32,6 +32,7 @@ from awx.api.versioning import URLPathVersioning, reverse, drf_reverse
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS from awx.main.constants import PRIVILEGE_ESCALATION_METHODS
from awx.main.models import Project, Organization, Instance, InstanceGroup, JobTemplate from awx.main.models import Project, Organization, Instance, InstanceGroup, JobTemplate
from awx.main.utils import set_environ from awx.main.utils import set_environ
from awx.main.utils.analytics_proxy import TokenError
from awx.main.utils.licensing import get_licenser from awx.main.utils.licensing import get_licenser
logger = logging.getLogger('awx.api.views.root') logger = logging.getLogger('awx.api.views.root')
@@ -176,19 +177,21 @@ class ApiV2SubscriptionView(APIView):
def post(self, request): def post(self, request):
data = request.data.copy() data = request.data.copy()
if data.get('subscriptions_password') == '$encrypted$': if data.get('subscriptions_client_secret') == '$encrypted$':
data['subscriptions_password'] = settings.SUBSCRIPTIONS_PASSWORD data['subscriptions_client_secret'] = settings.SUBSCRIPTIONS_CLIENT_SECRET
try: try:
user, pw = data.get('subscriptions_username'), data.get('subscriptions_password') user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret')
with set_environ(**settings.AWX_TASK_ENV): with set_environ(**settings.AWX_TASK_ENV):
validated = get_licenser().validate_rh(user, pw) validated = get_licenser().validate_rh(user, pw)
if user: if user:
settings.SUBSCRIPTIONS_USERNAME = data['subscriptions_username'] settings.SUBSCRIPTIONS_CLIENT_ID = data['subscriptions_client_id']
if pw: if pw:
settings.SUBSCRIPTIONS_PASSWORD = data['subscriptions_password'] settings.SUBSCRIPTIONS_CLIENT_SECRET = data['subscriptions_client_secret']
except Exception as exc: except Exception as exc:
msg = _("Invalid Subscription") msg = _("Invalid Subscription")
if isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401: if isinstance(exc, TokenError) or (
isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401
):
msg = _("The provided credentials are invalid (HTTP 401).") msg = _("The provided credentials are invalid (HTTP 401).")
elif isinstance(exc, requests.exceptions.ProxyError): elif isinstance(exc, requests.exceptions.ProxyError):
msg = _("Unable to connect to proxy server.") msg = _("Unable to connect to proxy server.")
@@ -215,12 +218,12 @@ class ApiV2AttachView(APIView):
def post(self, request): def post(self, request):
data = request.data.copy() data = request.data.copy()
pool_id = data.get('pool_id', None) subscription_id = data.get('subscription_id', None)
if not pool_id: if not subscription_id:
return Response({"error": _("No subscription pool ID provided.")}, status=status.HTTP_400_BAD_REQUEST) return Response({"error": _("No subscription ID provided.")}, status=status.HTTP_400_BAD_REQUEST)
user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None) user = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None)
pw = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None) pw = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None)
if pool_id and user and pw: if subscription_id and user and pw:
data = request.data.copy() data = request.data.copy()
try: try:
with set_environ(**settings.AWX_TASK_ENV): with set_environ(**settings.AWX_TASK_ENV):
@@ -239,7 +242,7 @@ class ApiV2AttachView(APIView):
logger.exception(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username)) logger.exception(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
return Response({"error": msg}, status=status.HTTP_400_BAD_REQUEST) return Response({"error": msg}, status=status.HTTP_400_BAD_REQUEST)
for sub in validated: for sub in validated:
if sub['pool_id'] == pool_id: if sub['subscription_id'] == subscription_id:
sub['valid_key'] = True sub['valid_key'] = True
settings.LICENSE = sub settings.LICENSE = sub
return Response(sub) return Response(sub)

View File

@@ -207,7 +207,8 @@ class URLField(CharField):
if self.allow_plain_hostname: if self.allow_plain_hostname:
try: try:
url_parts = urlparse.urlsplit(value) url_parts = urlparse.urlsplit(value)
if url_parts.hostname and '.' not in url_parts.hostname: looks_like_ipv6 = bool(url_parts.netloc and url_parts.netloc.startswith('[') and url_parts.netloc.endswith(']'))
if not looks_like_ipv6 and url_parts.hostname and '.' not in url_parts.hostname:
netloc = '{}.local'.format(url_parts.hostname) netloc = '{}.local'.format(url_parts.hostname)
if url_parts.port: if url_parts.port:
netloc = '{}:{}'.format(netloc, url_parts.port) netloc = '{}:{}'.format(netloc, url_parts.port)

View File

@@ -27,5 +27,5 @@ def _migrate_setting(apps, old_key, new_key, encrypted=False):
def prefill_rh_credentials(apps, schema_editor): def prefill_rh_credentials(apps, schema_editor):
_migrate_setting(apps, 'REDHAT_USERNAME', 'SUBSCRIPTIONS_USERNAME', encrypted=False) _migrate_setting(apps, 'REDHAT_USERNAME', 'SUBSCRIPTIONS_CLIENT_ID', encrypted=False)
_migrate_setting(apps, 'REDHAT_PASSWORD', 'SUBSCRIPTIONS_PASSWORD', encrypted=True) _migrate_setting(apps, 'REDHAT_PASSWORD', 'SUBSCRIPTIONS_CLIENT_SECRET', encrypted=True)

View File

@@ -128,3 +128,41 @@ class TestURLField:
else: else:
with pytest.raises(ValidationError): with pytest.raises(ValidationError):
field.run_validators(url) field.run_validators(url)
@pytest.mark.parametrize(
"url, expect_error",
[
("https://[1:2:3]", True),
("http://[1:2:3]", True),
("https://[2001:db8:3333:4444:5555:6666:7777:8888", True),
("https://2001:db8:3333:4444:5555:6666:7777:8888", True),
("https://[2001:db8:3333:4444:5555:6666:7777:8888]", False),
("https://[::1]", False),
("https://[::]", False),
("https://[2001:db8::1]", False),
("https://[2001:db8:0:0:0:0:1:1]", False),
("https://[fe80::2%eth0]", True), # ipv6 scope identifier
("https://[fe80:0:0:0:200:f8ff:fe21:67cf]", False),
("https://[::ffff:192.168.1.10]", False),
("https://[0:0:0:0:0:ffff:c000:0201]", False),
("https://[2001:0db8:000a:0001:0000:0000:0000:0000]", False),
("https://[2001:db8:a:1::]", False),
("https://[ff02::1]", False),
("https://[ff02:0:0:0:0:0:0:1]", False),
("https://[fc00::1]", False),
("https://[fd12:3456:789a:1::1]", False),
("https://[2001:db8::abcd:ef12:3456:7890]", False),
("https://[2001:db8:0000:abcd:0000:ef12:0000:3456]", False),
("https://[::ffff:10.0.0.1]", False),
("https://[2001:db8:cafe::]", False),
("https://[2001:db8:cafe:0:0:0:0:0]", False),
("https://[fe80::210:f3ff:fedf:4567%3]", True), # ipv6 scope identifier, numerical interface
],
)
def test_ipv6_urls(self, url, expect_error):
field = URLField()
if expect_error:
with pytest.raises(ValidationError, match="Enter a valid URL"):
field.run_validators(url)
else:
field.run_validators(url)

View File

@@ -22,7 +22,7 @@ from ansible_base.lib.utils.db import advisory_lock
from awx.main.models import Job from awx.main.models import Job
from awx.main.access import access_registry from awx.main.access import access_registry
from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook
from awx.main.utils.analytics_proxy import OIDCClient, DEFAULT_OIDC_TOKEN_ENDPOINT from awx.main.utils.analytics_proxy import OIDCClient
__all__ = ['register', 'gather', 'ship'] __all__ = ['register', 'gather', 'ship']
@@ -186,7 +186,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
if not ( if not (
settings.AUTOMATION_ANALYTICS_URL settings.AUTOMATION_ANALYTICS_URL
and ((settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD) or (settings.SUBSCRIPTIONS_USERNAME and settings.SUBSCRIPTIONS_PASSWORD)) and ((settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD) or (settings.SUBSCRIPTIONS_CLIENT_ID and settings.SUBSCRIPTIONS_CLIENT_SECRET))
): ):
logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.") logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.")
return None return None
@@ -368,8 +368,20 @@ def ship(path):
logger.error('AUTOMATION_ANALYTICS_URL is not set') logger.error('AUTOMATION_ANALYTICS_URL is not set')
return False return False
rh_user = getattr(settings, 'REDHAT_USERNAME', None) rh_id = getattr(settings, 'REDHAT_USERNAME', None)
rh_password = getattr(settings, 'REDHAT_PASSWORD', None) rh_secret = getattr(settings, 'REDHAT_PASSWORD', None)
if not (rh_id and rh_secret):
rh_id = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None)
rh_secret = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None)
if not rh_id:
logger.error('Neither REDHAT_USERNAME nor SUBSCRIPTIONS_CLIENT_ID are set')
return False
if not rh_secret:
logger.error('Neither REDHAT_PASSWORD nor SUBSCRIPTIONS_CLIENT_SECRET are set')
return False
with open(path, 'rb') as f: with open(path, 'rb') as f:
files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)} files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)}
@@ -377,25 +389,13 @@ def ship(path):
s.headers = get_awx_http_client_headers() s.headers = get_awx_http_client_headers()
s.headers.pop('Content-Type') s.headers.pop('Content-Type')
with set_environ(**settings.AWX_TASK_ENV): with set_environ(**settings.AWX_TASK_ENV):
if rh_user and rh_password: try:
try: client = OIDCClient(rh_id, rh_secret)
client = OIDCClient(rh_user, rh_password, DEFAULT_OIDC_TOKEN_ENDPOINT, ['api.console']) response = client.make_request("POST", url, headers=s.headers, files=files, verify=settings.INSIGHTS_CERT_PATH, timeout=(31, 31))
response = client.make_request("POST", url, headers=s.headers, files=files, verify=settings.INSIGHTS_CERT_PATH, timeout=(31, 31)) except requests.RequestException:
except requests.RequestException: logger.error("Automation Analytics API request failed, trying base auth method")
logger.error("Automation Analytics API request failed, trying base auth method") response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_id, rh_secret), headers=s.headers, timeout=(31, 31))
response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31))
elif not rh_user or not rh_password:
logger.info('REDHAT_USERNAME and REDHAT_PASSWORD are not set, using SUBSCRIPTIONS_USERNAME and SUBSCRIPTIONS_PASSWORD')
rh_user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None)
rh_password = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None)
if rh_user and rh_password:
response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31))
elif not rh_user:
logger.error('REDHAT_USERNAME and SUBSCRIPTIONS_USERNAME are not set')
return False
elif not rh_password:
logger.error('REDHAT_PASSWORD and SUBSCRIPTIONS_USERNAME are not set')
return False
# Accept 2XX status_codes # Accept 2XX status_codes
if response.status_code >= 300: if response.status_code >= 300:
logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text)) logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text))

View File

@@ -124,8 +124,8 @@ register(
allow_blank=True, allow_blank=True,
encrypted=False, encrypted=False,
read_only=False, read_only=False,
label=_('Red Hat customer username'), label=_('Red Hat Client ID for Analytics'),
help_text=_('This username is used to send data to Automation Analytics'), help_text=_('Client ID used to send data to Automation Analytics'),
category=_('System'), category=_('System'),
category_slug='system', category_slug='system',
) )
@@ -137,34 +137,34 @@ register(
allow_blank=True, allow_blank=True,
encrypted=True, encrypted=True,
read_only=False, read_only=False,
label=_('Red Hat customer password'), label=_('Red Hat Client Secret for Analytics'),
help_text=_('This password is used to send data to Automation Analytics'), help_text=_('Client secret used to send data to Automation Analytics'),
category=_('System'), category=_('System'),
category_slug='system', category_slug='system',
) )
register( register(
'SUBSCRIPTIONS_USERNAME', 'SUBSCRIPTIONS_CLIENT_ID',
field_class=fields.CharField, field_class=fields.CharField,
default='', default='',
allow_blank=True, allow_blank=True,
encrypted=False, encrypted=False,
read_only=False, read_only=False,
label=_('Red Hat or Satellite username'), label=_('Red Hat Client ID for Subscriptions'),
help_text=_('This username is used to retrieve subscription and content information'), # noqa help_text=_('Client ID used to retrieve subscription and content information'), # noqa
category=_('System'), category=_('System'),
category_slug='system', category_slug='system',
) )
register( register(
'SUBSCRIPTIONS_PASSWORD', 'SUBSCRIPTIONS_CLIENT_SECRET',
field_class=fields.CharField, field_class=fields.CharField,
default='', default='',
allow_blank=True, allow_blank=True,
encrypted=True, encrypted=True,
read_only=False, read_only=False,
label=_('Red Hat or Satellite password'), label=_('Red Hat Client Secret for Subscriptions'),
help_text=_('This password is used to retrieve subscription and content information'), # noqa help_text=_('Client secret used to retrieve subscription and content information'), # noqa
category=_('System'), category=_('System'),
category_slug='system', category_slug='system',
) )

View File

@@ -1,6 +1,9 @@
# Copyright (c) 2015 Ansible, Inc. # Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved. # All Rights Reserved.
# AIA: Primarily AI, Modified content, Human-initiated, Reviewed, Claude (Anthropic AI) via Cursor
# AIA PAI Mc Hin R Claude Cursor - https://aiattribution.github.io/interpret-attribution
# Python # Python
import json import json
import logging import logging
@@ -26,7 +29,151 @@ class CallbackQueueDispatcher(object):
def __init__(self): def __init__(self):
self.queue = getattr(settings, 'CALLBACK_QUEUE', '') self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher') self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
self.connection = redis.Redis.from_url(settings.BROKER_URL) self._broker_url = settings.BROKER_URL
self.connection = redis.Redis.from_url(self._broker_url)
self._connection_failures = 0
self._max_reconnect_attempts = 3
self._total_reconnections = 0
self._events_lost = 0
def _reconnect(self):
"""
Attempt to reconnect to Redis after connection failure.
Returns:
bool: True if reconnection successful, False otherwise
"""
try:
attempt = self._connection_failures + 1
self.logger.warning(
f"Redis reconnection attempt {attempt}/{self._max_reconnect_attempts} " f"(total reconnections this session: {self._total_reconnections})"
)
# Create new connection
self.connection = redis.Redis.from_url(self._broker_url)
# Verify connection works
self.connection.ping()
# Success
self._connection_failures = 0
self._total_reconnections += 1
self.logger.info(f"Successfully reconnected to Redis (session reconnections: {self._total_reconnections})")
return True
except Exception as e:
self._connection_failures += 1
self.logger.error(f"Redis reconnection failed (attempt {self._connection_failures}): {type(e).__name__}: {e}")
return False
def dispatch(self, obj): def dispatch(self, obj):
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder)) """
Dispatch event to Redis queue with automatic reconnection on failure.
Handles BrokenPipeError and ConnectionError by attempting reconnection.
If all reconnection attempts fail, logs the event loss but allows job to continue.
Args:
obj: Event data to dispatch (dict or serializable object)
"""
max_attempts = self._max_reconnect_attempts + 1
last_error = None
# Extract diagnostic info from event
event_type = 'unknown'
job_id = 'unknown'
if isinstance(obj, dict):
event_type = obj.get('event', obj.get('event_name', 'unknown'))
job_id = obj.get('job_id', obj.get('unified_job_id', 'unknown'))
for attempt in range(max_attempts):
try:
# Attempt to push event to Redis
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))
# Success - reset failure counter if this was a recovery
if self._connection_failures > 0:
self.logger.info(f"Redis connection recovered after reconnection. " f"job_id={job_id}, event_type={event_type}")
self._connection_failures = 0
return # Successfully dispatched
except (BrokenPipeError, redis.exceptions.ConnectionError) as e:
last_error = e
error_type = type(e).__name__
self.logger.warning(f"Redis connection error during event dispatch " f"(attempt {attempt + 1}/{max_attempts}): {error_type}: {e}")
# Enhanced diagnostics
self.logger.warning(
f"Failed event details: job_id={job_id}, event_type={event_type}, " f"queue={self.queue}, attempt={attempt + 1}/{max_attempts}"
)
if attempt < max_attempts - 1:
# Try to reconnect before next attempt
reconnected = self._reconnect()
if reconnected:
self.logger.info("Retrying event dispatch after successful reconnection")
else:
self.logger.warning(f"Reconnection failed, will retry dispatch anyway " f"(attempt {attempt + 2} coming)")
# Continue to next attempt
continue
else:
# All attempts exhausted
self._events_lost += 1
self.logger.error(
f"CRITICAL: Failed to dispatch event after {max_attempts} attempts. "
f"Event will be lost. Total events lost this session: {self._events_lost}"
)
self.logger.error(
f"DIAGNOSTIC INFO: "
f"job_id={job_id}, "
f"event_type={event_type}, "
f"queue={self.queue}, "
f"broker_url={self._broker_url}, "
f"last_error={error_type}: {last_error}, "
f"session_reconnections={self._total_reconnections}, "
f"session_events_lost={self._events_lost}"
)
# IMPORTANT: Don't raise exception
# Allow job to continue even though this event was lost
# This prevents losing 17+ minutes of work due to event logging failure
break
except Exception as e:
# Catch any other unexpected Redis errors
self.logger.error(f"Unexpected error dispatching event to Redis: {type(e).__name__}: {e}")
self.logger.error(f"Event context: job_id={job_id}, event_type={event_type}")
# Don't raise - allow job to continue
break
def health_check(self):
"""
Check Redis connection health.
Returns:
bool: True if connection is healthy, False otherwise
"""
try:
self.connection.ping()
return True
except Exception as e:
self.logger.warning(f"Redis health check failed: {type(e).__name__}: {e}")
return False
def get_connection_stats(self):
"""
Get Redis connection statistics for monitoring.
Returns:
dict: Connection statistics
"""
return {
'broker_url': self._broker_url,
'queue': self.queue,
'connected': self.health_check(),
'connection_failures': self._connection_failures,
'total_reconnections': self._total_reconnections,
'events_lost': self._events_lost,
}

View File

@@ -77,7 +77,14 @@ def build_indirect_host_data(job: Job, job_event_queries: dict[str, dict[str, st
if jq_str_for_event not in compiled_jq_expressions: if jq_str_for_event not in compiled_jq_expressions:
compiled_jq_expressions[resolved_action] = jq.compile(jq_str_for_event) compiled_jq_expressions[resolved_action] = jq.compile(jq_str_for_event)
compiled_jq = compiled_jq_expressions[resolved_action] compiled_jq = compiled_jq_expressions[resolved_action]
for data in compiled_jq.input(event.event_data['res']).all():
try:
data_source = compiled_jq.input(event.event_data['res']).all()
except Exception as e:
logger.warning(f'error for module {resolved_action} and data {event.event_data["res"]}: {e}')
continue
for data in data_source:
# From this jq result (specific to a single Ansible module), get index information about this host record # From this jq result (specific to a single Ansible module), get index information about this host record
if not data.get('canonical_facts'): if not data.get('canonical_facts'):
if not facts_missing_logged: if not facts_missing_logged:

View File

@@ -87,8 +87,8 @@ def mock_analytic_post():
{ {
'REDHAT_USERNAME': 'redhat_user', 'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR 'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': '', 'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_PASSWORD': '', 'SUBSCRIPTIONS_CLIENT_SECRET': '',
}, },
True, True,
('redhat_user', 'redhat_pass'), ('redhat_user', 'redhat_pass'),
@@ -98,8 +98,8 @@ def mock_analytic_post():
{ {
'REDHAT_USERNAME': None, 'REDHAT_USERNAME': None,
'REDHAT_PASSWORD': None, 'REDHAT_PASSWORD': None,
'SUBSCRIPTIONS_USERNAME': 'subs_user', 'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR 'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
}, },
True, True,
('subs_user', 'subs_pass'), ('subs_user', 'subs_pass'),
@@ -109,8 +109,8 @@ def mock_analytic_post():
{ {
'REDHAT_USERNAME': '', 'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '', 'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': 'subs_user', 'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR 'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
}, },
True, True,
('subs_user', 'subs_pass'), ('subs_user', 'subs_pass'),
@@ -120,8 +120,8 @@ def mock_analytic_post():
{ {
'REDHAT_USERNAME': '', 'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '', 'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': '', 'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_PASSWORD': '', 'SUBSCRIPTIONS_CLIENT_SECRET': '',
}, },
False, False,
None, # No request should be made None, # No request should be made
@@ -131,8 +131,8 @@ def mock_analytic_post():
{ {
'REDHAT_USERNAME': '', 'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR 'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': 'subs_user', 'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': '', 'SUBSCRIPTIONS_CLIENT_SECRET': '',
}, },
False, False,
None, # Invalid, no request should be made None, # Invalid, no request should be made

View File

@@ -97,8 +97,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True, 'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': 'redhat_user', 'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR 'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': '', 'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_PASSWORD': '', 'SUBSCRIPTIONS_CLIENT_SECRET': '',
}, },
('redhat_user', 'redhat_pass'), ('redhat_user', 'redhat_pass'),
None, None,
@@ -109,8 +109,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True, 'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '', 'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '', 'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': 'subs_user', 'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR 'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
}, },
('subs_user', 'subs_pass'), ('subs_user', 'subs_pass'),
None, None,
@@ -121,8 +121,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True, 'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '', 'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '', 'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': '', 'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_PASSWORD': '', 'SUBSCRIPTIONS_CLIENT_SECRET': '',
}, },
None, None,
ERROR_MISSING_USER, ERROR_MISSING_USER,
@@ -133,8 +133,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True, 'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': 'redhat_user', 'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR 'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': 'subs_user', 'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR 'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
}, },
('redhat_user', 'redhat_pass'), ('redhat_user', 'redhat_pass'),
None, None,
@@ -145,8 +145,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True, 'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '', 'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '', 'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_USERNAME': 'subs_user', # NOSONAR 'SUBSCRIPTIONS_CLIENT_ID': 'subs_user', # NOSONAR
'SUBSCRIPTIONS_PASSWORD': '', 'SUBSCRIPTIONS_CLIENT_SECRET': '',
}, },
None, None,
ERROR_MISSING_PASSWORD, ERROR_MISSING_PASSWORD,

View File

@@ -0,0 +1,37 @@
import json
from http import HTTPStatus
from unittest.mock import patch
from requests import Response
from awx.main.utils.licensing import Licenser
def test_rhsm_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'body': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('awx.main.utils.analytics_proxy.OIDCClient.make_request', new=mocked_requests_get):
subs = licenser.get_rhsm_subs('localhost', 'admin', 'admin')
assert subs == []
def test_satellite_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'results': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('requests.get', new=mocked_requests_get):
subs = licenser.get_satellite_subs('localhost', 'admin', 'admin')
assert subs == []

View File

@@ -23,7 +23,7 @@ class TokenError(requests.RequestException):
try: try:
client = OIDCClient(...) client = OIDCClient(...)
client.make_request(...) client.make_request(...)
except TokenGenerationError as e: except TokenError as e:
print(f"Token generation failed due to {e.__cause__}") print(f"Token generation failed due to {e.__cause__}")
except requests.RequestException: except requests.RequestException:
print("API request failed) print("API request failed)
@@ -102,13 +102,15 @@ class OIDCClient:
self, self,
client_id: str, client_id: str,
client_secret: str, client_secret: str,
token_url: str, token_url: str = DEFAULT_OIDC_TOKEN_ENDPOINT,
scopes: list[str], scopes: list[str] = None,
base_url: str = '', base_url: str = '',
) -> None: ) -> None:
self.client_id: str = client_id self.client_id: str = client_id
self.client_secret: str = client_secret self.client_secret: str = client_secret
self.token_url: str = token_url self.token_url: str = token_url
if scopes is None:
scopes = ['api.console']
self.scopes = scopes self.scopes = scopes
self.base_url: str = base_url self.base_url: str = base_url
self.token: Optional[Token] = None self.token: Optional[Token] = None

View File

@@ -38,6 +38,7 @@ from django.utils.translation import gettext_lazy as _
from awx_plugins.interfaces._temporary_private_licensing_api import detect_server_product_name from awx_plugins.interfaces._temporary_private_licensing_api import detect_server_product_name
from awx.main.constants import SUBSCRIPTION_USAGE_MODEL_UNIQUE_HOSTS from awx.main.constants import SUBSCRIPTION_USAGE_MODEL_UNIQUE_HOSTS
from awx.main.utils.analytics_proxy import OIDCClient
MAX_INSTANCES = 9999999 MAX_INSTANCES = 9999999
@@ -228,37 +229,38 @@ class Licenser(object):
host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None) host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None)
if not user: if not user:
raise ValueError('subscriptions_username is required') raise ValueError('subscriptions_client_id is required')
if not pw: if not pw:
raise ValueError('subscriptions_password is required') raise ValueError('subscriptions_client_secret is required')
if host and user and pw: if host and user and pw:
if 'subscription.rhsm.redhat.com' in host: if 'subscription.rhsm.redhat.com' in host:
json = self.get_rhsm_subs(host, user, pw) json = self.get_rhsm_subs(settings.SUBSCRIPTIONS_RHSM_URL, user, pw)
else: else:
json = self.get_satellite_subs(host, user, pw) json = self.get_satellite_subs(host, user, pw)
return self.generate_license_options_from_entitlements(json) return self.generate_license_options_from_entitlements(json)
return [] return []
def get_rhsm_subs(self, host, user, pw): def get_rhsm_subs(self, host, client_id, client_secret):
verify = getattr(settings, 'REDHAT_CANDLEPIN_VERIFY', True) client = OIDCClient(client_id, client_secret)
json = [] subs = client.make_request(
try: 'GET',
subs = requests.get('/'.join([host, 'subscription/users/{}/owners'.format(user)]), verify=verify, auth=(user, pw)) host,
except requests.exceptions.ConnectionError as error: verify=True,
raise error timeout=(31, 31),
except OSError as error: )
raise OSError(
'Unable to open certificate bundle {}. Check that the service is running on Red Hat Enterprise Linux.'.format(verify)
) from error # noqa
subs.raise_for_status()
for sub in subs.json(): subs.raise_for_status()
resp = requests.get('/'.join([host, 'subscription/owners/{}/pools/?match=*tower*'.format(sub['key'])]), verify=verify, auth=(user, pw)) subs_formatted = []
resp.raise_for_status() for sku in subs.json()['body']:
json.extend(resp.json()) sku_data = {k: v for k, v in sku.items() if k != 'subscriptions'}
return json for sub in sku['subscriptions']:
sub_data = sku_data.copy()
sub_data['subscriptions'] = sub
subs_formatted.append(sub_data)
return subs_formatted
def get_satellite_subs(self, host, user, pw): def get_satellite_subs(self, host, user, pw):
port = None port = None
@@ -267,7 +269,7 @@ class Licenser(object):
port = str(self.config.get("server", "port")) port = str(self.config.get("server", "port"))
except Exception as e: except Exception as e:
logger.exception('Unable to read rhsm config to get ca_cert location. {}'.format(str(e))) logger.exception('Unable to read rhsm config to get ca_cert location. {}'.format(str(e)))
verify = getattr(settings, 'REDHAT_CANDLEPIN_VERIFY', True) verify = True
if port: if port:
host = ':'.join([host, port]) host = ':'.join([host, port])
json = [] json = []
@@ -314,20 +316,11 @@ class Licenser(object):
return False return False
return True return True
def is_appropriate_sub(self, sub):
if sub['activeSubscription'] is False:
return False
# Products that contain Ansible Tower
products = sub.get('providedProducts', [])
if any(product.get('productId') == '480' for product in products):
return True
return False
def generate_license_options_from_entitlements(self, json): def generate_license_options_from_entitlements(self, json):
from dateutil.parser import parse from dateutil.parser import parse
ValidSub = collections.namedtuple( ValidSub = collections.namedtuple(
'ValidSub', 'sku name support_level end_date trial developer_license quantity pool_id satellite subscription_id account_number usage' 'ValidSub', 'sku name support_level end_date trial developer_license quantity satellite subscription_id account_number usage'
) )
valid_subs = [] valid_subs = []
for sub in json: for sub in json:
@@ -335,10 +328,14 @@ class Licenser(object):
if satellite: if satellite:
is_valid = self.is_appropriate_sat_sub(sub) is_valid = self.is_appropriate_sat_sub(sub)
else: else:
is_valid = self.is_appropriate_sub(sub) # the list of subs from console.redhat.com are already valid based on the query params we provided
is_valid = True
if is_valid: if is_valid:
try: try:
end_date = parse(sub.get('endDate')) if satellite:
end_date = parse(sub.get('endDate'))
else:
end_date = parse(sub['subscriptions']['endDate'])
except Exception: except Exception:
continue continue
now = datetime.utcnow() now = datetime.utcnow()
@@ -346,44 +343,50 @@ class Licenser(object):
if end_date < now: if end_date < now:
# If the sub has a past end date, skip it # If the sub has a past end date, skip it
continue continue
try:
quantity = int(sub['quantity'])
if quantity == -1:
# effectively, unlimited
quantity = MAX_INSTANCES
except Exception:
continue
sku = sub['productId']
trial = sku.startswith('S') # i.e.,, SER/SVC
developer_license = False developer_license = False
support_level = '' support_level = ''
usage = '' account_number = ''
pool_id = sub['id'] usage = sub.get('usage', '')
subscription_id = sub['subscriptionId']
account_number = sub['accountNumber']
if satellite: if satellite:
try:
quantity = int(sub['quantity'])
except Exception:
continue
sku = sub['productId']
subscription_id = sub['subscriptionId']
sub_name = sub['productName']
support_level = sub['support_level'] support_level = sub['support_level']
usage = sub['usage'] account_number = sub['accountNumber']
else: else:
for attr in sub.get('productAttributes', []): try:
if attr.get('name') == 'support_level': if sub['capacity']['name'] == "Nodes":
support_level = attr.get('value') quantity = int(sub['capacity']['quantity']) * int(sub['subscriptions']['quantity'])
elif attr.get('name') == 'usage': else:
usage = attr.get('value') continue
elif attr.get('name') == 'ph_product_name' and attr.get('value') == 'RHEL Developer': except Exception:
developer_license = True continue
sku = sub['sku']
sub_name = sub['name']
support_level = sub['serviceLevel']
subscription_id = sub['subscriptions']['number']
if sub.get('name') == 'RHEL Developer':
developer_license = True
if quantity == -1:
# effectively, unlimited
quantity = MAX_INSTANCES
trial = sku.startswith('S') # i.e.,, SER/SVC
valid_subs.append( valid_subs.append(
ValidSub( ValidSub(
sku, sku,
sub['productName'], sub_name,
support_level, support_level,
end_date, end_date,
trial, trial,
developer_license, developer_license,
quantity, quantity,
pool_id,
satellite, satellite,
subscription_id, subscription_id,
account_number, account_number,
@@ -414,7 +417,6 @@ class Licenser(object):
license._attrs['satellite'] = satellite license._attrs['satellite'] = satellite
license._attrs['valid_key'] = True license._attrs['valid_key'] = True
license.update(license_date=int(sub.end_date.strftime('%s'))) license.update(license_date=int(sub.end_date.strftime('%s')))
license.update(pool_id=sub.pool_id)
license.update(subscription_id=sub.subscription_id) license.update(subscription_id=sub.subscription_id)
license.update(account_number=sub.account_number) license.update(account_number=sub.account_number)
licenses.append(license._attrs.copy()) licenses.append(license._attrs.copy())

View File

@@ -964,6 +964,9 @@ CLUSTER_HOST_ID = socket.gethostname()
# - 'unique_managed_hosts': Compliant = automated - deleted hosts (using /api/v2/host_metrics/) # - 'unique_managed_hosts': Compliant = automated - deleted hosts (using /api/v2/host_metrics/)
SUBSCRIPTION_USAGE_MODEL = '' SUBSCRIPTION_USAGE_MODEL = ''
# Default URL and query params for obtaining valid AAP subscriptions
SUBSCRIPTIONS_RHSM_URL = 'https://console.redhat.com/api/rhsm/v2/products?include=providedProducts&oids=480&status=Active'
# Host metrics cleanup - last time of the task/command run # Host metrics cleanup - last time of the task/command run
CLEANUP_HOST_METRICS_LAST_TS = None CLEANUP_HOST_METRICS_LAST_TS = None
# Host metrics cleanup - minimal interval between two cleanups in days # Host metrics cleanup - minimal interval between two cleanups in days

View File

@@ -31,9 +31,9 @@ options:
unlicensed or trial licensed. When force=true, the license is always applied. unlicensed or trial licensed. When force=true, the license is always applied.
type: bool type: bool
default: 'False' default: 'False'
pool_id: subscription_id:
description: description:
- Red Hat or Red Hat Satellite pool_id to attach to - Red Hat or Red Hat Satellite subscription_id to attach to
required: False required: False
type: str type: str
state: state:
@@ -57,9 +57,9 @@ EXAMPLES = '''
username: "my_satellite_username" username: "my_satellite_username"
password: "my_satellite_password" password: "my_satellite_password"
- name: Attach to a pool (requires fetching subscriptions at least once before) - name: Attach to a subscription (requires fetching subscriptions at least once before)
license: license:
pool_id: 123456 subscription_id: 123456
- name: Remove license - name: Remove license
license: license:
@@ -75,14 +75,14 @@ def main():
module = ControllerAPIModule( module = ControllerAPIModule(
argument_spec=dict( argument_spec=dict(
manifest=dict(type='str', required=False), manifest=dict(type='str', required=False),
pool_id=dict(type='str', required=False), subscription_id=dict(type='str', required=False),
force=dict(type='bool', default=False), force=dict(type='bool', default=False),
state=dict(choices=['present', 'absent'], default='present'), state=dict(choices=['present', 'absent'], default='present'),
), ),
required_if=[ required_if=[
['state', 'present', ['manifest', 'pool_id'], True], ['state', 'present', ['manifest', 'subscription_id'], True],
], ],
mutually_exclusive=[("manifest", "pool_id")], mutually_exclusive=[("manifest", "subscription_id")],
) )
json_output = {'changed': False} json_output = {'changed': False}
@@ -124,7 +124,7 @@ def main():
if module.params.get('manifest', None): if module.params.get('manifest', None):
module.post_endpoint('config', data={'manifest': manifest.decode()}) module.post_endpoint('config', data={'manifest': manifest.decode()})
else: else:
module.post_endpoint('config/attach', data={'pool_id': module.params.get('pool_id')}) module.post_endpoint('config/attach', data={'subscription_id': module.params.get('subscription_id')})
module.exit_json(**json_output) module.exit_json(**json_output)

View File

@@ -20,15 +20,15 @@ description:
- Get subscriptions available to Automation Platform Controller. See - Get subscriptions available to Automation Platform Controller. See
U(https://www.ansible.com/tower) for an overview. U(https://www.ansible.com/tower) for an overview.
options: options:
username: client_id:
description: description:
- Red Hat or Red Hat Satellite username to get available subscriptions. - Red Hat service account client ID or Red Hat Satellite username to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions - The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True required: True
type: str type: str
password: client_secret:
description: description:
- Red Hat or Red Hat Satellite password to get available subscriptions. - Red Hat service account client secret or Red Hat Satellite password to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions - The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True required: True
type: str type: str
@@ -53,13 +53,13 @@ subscriptions:
EXAMPLES = ''' EXAMPLES = '''
- name: Get subscriptions - name: Get subscriptions
subscriptions: subscriptions:
username: "my_username" client_id: "c6bd7594-d776-46e5-8156-6d17af147479"
password: "My Password" client_secret: "MO9QUvoOZ5fc5JQKXoTch1AsTLI7nFsZ"
- name: Get subscriptions with a filter - name: Get subscriptions with a filter
subscriptions: subscriptions:
username: "my_username" client_id: "c6bd7594-d776-46e5-8156-6d17af147479"
password: "My Password" client_secret: "MO9QUvoOZ5fc5JQKXoTch1AsTLI7nFsZ"
filters: filters:
product_name: "Red Hat Ansible Automation Platform" product_name: "Red Hat Ansible Automation Platform"
support_level: "Self-Support" support_level: "Self-Support"
@@ -72,8 +72,8 @@ def main():
module = ControllerAPIModule( module = ControllerAPIModule(
argument_spec=dict( argument_spec=dict(
username=dict(type='str', required=True), client_id=dict(type='str', required=True),
password=dict(type='str', no_log=True, required=True), client_secret=dict(type='str', no_log=True, required=True),
filters=dict(type='dict', required=False, default={}), filters=dict(type='dict', required=False, default={}),
), ),
) )
@@ -82,8 +82,8 @@ def main():
# Check if Tower is already licensed # Check if Tower is already licensed
post_data = { post_data = {
'subscriptions_password': module.params.get('password'), 'subscriptions_client_secret': module.params.get('client_secret'),
'subscriptions_username': module.params.get('username'), 'subscriptions_client_id': module.params.get('client_id'),
} }
all_subscriptions = module.post_endpoint('config/subscriptions', data=post_data)['json'] all_subscriptions = module.post_endpoint('config/subscriptions', data=post_data)['json']
json_output['subscriptions'] = [] json_output['subscriptions'] = []