Compare commits

..

5 Commits

Author SHA1 Message Date
Lila
98dc60f9d6 Update tests to expect batch_size to agree with changes 2025-04-18 16:08:18 -04:00
Lila
29addb6ad0 Remove del load_credentials to resolve CI issue 2025-04-18 15:22:35 -04:00
Lila
c2f8acebb1 Sort both bulk updates and add batch size to facts bulk update to resolve deadlock issue 2025-04-18 15:19:16 -04:00
Alan Rominger
65d309f44a Comment out actual fix 2025-04-18 15:16:41 -04:00
Alan Rominger
8bcc65fe80 Demo of sorting hosts 2025-04-18 15:16:08 -04:00
21 changed files with 239 additions and 399 deletions

View File

@@ -10,7 +10,7 @@ from awx.api.generics import APIView, Response
from awx.api.permissions import AnalyticsPermission
from awx.api.versioning import reverse
from awx.main.utils import get_awx_version
from awx.main.utils.analytics_proxy import OIDCClient
from awx.main.utils.analytics_proxy import OIDCClient, DEFAULT_OIDC_TOKEN_ENDPOINT
from rest_framework import status
from collections import OrderedDict
@@ -205,7 +205,7 @@ class AnalyticsGenericView(APIView):
try:
rh_user = self._get_setting('REDHAT_USERNAME', None, ERROR_MISSING_USER)
rh_password = self._get_setting('REDHAT_PASSWORD', None, ERROR_MISSING_PASSWORD)
client = OIDCClient(rh_user, rh_password)
client = OIDCClient(rh_user, rh_password, DEFAULT_OIDC_TOKEN_ENDPOINT, ['api.console'])
response = client.make_request(
method,
url,
@@ -219,8 +219,8 @@ class AnalyticsGenericView(APIView):
logger.error("Automation Analytics API request failed, trying base auth method")
response = self._base_auth_request(request, method, url, rh_user, rh_password, headers)
except MissingSettings:
rh_user = self._get_setting('SUBSCRIPTIONS_CLIENT_ID', None, ERROR_MISSING_USER)
rh_password = self._get_setting('SUBSCRIPTIONS_CLIENT_SECRET', None, ERROR_MISSING_PASSWORD)
rh_user = self._get_setting('SUBSCRIPTIONS_USERNAME', None, ERROR_MISSING_USER)
rh_password = self._get_setting('SUBSCRIPTIONS_PASSWORD', None, ERROR_MISSING_PASSWORD)
response = self._base_auth_request(request, method, url, rh_user, rh_password, headers)
#
# Missing or wrong user/pass

View File

@@ -32,7 +32,6 @@ from awx.api.versioning import URLPathVersioning, reverse, drf_reverse
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS
from awx.main.models import Project, Organization, Instance, InstanceGroup, JobTemplate
from awx.main.utils import set_environ
from awx.main.utils.analytics_proxy import TokenError
from awx.main.utils.licensing import get_licenser
logger = logging.getLogger('awx.api.views.root')
@@ -177,21 +176,19 @@ class ApiV2SubscriptionView(APIView):
def post(self, request):
data = request.data.copy()
if data.get('subscriptions_client_secret') == '$encrypted$':
data['subscriptions_client_secret'] = settings.SUBSCRIPTIONS_CLIENT_SECRET
if data.get('subscriptions_password') == '$encrypted$':
data['subscriptions_password'] = settings.SUBSCRIPTIONS_PASSWORD
try:
user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret')
user, pw = data.get('subscriptions_username'), data.get('subscriptions_password')
with set_environ(**settings.AWX_TASK_ENV):
validated = get_licenser().validate_rh(user, pw)
if user:
settings.SUBSCRIPTIONS_CLIENT_ID = data['subscriptions_client_id']
settings.SUBSCRIPTIONS_USERNAME = data['subscriptions_username']
if pw:
settings.SUBSCRIPTIONS_CLIENT_SECRET = data['subscriptions_client_secret']
settings.SUBSCRIPTIONS_PASSWORD = data['subscriptions_password']
except Exception as exc:
msg = _("Invalid Subscription")
if isinstance(exc, TokenError) or (
isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401
):
if isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401:
msg = _("The provided credentials are invalid (HTTP 401).")
elif isinstance(exc, requests.exceptions.ProxyError):
msg = _("Unable to connect to proxy server.")
@@ -218,12 +215,12 @@ class ApiV2AttachView(APIView):
def post(self, request):
data = request.data.copy()
subscription_id = data.get('subscription_id', None)
if not subscription_id:
return Response({"error": _("No subscription ID provided.")}, status=status.HTTP_400_BAD_REQUEST)
user = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None)
pw = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None)
if subscription_id and user and pw:
pool_id = data.get('pool_id', None)
if not pool_id:
return Response({"error": _("No subscription pool ID provided.")}, status=status.HTTP_400_BAD_REQUEST)
user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None)
pw = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None)
if pool_id and user and pw:
data = request.data.copy()
try:
with set_environ(**settings.AWX_TASK_ENV):
@@ -242,7 +239,7 @@ class ApiV2AttachView(APIView):
logger.exception(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
return Response({"error": msg}, status=status.HTTP_400_BAD_REQUEST)
for sub in validated:
if sub['subscription_id'] == subscription_id:
if sub['pool_id'] == pool_id:
sub['valid_key'] = True
settings.LICENSE = sub
return Response(sub)

View File

@@ -207,8 +207,7 @@ class URLField(CharField):
if self.allow_plain_hostname:
try:
url_parts = urlparse.urlsplit(value)
looks_like_ipv6 = bool(url_parts.netloc and url_parts.netloc.startswith('[') and url_parts.netloc.endswith(']'))
if not looks_like_ipv6 and url_parts.hostname and '.' not in url_parts.hostname:
if url_parts.hostname and '.' not in url_parts.hostname:
netloc = '{}.local'.format(url_parts.hostname)
if url_parts.port:
netloc = '{}:{}'.format(netloc, url_parts.port)

View File

@@ -27,5 +27,5 @@ def _migrate_setting(apps, old_key, new_key, encrypted=False):
def prefill_rh_credentials(apps, schema_editor):
_migrate_setting(apps, 'REDHAT_USERNAME', 'SUBSCRIPTIONS_CLIENT_ID', encrypted=False)
_migrate_setting(apps, 'REDHAT_PASSWORD', 'SUBSCRIPTIONS_CLIENT_SECRET', encrypted=True)
_migrate_setting(apps, 'REDHAT_USERNAME', 'SUBSCRIPTIONS_USERNAME', encrypted=False)
_migrate_setting(apps, 'REDHAT_PASSWORD', 'SUBSCRIPTIONS_PASSWORD', encrypted=True)

View File

@@ -128,41 +128,3 @@ class TestURLField:
else:
with pytest.raises(ValidationError):
field.run_validators(url)
@pytest.mark.parametrize(
"url, expect_error",
[
("https://[1:2:3]", True),
("http://[1:2:3]", True),
("https://[2001:db8:3333:4444:5555:6666:7777:8888", True),
("https://2001:db8:3333:4444:5555:6666:7777:8888", True),
("https://[2001:db8:3333:4444:5555:6666:7777:8888]", False),
("https://[::1]", False),
("https://[::]", False),
("https://[2001:db8::1]", False),
("https://[2001:db8:0:0:0:0:1:1]", False),
("https://[fe80::2%eth0]", True), # ipv6 scope identifier
("https://[fe80:0:0:0:200:f8ff:fe21:67cf]", False),
("https://[::ffff:192.168.1.10]", False),
("https://[0:0:0:0:0:ffff:c000:0201]", False),
("https://[2001:0db8:000a:0001:0000:0000:0000:0000]", False),
("https://[2001:db8:a:1::]", False),
("https://[ff02::1]", False),
("https://[ff02:0:0:0:0:0:0:1]", False),
("https://[fc00::1]", False),
("https://[fd12:3456:789a:1::1]", False),
("https://[2001:db8::abcd:ef12:3456:7890]", False),
("https://[2001:db8:0000:abcd:0000:ef12:0000:3456]", False),
("https://[::ffff:10.0.0.1]", False),
("https://[2001:db8:cafe::]", False),
("https://[2001:db8:cafe:0:0:0:0:0]", False),
("https://[fe80::210:f3ff:fedf:4567%3]", True), # ipv6 scope identifier, numerical interface
],
)
def test_ipv6_urls(self, url, expect_error):
field = URLField()
if expect_error:
with pytest.raises(ValidationError, match="Enter a valid URL"):
field.run_validators(url)
else:
field.run_validators(url)

View File

@@ -22,7 +22,7 @@ from ansible_base.lib.utils.db import advisory_lock
from awx.main.models import Job
from awx.main.access import access_registry
from awx.main.utils import get_awx_http_client_headers, set_environ, datetime_hook
from awx.main.utils.analytics_proxy import OIDCClient
from awx.main.utils.analytics_proxy import OIDCClient, DEFAULT_OIDC_TOKEN_ENDPOINT
__all__ = ['register', 'gather', 'ship']
@@ -186,7 +186,7 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
if not (
settings.AUTOMATION_ANALYTICS_URL
and ((settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD) or (settings.SUBSCRIPTIONS_CLIENT_ID and settings.SUBSCRIPTIONS_CLIENT_SECRET))
and ((settings.REDHAT_USERNAME and settings.REDHAT_PASSWORD) or (settings.SUBSCRIPTIONS_USERNAME and settings.SUBSCRIPTIONS_PASSWORD))
):
logger.log(log_level, "Not gathering analytics, configuration is invalid. Use --dry-run to gather locally without sending.")
return None
@@ -368,20 +368,8 @@ def ship(path):
logger.error('AUTOMATION_ANALYTICS_URL is not set')
return False
rh_id = getattr(settings, 'REDHAT_USERNAME', None)
rh_secret = getattr(settings, 'REDHAT_PASSWORD', None)
if not (rh_id and rh_secret):
rh_id = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None)
rh_secret = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None)
if not rh_id:
logger.error('Neither REDHAT_USERNAME nor SUBSCRIPTIONS_CLIENT_ID are set')
return False
if not rh_secret:
logger.error('Neither REDHAT_PASSWORD nor SUBSCRIPTIONS_CLIENT_SECRET are set')
return False
rh_user = getattr(settings, 'REDHAT_USERNAME', None)
rh_password = getattr(settings, 'REDHAT_PASSWORD', None)
with open(path, 'rb') as f:
files = {'file': (os.path.basename(path), f, settings.INSIGHTS_AGENT_MIME)}
@@ -389,13 +377,25 @@ def ship(path):
s.headers = get_awx_http_client_headers()
s.headers.pop('Content-Type')
with set_environ(**settings.AWX_TASK_ENV):
try:
client = OIDCClient(rh_id, rh_secret)
response = client.make_request("POST", url, headers=s.headers, files=files, verify=settings.INSIGHTS_CERT_PATH, timeout=(31, 31))
except requests.RequestException:
logger.error("Automation Analytics API request failed, trying base auth method")
response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_id, rh_secret), headers=s.headers, timeout=(31, 31))
if rh_user and rh_password:
try:
client = OIDCClient(rh_user, rh_password, DEFAULT_OIDC_TOKEN_ENDPOINT, ['api.console'])
response = client.make_request("POST", url, headers=s.headers, files=files, verify=settings.INSIGHTS_CERT_PATH, timeout=(31, 31))
except requests.RequestException:
logger.error("Automation Analytics API request failed, trying base auth method")
response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31))
elif not rh_user or not rh_password:
logger.info('REDHAT_USERNAME and REDHAT_PASSWORD are not set, using SUBSCRIPTIONS_USERNAME and SUBSCRIPTIONS_PASSWORD')
rh_user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None)
rh_password = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None)
if rh_user and rh_password:
response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31))
elif not rh_user:
logger.error('REDHAT_USERNAME and SUBSCRIPTIONS_USERNAME are not set')
return False
elif not rh_password:
logger.error('REDHAT_PASSWORD and SUBSCRIPTIONS_USERNAME are not set')
return False
# Accept 2XX status_codes
if response.status_code >= 300:
logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text))

View File

@@ -124,8 +124,8 @@ register(
allow_blank=True,
encrypted=False,
read_only=False,
label=_('Red Hat Client ID for Analytics'),
help_text=_('Client ID used to send data to Automation Analytics'),
label=_('Red Hat customer username'),
help_text=_('This username is used to send data to Automation Analytics'),
category=_('System'),
category_slug='system',
)
@@ -137,34 +137,34 @@ register(
allow_blank=True,
encrypted=True,
read_only=False,
label=_('Red Hat Client Secret for Analytics'),
help_text=_('Client secret used to send data to Automation Analytics'),
label=_('Red Hat customer password'),
help_text=_('This password is used to send data to Automation Analytics'),
category=_('System'),
category_slug='system',
)
register(
'SUBSCRIPTIONS_CLIENT_ID',
'SUBSCRIPTIONS_USERNAME',
field_class=fields.CharField,
default='',
allow_blank=True,
encrypted=False,
read_only=False,
label=_('Red Hat Client ID for Subscriptions'),
help_text=_('Client ID used to retrieve subscription and content information'), # noqa
label=_('Red Hat or Satellite username'),
help_text=_('This username is used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
)
register(
'SUBSCRIPTIONS_CLIENT_SECRET',
'SUBSCRIPTIONS_PASSWORD',
field_class=fields.CharField,
default='',
allow_blank=True,
encrypted=True,
read_only=False,
label=_('Red Hat Client Secret for Subscriptions'),
help_text=_('Client secret used to retrieve subscription and content information'), # noqa
label=_('Red Hat or Satellite password'),
help_text=_('This password is used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
)

View File

@@ -602,7 +602,7 @@ class JobEvent(BasePlaybookEvent):
h.last_job_host_summary_id = host_mapping[h.id]
updated_hosts.add(h)
Host.objects.bulk_update(list(updated_hosts), ['last_job_id', 'last_job_host_summary_id'], batch_size=100)
Host.objects.bulk_update(sorted(updated_hosts, key=lambda host: host.id), ['last_job_id', 'last_job_host_summary_id'], batch_size=100)
# Create/update Host Metrics
self._update_host_metrics(updated_hosts_list)

View File

@@ -1,9 +1,6 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# AIA: Primarily AI, Modified content, Human-initiated, Reviewed, Claude (Anthropic AI) via Cursor
# AIA PAI Mc Hin R Claude Cursor - https://aiattribution.github.io/interpret-attribution
# Python
import json
import logging
@@ -29,151 +26,7 @@ class CallbackQueueDispatcher(object):
def __init__(self):
self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
self._broker_url = settings.BROKER_URL
self.connection = redis.Redis.from_url(self._broker_url)
self._connection_failures = 0
self._max_reconnect_attempts = 3
self._total_reconnections = 0
self._events_lost = 0
def _reconnect(self):
"""
Attempt to reconnect to Redis after connection failure.
Returns:
bool: True if reconnection successful, False otherwise
"""
try:
attempt = self._connection_failures + 1
self.logger.warning(
f"Redis reconnection attempt {attempt}/{self._max_reconnect_attempts} " f"(total reconnections this session: {self._total_reconnections})"
)
# Create new connection
self.connection = redis.Redis.from_url(self._broker_url)
# Verify connection works
self.connection.ping()
# Success
self._connection_failures = 0
self._total_reconnections += 1
self.logger.info(f"Successfully reconnected to Redis (session reconnections: {self._total_reconnections})")
return True
except Exception as e:
self._connection_failures += 1
self.logger.error(f"Redis reconnection failed (attempt {self._connection_failures}): {type(e).__name__}: {e}")
return False
self.connection = redis.Redis.from_url(settings.BROKER_URL)
def dispatch(self, obj):
"""
Dispatch event to Redis queue with automatic reconnection on failure.
Handles BrokenPipeError and ConnectionError by attempting reconnection.
If all reconnection attempts fail, logs the event loss but allows job to continue.
Args:
obj: Event data to dispatch (dict or serializable object)
"""
max_attempts = self._max_reconnect_attempts + 1
last_error = None
# Extract diagnostic info from event
event_type = 'unknown'
job_id = 'unknown'
if isinstance(obj, dict):
event_type = obj.get('event', obj.get('event_name', 'unknown'))
job_id = obj.get('job_id', obj.get('unified_job_id', 'unknown'))
for attempt in range(max_attempts):
try:
# Attempt to push event to Redis
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))
# Success - reset failure counter if this was a recovery
if self._connection_failures > 0:
self.logger.info(f"Redis connection recovered after reconnection. " f"job_id={job_id}, event_type={event_type}")
self._connection_failures = 0
return # Successfully dispatched
except (BrokenPipeError, redis.exceptions.ConnectionError) as e:
last_error = e
error_type = type(e).__name__
self.logger.warning(f"Redis connection error during event dispatch " f"(attempt {attempt + 1}/{max_attempts}): {error_type}: {e}")
# Enhanced diagnostics
self.logger.warning(
f"Failed event details: job_id={job_id}, event_type={event_type}, " f"queue={self.queue}, attempt={attempt + 1}/{max_attempts}"
)
if attempt < max_attempts - 1:
# Try to reconnect before next attempt
reconnected = self._reconnect()
if reconnected:
self.logger.info("Retrying event dispatch after successful reconnection")
else:
self.logger.warning(f"Reconnection failed, will retry dispatch anyway " f"(attempt {attempt + 2} coming)")
# Continue to next attempt
continue
else:
# All attempts exhausted
self._events_lost += 1
self.logger.error(
f"CRITICAL: Failed to dispatch event after {max_attempts} attempts. "
f"Event will be lost. Total events lost this session: {self._events_lost}"
)
self.logger.error(
f"DIAGNOSTIC INFO: "
f"job_id={job_id}, "
f"event_type={event_type}, "
f"queue={self.queue}, "
f"broker_url={self._broker_url}, "
f"last_error={error_type}: {last_error}, "
f"session_reconnections={self._total_reconnections}, "
f"session_events_lost={self._events_lost}"
)
# IMPORTANT: Don't raise exception
# Allow job to continue even though this event was lost
# This prevents losing 17+ minutes of work due to event logging failure
break
except Exception as e:
# Catch any other unexpected Redis errors
self.logger.error(f"Unexpected error dispatching event to Redis: {type(e).__name__}: {e}")
self.logger.error(f"Event context: job_id={job_id}, event_type={event_type}")
# Don't raise - allow job to continue
break
def health_check(self):
"""
Check Redis connection health.
Returns:
bool: True if connection is healthy, False otherwise
"""
try:
self.connection.ping()
return True
except Exception as e:
self.logger.warning(f"Redis health check failed: {type(e).__name__}: {e}")
return False
def get_connection_stats(self):
"""
Get Redis connection statistics for monitoring.
Returns:
dict: Connection statistics
"""
return {
'broker_url': self._broker_url,
'queue': self.queue,
'connected': self.health_check(),
'connection_failures': self._connection_failures,
'total_reconnections': self._total_reconnections,
'events_lost': self._events_lost,
}
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))

View File

@@ -62,7 +62,8 @@ def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=No
def raw_update_hosts(host_list):
Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'])
host_list = sorted(host_list, key=lambda host: host.id)
Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'], batch_size=100)
def update_hosts(host_list, max_tries=5):

View File

@@ -77,14 +77,7 @@ def build_indirect_host_data(job: Job, job_event_queries: dict[str, dict[str, st
if jq_str_for_event not in compiled_jq_expressions:
compiled_jq_expressions[resolved_action] = jq.compile(jq_str_for_event)
compiled_jq = compiled_jq_expressions[resolved_action]
try:
data_source = compiled_jq.input(event.event_data['res']).all()
except Exception as e:
logger.warning(f'error for module {resolved_action} and data {event.event_data["res"]}: {e}')
continue
for data in data_source:
for data in compiled_jq.input(event.event_data['res']).all():
# From this jq result (specific to a single Ansible module), get index information about this host record
if not data.get('canonical_facts'):
if not facts_missing_logged:

View File

@@ -87,8 +87,8 @@ def mock_analytic_post():
{
'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_CLIENT_SECRET': '',
'SUBSCRIPTIONS_USERNAME': '',
'SUBSCRIPTIONS_PASSWORD': '',
},
True,
('redhat_user', 'redhat_pass'),
@@ -98,8 +98,8 @@ def mock_analytic_post():
{
'REDHAT_USERNAME': None,
'REDHAT_PASSWORD': None,
'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR
},
True,
('subs_user', 'subs_pass'),
@@ -109,8 +109,8 @@ def mock_analytic_post():
{
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR
},
True,
('subs_user', 'subs_pass'),
@@ -120,8 +120,8 @@ def mock_analytic_post():
{
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_CLIENT_SECRET': '',
'SUBSCRIPTIONS_USERNAME': '',
'SUBSCRIPTIONS_PASSWORD': '',
},
False,
None, # No request should be made
@@ -131,8 +131,8 @@ def mock_analytic_post():
{
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_CLIENT_SECRET': '',
'SUBSCRIPTIONS_USERNAME': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': '',
},
False,
None, # Invalid, no request should be made

View File

@@ -97,8 +97,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_CLIENT_SECRET': '',
'SUBSCRIPTIONS_USERNAME': '',
'SUBSCRIPTIONS_PASSWORD': '',
},
('redhat_user', 'redhat_pass'),
None,
@@ -109,8 +109,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR
},
('subs_user', 'subs_pass'),
None,
@@ -121,8 +121,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_CLIENT_ID': '',
'SUBSCRIPTIONS_CLIENT_SECRET': '',
'SUBSCRIPTIONS_USERNAME': '',
'SUBSCRIPTIONS_PASSWORD': '',
},
None,
ERROR_MISSING_USER,
@@ -133,8 +133,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': 'redhat_user',
'REDHAT_PASSWORD': 'redhat_pass', # NOSONAR
'SUBSCRIPTIONS_CLIENT_ID': 'subs_user',
'SUBSCRIPTIONS_CLIENT_SECRET': 'subs_pass', # NOSONAR
'SUBSCRIPTIONS_USERNAME': 'subs_user',
'SUBSCRIPTIONS_PASSWORD': 'subs_pass', # NOSONAR
},
('redhat_user', 'redhat_pass'),
None,
@@ -145,8 +145,8 @@ class TestAnalyticsGenericView:
'INSIGHTS_TRACKING_STATE': True,
'REDHAT_USERNAME': '',
'REDHAT_PASSWORD': '',
'SUBSCRIPTIONS_CLIENT_ID': 'subs_user', # NOSONAR
'SUBSCRIPTIONS_CLIENT_SECRET': '',
'SUBSCRIPTIONS_USERNAME': 'subs_user', # NOSONAR
'SUBSCRIPTIONS_PASSWORD': '',
},
None,
ERROR_MISSING_PASSWORD,

View File

@@ -0,0 +1,79 @@
import multiprocessing
import random
from django.db import connection
from django.utils.timezone import now
from awx.main.models import Inventory, Host
def worker_delete_target(ready_event, continue_event, field_name):
"""Runs the bulk update, will be called in duplicate, in parallel"""
inv = Inventory.objects.get(organization__name='Default', name='test_host_update_contention')
host_list = list(inv.hosts.all())
random.shuffle(host_list)
for i, host in enumerate(host_list):
setattr(host, field_name, f'my_var: {i}')
# ready to do the bulk_update
print('worker has loaded all the hosts needed')
ready_event.set()
# wait for the coordination message
continue_event.wait()
# # presumed fix
# host_list = sorted(host_list, key=lambda host: host.id)
# NOTE: did not reproduce the bug without batch_size
Host.objects.bulk_update(host_list, [field_name], batch_size=100)
print('finished doing the bulk update in worker')
def test_host_update_contention(default_org):
inv_kwargs = dict(organization=default_org, name='test_host_update_contention')
if Inventory.objects.filter(**inv_kwargs).exists():
inv = Inventory.objects.get(**inv_kwargs).delete()
inv = Inventory.objects.create(**inv_kwargs)
right_now = now()
hosts = [Host(inventory=inv, name=f'host-{i}', created=right_now, modified=right_now) for i in range(1000)]
print('bulk creating hosts')
Host.objects.bulk_create(hosts)
# sanity check
for host in hosts:
assert not host.variables
# Force our worker pool to make their own connection
connection.close()
ready_events = [multiprocessing.Event() for _ in range(2)]
continue_event = multiprocessing.Event()
print('spawning processes for concurrent bulk updates')
processes = []
fields = ['variables', 'ansible_facts']
for i in range(2):
p = multiprocessing.Process(target=worker_delete_target, args=(ready_events[i], continue_event, fields[i]))
processes.append(p)
p.start()
# Assure both processes are connected and have loaded their host list
for e in ready_events:
print('waiting on subprocess ready event')
e.wait()
# Begin the bulk_update queries
print('setting the continue event for the workers')
continue_event.set()
# if a Deadloack happens it will probably be surfaced by result here
print('waiting on the workers to finish the bulk_update')
for p in processes:
p.join()
print('checking workers have variables set')
for host in inv.hosts.all():
assert host.variables.startswith('my_var:')
assert host.ansible_facts.startswith('my_var:')

View File

@@ -103,7 +103,7 @@ def test_finish_job_fact_cache_with_existing_data(hosts, mocker, tmpdir, ref_tim
assert host.ansible_facts_modified == ref_time
assert hosts[1].ansible_facts == ansible_facts_new
assert hosts[1].ansible_facts_modified > ref_time
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'], batch_size=100)
def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
@@ -139,4 +139,4 @@ def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
assert host.ansible_facts_modified == ref_time
assert hosts[1].ansible_facts == {}
assert hosts[1].ansible_facts_modified > ref_time
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'], batch_size=100)

View File

@@ -1,37 +0,0 @@
import json
from http import HTTPStatus
from unittest.mock import patch
from requests import Response
from awx.main.utils.licensing import Licenser
def test_rhsm_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'body': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('awx.main.utils.analytics_proxy.OIDCClient.make_request', new=mocked_requests_get):
subs = licenser.get_rhsm_subs('localhost', 'admin', 'admin')
assert subs == []
def test_satellite_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'results': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('requests.get', new=mocked_requests_get):
subs = licenser.get_satellite_subs('localhost', 'admin', 'admin')
assert subs == []

View File

@@ -23,7 +23,7 @@ class TokenError(requests.RequestException):
try:
client = OIDCClient(...)
client.make_request(...)
except TokenError as e:
except TokenGenerationError as e:
print(f"Token generation failed due to {e.__cause__}")
except requests.RequestException:
print("API request failed)
@@ -102,15 +102,13 @@ class OIDCClient:
self,
client_id: str,
client_secret: str,
token_url: str = DEFAULT_OIDC_TOKEN_ENDPOINT,
scopes: list[str] = None,
token_url: str,
scopes: list[str],
base_url: str = '',
) -> None:
self.client_id: str = client_id
self.client_secret: str = client_secret
self.token_url: str = token_url
if scopes is None:
scopes = ['api.console']
self.scopes = scopes
self.base_url: str = base_url
self.token: Optional[Token] = None

View File

@@ -38,7 +38,6 @@ from django.utils.translation import gettext_lazy as _
from awx_plugins.interfaces._temporary_private_licensing_api import detect_server_product_name
from awx.main.constants import SUBSCRIPTION_USAGE_MODEL_UNIQUE_HOSTS
from awx.main.utils.analytics_proxy import OIDCClient
MAX_INSTANCES = 9999999
@@ -229,38 +228,37 @@ class Licenser(object):
host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None)
if not user:
raise ValueError('subscriptions_client_id is required')
raise ValueError('subscriptions_username is required')
if not pw:
raise ValueError('subscriptions_client_secret is required')
raise ValueError('subscriptions_password is required')
if host and user and pw:
if 'subscription.rhsm.redhat.com' in host:
json = self.get_rhsm_subs(settings.SUBSCRIPTIONS_RHSM_URL, user, pw)
json = self.get_rhsm_subs(host, user, pw)
else:
json = self.get_satellite_subs(host, user, pw)
return self.generate_license_options_from_entitlements(json)
return []
def get_rhsm_subs(self, host, client_id, client_secret):
client = OIDCClient(client_id, client_secret)
subs = client.make_request(
'GET',
host,
verify=True,
timeout=(31, 31),
)
def get_rhsm_subs(self, host, user, pw):
verify = getattr(settings, 'REDHAT_CANDLEPIN_VERIFY', True)
json = []
try:
subs = requests.get('/'.join([host, 'subscription/users/{}/owners'.format(user)]), verify=verify, auth=(user, pw))
except requests.exceptions.ConnectionError as error:
raise error
except OSError as error:
raise OSError(
'Unable to open certificate bundle {}. Check that the service is running on Red Hat Enterprise Linux.'.format(verify)
) from error # noqa
subs.raise_for_status()
subs_formatted = []
for sku in subs.json()['body']:
sku_data = {k: v for k, v in sku.items() if k != 'subscriptions'}
for sub in sku['subscriptions']:
sub_data = sku_data.copy()
sub_data['subscriptions'] = sub
subs_formatted.append(sub_data)
return subs_formatted
for sub in subs.json():
resp = requests.get('/'.join([host, 'subscription/owners/{}/pools/?match=*tower*'.format(sub['key'])]), verify=verify, auth=(user, pw))
resp.raise_for_status()
json.extend(resp.json())
return json
def get_satellite_subs(self, host, user, pw):
port = None
@@ -269,7 +267,7 @@ class Licenser(object):
port = str(self.config.get("server", "port"))
except Exception as e:
logger.exception('Unable to read rhsm config to get ca_cert location. {}'.format(str(e)))
verify = True
verify = getattr(settings, 'REDHAT_CANDLEPIN_VERIFY', True)
if port:
host = ':'.join([host, port])
json = []
@@ -316,11 +314,20 @@ class Licenser(object):
return False
return True
def is_appropriate_sub(self, sub):
if sub['activeSubscription'] is False:
return False
# Products that contain Ansible Tower
products = sub.get('providedProducts', [])
if any(product.get('productId') == '480' for product in products):
return True
return False
def generate_license_options_from_entitlements(self, json):
from dateutil.parser import parse
ValidSub = collections.namedtuple(
'ValidSub', 'sku name support_level end_date trial developer_license quantity satellite subscription_id account_number usage'
'ValidSub', 'sku name support_level end_date trial developer_license quantity pool_id satellite subscription_id account_number usage'
)
valid_subs = []
for sub in json:
@@ -328,14 +335,10 @@ class Licenser(object):
if satellite:
is_valid = self.is_appropriate_sat_sub(sub)
else:
# the list of subs from console.redhat.com are already valid based on the query params we provided
is_valid = True
is_valid = self.is_appropriate_sub(sub)
if is_valid:
try:
if satellite:
end_date = parse(sub.get('endDate'))
else:
end_date = parse(sub['subscriptions']['endDate'])
end_date = parse(sub.get('endDate'))
except Exception:
continue
now = datetime.utcnow()
@@ -343,50 +346,44 @@ class Licenser(object):
if end_date < now:
# If the sub has a past end date, skip it
continue
try:
quantity = int(sub['quantity'])
if quantity == -1:
# effectively, unlimited
quantity = MAX_INSTANCES
except Exception:
continue
sku = sub['productId']
trial = sku.startswith('S') # i.e.,, SER/SVC
developer_license = False
support_level = ''
account_number = ''
usage = sub.get('usage', '')
usage = ''
pool_id = sub['id']
subscription_id = sub['subscriptionId']
account_number = sub['accountNumber']
if satellite:
try:
quantity = int(sub['quantity'])
except Exception:
continue
sku = sub['productId']
subscription_id = sub['subscriptionId']
sub_name = sub['productName']
support_level = sub['support_level']
account_number = sub['accountNumber']
usage = sub['usage']
else:
try:
if sub['capacity']['name'] == "Nodes":
quantity = int(sub['capacity']['quantity']) * int(sub['subscriptions']['quantity'])
else:
continue
except Exception:
continue
sku = sub['sku']
sub_name = sub['name']
support_level = sub['serviceLevel']
subscription_id = sub['subscriptions']['number']
if sub.get('name') == 'RHEL Developer':
developer_license = True
if quantity == -1:
# effectively, unlimited
quantity = MAX_INSTANCES
trial = sku.startswith('S') # i.e.,, SER/SVC
for attr in sub.get('productAttributes', []):
if attr.get('name') == 'support_level':
support_level = attr.get('value')
elif attr.get('name') == 'usage':
usage = attr.get('value')
elif attr.get('name') == 'ph_product_name' and attr.get('value') == 'RHEL Developer':
developer_license = True
valid_subs.append(
ValidSub(
sku,
sub_name,
sub['productName'],
support_level,
end_date,
trial,
developer_license,
quantity,
pool_id,
satellite,
subscription_id,
account_number,
@@ -417,6 +414,7 @@ class Licenser(object):
license._attrs['satellite'] = satellite
license._attrs['valid_key'] = True
license.update(license_date=int(sub.end_date.strftime('%s')))
license.update(pool_id=sub.pool_id)
license.update(subscription_id=sub.subscription_id)
license.update(account_number=sub.account_number)
licenses.append(license._attrs.copy())

View File

@@ -964,9 +964,6 @@ CLUSTER_HOST_ID = socket.gethostname()
# - 'unique_managed_hosts': Compliant = automated - deleted hosts (using /api/v2/host_metrics/)
SUBSCRIPTION_USAGE_MODEL = ''
# Default URL and query params for obtaining valid AAP subscriptions
SUBSCRIPTIONS_RHSM_URL = 'https://console.redhat.com/api/rhsm/v2/products?include=providedProducts&oids=480&status=Active'
# Host metrics cleanup - last time of the task/command run
CLEANUP_HOST_METRICS_LAST_TS = None
# Host metrics cleanup - minimal interval between two cleanups in days

View File

@@ -31,9 +31,9 @@ options:
unlicensed or trial licensed. When force=true, the license is always applied.
type: bool
default: 'False'
subscription_id:
pool_id:
description:
- Red Hat or Red Hat Satellite subscription_id to attach to
- Red Hat or Red Hat Satellite pool_id to attach to
required: False
type: str
state:
@@ -57,9 +57,9 @@ EXAMPLES = '''
username: "my_satellite_username"
password: "my_satellite_password"
- name: Attach to a subscription (requires fetching subscriptions at least once before)
- name: Attach to a pool (requires fetching subscriptions at least once before)
license:
subscription_id: 123456
pool_id: 123456
- name: Remove license
license:
@@ -75,14 +75,14 @@ def main():
module = ControllerAPIModule(
argument_spec=dict(
manifest=dict(type='str', required=False),
subscription_id=dict(type='str', required=False),
pool_id=dict(type='str', required=False),
force=dict(type='bool', default=False),
state=dict(choices=['present', 'absent'], default='present'),
),
required_if=[
['state', 'present', ['manifest', 'subscription_id'], True],
['state', 'present', ['manifest', 'pool_id'], True],
],
mutually_exclusive=[("manifest", "subscription_id")],
mutually_exclusive=[("manifest", "pool_id")],
)
json_output = {'changed': False}
@@ -124,7 +124,7 @@ def main():
if module.params.get('manifest', None):
module.post_endpoint('config', data={'manifest': manifest.decode()})
else:
module.post_endpoint('config/attach', data={'subscription_id': module.params.get('subscription_id')})
module.post_endpoint('config/attach', data={'pool_id': module.params.get('pool_id')})
module.exit_json(**json_output)

View File

@@ -20,15 +20,15 @@ description:
- Get subscriptions available to Automation Platform Controller. See
U(https://www.ansible.com/tower) for an overview.
options:
client_id:
username:
description:
- Red Hat service account client ID or Red Hat Satellite username to get available subscriptions.
- Red Hat or Red Hat Satellite username to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True
type: str
client_secret:
password:
description:
- Red Hat service account client secret or Red Hat Satellite password to get available subscriptions.
- Red Hat or Red Hat Satellite password to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True
type: str
@@ -53,13 +53,13 @@ subscriptions:
EXAMPLES = '''
- name: Get subscriptions
subscriptions:
client_id: "c6bd7594-d776-46e5-8156-6d17af147479"
client_secret: "MO9QUvoOZ5fc5JQKXoTch1AsTLI7nFsZ"
username: "my_username"
password: "My Password"
- name: Get subscriptions with a filter
subscriptions:
client_id: "c6bd7594-d776-46e5-8156-6d17af147479"
client_secret: "MO9QUvoOZ5fc5JQKXoTch1AsTLI7nFsZ"
username: "my_username"
password: "My Password"
filters:
product_name: "Red Hat Ansible Automation Platform"
support_level: "Self-Support"
@@ -72,8 +72,8 @@ def main():
module = ControllerAPIModule(
argument_spec=dict(
client_id=dict(type='str', required=True),
client_secret=dict(type='str', no_log=True, required=True),
username=dict(type='str', required=True),
password=dict(type='str', no_log=True, required=True),
filters=dict(type='dict', required=False, default={}),
),
)
@@ -82,8 +82,8 @@ def main():
# Check if Tower is already licensed
post_data = {
'subscriptions_client_secret': module.params.get('client_secret'),
'subscriptions_client_id': module.params.get('client_id'),
'subscriptions_password': module.params.get('password'),
'subscriptions_username': module.params.get('username'),
}
all_subscriptions = module.post_endpoint('config/subscriptions', data=post_data)['json']
json_output['subscriptions'] = []