mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
Merge branch 'devel' into fix_lang
This commit is contained in:
@@ -2,6 +2,7 @@ receptor_user: awx
|
||||
receptor_group: awx
|
||||
receptor_verify: true
|
||||
receptor_tls: true
|
||||
receptor_mintls13: false
|
||||
receptor_work_commands:
|
||||
ansible-runner:
|
||||
command: ansible-runner
|
||||
|
||||
31
awx/api/urls/analytics.py
Normal file
31
awx/api/urls/analytics.py
Normal file
@@ -0,0 +1,31 @@
|
||||
# Copyright (c) 2017 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
from django.urls import re_path
|
||||
|
||||
import awx.api.views.analytics as analytics
|
||||
|
||||
|
||||
urls = [
|
||||
re_path(r'^$', analytics.AnalyticsRootView.as_view(), name='analytics_root_view'),
|
||||
re_path(r'^authorized/$', analytics.AnalyticsAuthorizedView.as_view(), name='analytics_authorized'),
|
||||
re_path(r'^reports/$', analytics.AnalyticsReportsList.as_view(), name='analytics_reports_list'),
|
||||
re_path(r'^report/(?P<slug>[\w-]+)/$', analytics.AnalyticsReportDetail.as_view(), name='analytics_report_detail'),
|
||||
re_path(r'^report_options/$', analytics.AnalyticsReportOptionsList.as_view(), name='analytics_report_options_list'),
|
||||
re_path(r'^adoption_rate/$', analytics.AnalyticsAdoptionRateList.as_view(), name='analytics_adoption_rate'),
|
||||
re_path(r'^adoption_rate_options/$', analytics.AnalyticsAdoptionRateList.as_view(), name='analytics_adoption_rate_options'),
|
||||
re_path(r'^event_explorer/$', analytics.AnalyticsEventExplorerList.as_view(), name='analytics_event_explorer'),
|
||||
re_path(r'^event_explorer_options/$', analytics.AnalyticsEventExplorerList.as_view(), name='analytics_event_explorer_options'),
|
||||
re_path(r'^host_explorer/$', analytics.AnalyticsHostExplorerList.as_view(), name='analytics_host_explorer'),
|
||||
re_path(r'^host_explorer_options/$', analytics.AnalyticsHostExplorerList.as_view(), name='analytics_host_explorer_options'),
|
||||
re_path(r'^job_explorer/$', analytics.AnalyticsJobExplorerList.as_view(), name='analytics_job_explorer'),
|
||||
re_path(r'^job_explorer_options/$', analytics.AnalyticsJobExplorerList.as_view(), name='analytics_job_explorer_options'),
|
||||
re_path(r'^probe_templates/$', analytics.AnalyticsProbeTemplatesList.as_view(), name='analytics_probe_templates_explorer'),
|
||||
re_path(r'^probe_templates_options/$', analytics.AnalyticsProbeTemplatesList.as_view(), name='analytics_probe_templates_options'),
|
||||
re_path(r'^probe_template_for_hosts/$', analytics.AnalyticsProbeTemplateForHostsList.as_view(), name='analytics_probe_template_for_hosts_explorer'),
|
||||
re_path(r'^probe_template_for_hosts_options/$', analytics.AnalyticsProbeTemplateForHostsList.as_view(), name='analytics_probe_template_for_hosts_options'),
|
||||
re_path(r'^roi_templates/$', analytics.AnalyticsRoiTemplatesList.as_view(), name='analytics_roi_templates_explorer'),
|
||||
re_path(r'^roi_templates_options/$', analytics.AnalyticsRoiTemplatesList.as_view(), name='analytics_roi_templates_options'),
|
||||
]
|
||||
|
||||
__all__ = ['urls']
|
||||
@@ -42,6 +42,7 @@ from awx.api.views.bulk import (
|
||||
from awx.api.views.mesh_visualizer import MeshVisualizer
|
||||
|
||||
from awx.api.views.metrics import MetricsView
|
||||
from awx.api.views.analytics import AWX_ANALYTICS_API_PREFIX
|
||||
|
||||
from .organization import urls as organization_urls
|
||||
from .user import urls as user_urls
|
||||
@@ -82,7 +83,7 @@ from .oauth2 import urls as oauth2_urls
|
||||
from .oauth2_root import urls as oauth2_root_urls
|
||||
from .workflow_approval_template import urls as workflow_approval_template_urls
|
||||
from .workflow_approval import urls as workflow_approval_urls
|
||||
|
||||
from .analytics import urls as analytics_urls
|
||||
|
||||
v2_urls = [
|
||||
re_path(r'^$', ApiV2RootView.as_view(), name='api_v2_root_view'),
|
||||
@@ -147,6 +148,7 @@ v2_urls = [
|
||||
re_path(r'^unified_job_templates/$', UnifiedJobTemplateList.as_view(), name='unified_job_template_list'),
|
||||
re_path(r'^unified_jobs/$', UnifiedJobList.as_view(), name='unified_job_list'),
|
||||
re_path(r'^activity_stream/', include(activity_stream_urls)),
|
||||
re_path(rf'^{AWX_ANALYTICS_API_PREFIX}/', include(analytics_urls)),
|
||||
re_path(r'^workflow_approval_templates/', include(workflow_approval_template_urls)),
|
||||
re_path(r'^workflow_approvals/', include(workflow_approval_urls)),
|
||||
re_path(r'^bulk/$', BulkView.as_view(), name='bulk'),
|
||||
|
||||
297
awx/api/views/analytics.py
Normal file
297
awx/api/views/analytics.py
Normal file
@@ -0,0 +1,297 @@
|
||||
import requests
|
||||
import logging
|
||||
import urllib.parse as urlparse
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.utils import translation
|
||||
|
||||
from awx.api.generics import APIView, Response
|
||||
from awx.api.permissions import IsSystemAdminOrAuditor
|
||||
from awx.api.versioning import reverse
|
||||
from awx.main.utils import get_awx_version
|
||||
from rest_framework.permissions import AllowAny
|
||||
from rest_framework import status
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
AUTOMATION_ANALYTICS_API_URL_PATH = "/api/tower-analytics/v1"
|
||||
AWX_ANALYTICS_API_PREFIX = 'analytics'
|
||||
|
||||
ERROR_UPLOAD_NOT_ENABLED = "analytics-upload-not-enabled"
|
||||
ERROR_MISSING_URL = "missing-url"
|
||||
ERROR_MISSING_USER = "missing-user"
|
||||
ERROR_MISSING_PASSWORD = "missing-password"
|
||||
ERROR_NO_DATA_OR_ENTITLEMENT = "no-data-or-entitlement"
|
||||
ERROR_NOT_FOUND = "not-found"
|
||||
ERROR_UNAUTHORIZED = "unauthorized"
|
||||
ERROR_UNKNOWN = "unknown"
|
||||
ERROR_UNSUPPORTED_METHOD = "unsupported-method"
|
||||
|
||||
logger = logging.getLogger('awx.api.views.analytics')
|
||||
|
||||
|
||||
class MissingSettings(Exception):
|
||||
"""Settings are not correct Exception"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class GetNotAllowedMixin(object):
|
||||
def get(self, request, format=None):
|
||||
return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED)
|
||||
|
||||
|
||||
class AnalyticsRootView(APIView):
|
||||
permission_classes = (AllowAny,)
|
||||
name = _('Automation Analytics')
|
||||
swagger_topic = 'Automation Analytics'
|
||||
|
||||
def get(self, request, format=None):
|
||||
data = OrderedDict()
|
||||
data['authorized'] = reverse('api:analytics_authorized')
|
||||
data['reports'] = reverse('api:analytics_reports_list')
|
||||
data['report_options'] = reverse('api:analytics_report_options_list')
|
||||
data['adoption_rate'] = reverse('api:analytics_adoption_rate')
|
||||
data['adoption_rate_options'] = reverse('api:analytics_adoption_rate_options')
|
||||
data['event_explorer'] = reverse('api:analytics_event_explorer')
|
||||
data['event_explorer_options'] = reverse('api:analytics_event_explorer_options')
|
||||
data['host_explorer'] = reverse('api:analytics_host_explorer')
|
||||
data['host_explorer_options'] = reverse('api:analytics_host_explorer_options')
|
||||
data['job_explorer'] = reverse('api:analytics_job_explorer')
|
||||
data['job_explorer_options'] = reverse('api:analytics_job_explorer_options')
|
||||
data['probe_templates'] = reverse('api:analytics_probe_templates_explorer')
|
||||
data['probe_templates_options'] = reverse('api:analytics_probe_templates_options')
|
||||
data['probe_template_for_hosts'] = reverse('api:analytics_probe_template_for_hosts_explorer')
|
||||
data['probe_template_for_hosts_options'] = reverse('api:analytics_probe_template_for_hosts_options')
|
||||
data['roi_templates'] = reverse('api:analytics_roi_templates_explorer')
|
||||
data['roi_templates_options'] = reverse('api:analytics_roi_templates_options')
|
||||
return Response(data)
|
||||
|
||||
|
||||
class AnalyticsGenericView(APIView):
|
||||
"""
|
||||
Example:
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
params = {
|
||||
'limit': '20',
|
||||
'offset': '0',
|
||||
'sort_by': 'name:asc',
|
||||
}
|
||||
|
||||
json_data = {
|
||||
'limit': '20',
|
||||
'offset': '0',
|
||||
'sort_options': 'name',
|
||||
'sort_order': 'asc',
|
||||
'tags': [],
|
||||
'slug': [],
|
||||
'name': [],
|
||||
'description': '',
|
||||
}
|
||||
|
||||
response = requests.post(f'{AUTOMATION_ANALYTICS_API_URL}/reports/', params=params,
|
||||
headers=headers, json=json_data)
|
||||
|
||||
return Response(response.json(), status=response.status_code)
|
||||
"""
|
||||
|
||||
permission_classes = (IsSystemAdminOrAuditor,)
|
||||
|
||||
@staticmethod
|
||||
def _request_headers(request):
|
||||
headers = {}
|
||||
for header in ['Content-Type', 'Content-Length', 'Accept-Encoding', 'User-Agent', 'Accept']:
|
||||
if request.headers.get(header, None):
|
||||
headers[header] = request.headers.get(header)
|
||||
headers['X-Rh-Analytics-Source'] = 'controller'
|
||||
headers['X-Rh-Analytics-Source-Version'] = get_awx_version()
|
||||
headers['Accept-Language'] = translation.get_language()
|
||||
|
||||
return headers
|
||||
|
||||
@staticmethod
|
||||
def _get_analytics_path(request_path):
|
||||
parts = request_path.split(f'{AWX_ANALYTICS_API_PREFIX}/')
|
||||
path_specific = parts[-1]
|
||||
return f"{AUTOMATION_ANALYTICS_API_URL_PATH}/{path_specific}"
|
||||
|
||||
def _get_analytics_url(self, request_path):
|
||||
analytics_path = self._get_analytics_path(request_path)
|
||||
url = getattr(settings, 'AUTOMATION_ANALYTICS_URL', None)
|
||||
if not url:
|
||||
raise MissingSettings(ERROR_MISSING_URL)
|
||||
url_parts = urlparse.urlsplit(url)
|
||||
analytics_url = urlparse.urlunsplit([url_parts.scheme, url_parts.netloc, analytics_path, url_parts.query, url_parts.fragment])
|
||||
return analytics_url
|
||||
|
||||
@staticmethod
|
||||
def _get_setting(setting_name, default, error_message):
|
||||
setting = getattr(settings, setting_name, default)
|
||||
if not setting:
|
||||
raise MissingSettings(error_message)
|
||||
return setting
|
||||
|
||||
@staticmethod
|
||||
def _error_response(keyword, message=None, remote=True, remote_status_code=None, status_code=status.HTTP_403_FORBIDDEN):
|
||||
text = {"error": {"remote": remote, "remote_status": remote_status_code, "keyword": keyword}}
|
||||
if message:
|
||||
text["error"]["message"] = message
|
||||
return Response(text, status=status_code)
|
||||
|
||||
def _error_response_404(self, response):
|
||||
try:
|
||||
json_response = response.json()
|
||||
# Subscription/entitlement problem or missing tenant data in AA db => HTTP 403
|
||||
message = json_response.get('error', None)
|
||||
if message:
|
||||
return self._error_response(ERROR_NO_DATA_OR_ENTITLEMENT, message, remote=True, remote_status_code=response.status_code)
|
||||
|
||||
# Standard 404 problem => HTTP 404
|
||||
message = json_response.get('detail', None) or response.text
|
||||
except requests.exceptions.JSONDecodeError:
|
||||
# Unexpected text => still HTTP 404
|
||||
message = response.text
|
||||
|
||||
return self._error_response(ERROR_NOT_FOUND, message, remote=True, remote_status_code=status.HTTP_404_NOT_FOUND, status_code=status.HTTP_404_NOT_FOUND)
|
||||
|
||||
@staticmethod
|
||||
def _update_response_links(json_response):
|
||||
if not json_response.get('links', None):
|
||||
return
|
||||
|
||||
for key, value in json_response['links'].items():
|
||||
if value:
|
||||
json_response['links'][key] = value.replace(AUTOMATION_ANALYTICS_API_URL_PATH, f"/api/v2/{AWX_ANALYTICS_API_PREFIX}")
|
||||
|
||||
def _forward_response(self, response):
|
||||
try:
|
||||
content_type = response.headers.get('content-type', '')
|
||||
if content_type.find('application/json') != -1:
|
||||
json_response = response.json()
|
||||
self._update_response_links(json_response)
|
||||
|
||||
return Response(json_response, status=response.status_code)
|
||||
except Exception as e:
|
||||
logger.error(f"Analytics API: Response error: {e}")
|
||||
|
||||
return Response(response.content, status=response.status_code)
|
||||
|
||||
def _send_to_analytics(self, request, method):
|
||||
try:
|
||||
headers = self._request_headers(request)
|
||||
|
||||
self._get_setting('INSIGHTS_TRACKING_STATE', False, ERROR_UPLOAD_NOT_ENABLED)
|
||||
url = self._get_analytics_url(request.path)
|
||||
rh_user = self._get_setting('REDHAT_USERNAME', None, ERROR_MISSING_USER)
|
||||
rh_password = self._get_setting('REDHAT_PASSWORD', None, ERROR_MISSING_PASSWORD)
|
||||
|
||||
if method not in ["GET", "POST", "OPTIONS"]:
|
||||
return self._error_response(ERROR_UNSUPPORTED_METHOD, method, remote=False, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||||
else:
|
||||
response = requests.request(
|
||||
method,
|
||||
url,
|
||||
auth=(rh_user, rh_password),
|
||||
verify=settings.INSIGHTS_CERT_PATH,
|
||||
params=request.query_params,
|
||||
headers=headers,
|
||||
json=request.data,
|
||||
timeout=(31, 31),
|
||||
)
|
||||
#
|
||||
# Missing or wrong user/pass
|
||||
#
|
||||
if response.status_code == status.HTTP_401_UNAUTHORIZED:
|
||||
text = (response.text or '').rstrip("\n")
|
||||
return self._error_response(ERROR_UNAUTHORIZED, text, remote=True, remote_status_code=response.status_code)
|
||||
#
|
||||
# Not found, No entitlement or No data in Analytics
|
||||
#
|
||||
elif response.status_code == status.HTTP_404_NOT_FOUND:
|
||||
return self._error_response_404(response)
|
||||
#
|
||||
# Success or not a 401/404 errors are just forwarded
|
||||
#
|
||||
else:
|
||||
return self._forward_response(response)
|
||||
|
||||
except MissingSettings as e:
|
||||
logger.warning(f"Analytics API: Setting missing: {e.args[0]}")
|
||||
return self._error_response(e.args[0], remote=False)
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Analytics API: Request error: {e}")
|
||||
return self._error_response(ERROR_UNKNOWN, str(e), remote=False, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||||
except Exception as e:
|
||||
logger.error(f"Analytics API: Error: {e}")
|
||||
return self._error_response(ERROR_UNKNOWN, str(e), remote=False, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||||
|
||||
|
||||
class AnalyticsGenericListView(AnalyticsGenericView):
|
||||
def get(self, request, format=None):
|
||||
return self._send_to_analytics(request, method="GET")
|
||||
|
||||
def post(self, request, format=None):
|
||||
return self._send_to_analytics(request, method="POST")
|
||||
|
||||
def options(self, request, format=None):
|
||||
return self._send_to_analytics(request, method="OPTIONS")
|
||||
|
||||
|
||||
class AnalyticsGenericDetailView(AnalyticsGenericView):
|
||||
def get(self, request, slug, format=None):
|
||||
return self._send_to_analytics(request, method="GET")
|
||||
|
||||
def post(self, request, slug, format=None):
|
||||
return self._send_to_analytics(request, method="POST")
|
||||
|
||||
def options(self, request, slug, format=None):
|
||||
return self._send_to_analytics(request, method="OPTIONS")
|
||||
|
||||
|
||||
class AnalyticsAuthorizedView(AnalyticsGenericListView):
|
||||
name = _("Authorized")
|
||||
|
||||
|
||||
class AnalyticsReportsList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("Reports")
|
||||
swagger_topic = "Automation Analytics"
|
||||
|
||||
|
||||
class AnalyticsReportDetail(AnalyticsGenericDetailView):
|
||||
name = _("Report")
|
||||
|
||||
|
||||
class AnalyticsReportOptionsList(AnalyticsGenericListView):
|
||||
name = _("Report Options")
|
||||
|
||||
|
||||
class AnalyticsAdoptionRateList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("Adoption Rate")
|
||||
|
||||
|
||||
class AnalyticsEventExplorerList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("Event Explorer")
|
||||
|
||||
|
||||
class AnalyticsHostExplorerList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("Host Explorer")
|
||||
|
||||
|
||||
class AnalyticsJobExplorerList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("Job Explorer")
|
||||
|
||||
|
||||
class AnalyticsProbeTemplatesList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("Probe Templates")
|
||||
|
||||
|
||||
class AnalyticsProbeTemplateForHostsList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("Probe Template For Hosts")
|
||||
|
||||
|
||||
class AnalyticsRoiTemplatesList(GetNotAllowedMixin, AnalyticsGenericListView):
|
||||
name = _("ROI Templates")
|
||||
@@ -126,6 +126,7 @@ class ApiVersionRootView(APIView):
|
||||
data['workflow_job_nodes'] = reverse('api:workflow_job_node_list', request=request)
|
||||
data['mesh_visualizer'] = reverse('api:mesh_visualizer_view', request=request)
|
||||
data['bulk'] = reverse('api:bulk', request=request)
|
||||
data['analytics'] = reverse('api:analytics_root_view', request=request)
|
||||
return Response(data)
|
||||
|
||||
|
||||
|
||||
@@ -5,11 +5,13 @@ import threading
|
||||
import time
|
||||
import os
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
# Django
|
||||
from django.conf import LazySettings
|
||||
from django.conf import settings, UserSettingsHolder
|
||||
from django.core.cache import cache as django_cache
|
||||
from django.core.exceptions import ImproperlyConfigured
|
||||
from django.core.exceptions import ImproperlyConfigured, SynchronousOnlyOperation
|
||||
from django.db import transaction, connection
|
||||
from django.db.utils import Error as DBError, ProgrammingError
|
||||
from django.utils.functional import cached_property
|
||||
@@ -157,7 +159,7 @@ class EncryptedCacheProxy(object):
|
||||
obj_id = self.cache.get(Setting.get_cache_id_key(key), default=empty)
|
||||
if obj_id is empty:
|
||||
logger.info('Efficiency notice: Corresponding id not stored in cache %s', Setting.get_cache_id_key(key))
|
||||
obj_id = getattr(self._get_setting_from_db(key), 'pk', None)
|
||||
obj_id = getattr(_get_setting_from_db(self.registry, key), 'pk', None)
|
||||
elif obj_id == SETTING_CACHE_NONE:
|
||||
obj_id = None
|
||||
return method(TransientSetting(pk=obj_id, value=value), 'value')
|
||||
@@ -166,11 +168,6 @@ class EncryptedCacheProxy(object):
|
||||
# a no-op; it just returns the provided value
|
||||
return value
|
||||
|
||||
def _get_setting_from_db(self, key):
|
||||
field = self.registry.get_setting_field(key)
|
||||
if not field.read_only:
|
||||
return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first()
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.cache, name)
|
||||
|
||||
@@ -186,6 +183,22 @@ def get_settings_to_cache(registry):
|
||||
return dict([(key, SETTING_CACHE_NOTSET) for key in get_writeable_settings(registry)])
|
||||
|
||||
|
||||
# Will first attempt to get the setting from the database in synchronous mode.
|
||||
# If call from async context, it will attempt to get the setting from the database in a thread.
|
||||
def _get_setting_from_db(registry, key):
|
||||
def get_settings_from_db_sync(registry, key):
|
||||
field = registry.get_setting_field(key)
|
||||
if not field.read_only or key == 'INSTALL_UUID':
|
||||
return Setting.objects.filter(key=key, user__isnull=True).order_by('pk').first()
|
||||
|
||||
try:
|
||||
return get_settings_from_db_sync(registry, key)
|
||||
except SynchronousOnlyOperation:
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(get_settings_from_db_sync, registry, key)
|
||||
return future.result()
|
||||
|
||||
|
||||
def get_cache_value(value):
|
||||
"""Returns the proper special cache setting for a value
|
||||
based on instance type.
|
||||
@@ -345,7 +358,7 @@ class SettingsWrapper(UserSettingsHolder):
|
||||
setting_id = None
|
||||
# this value is read-only, however we *do* want to fetch its value from the database
|
||||
if not field.read_only or name == 'INSTALL_UUID':
|
||||
setting = Setting.objects.filter(key=name, user__isnull=True).order_by('pk').first()
|
||||
setting = _get_setting_from_db(self.registry, name)
|
||||
if setting:
|
||||
if getattr(field, 'encrypted', False):
|
||||
value = decrypt_field(setting, 'value')
|
||||
|
||||
@@ -94,9 +94,7 @@ def test_setting_singleton_retrieve_readonly(api_request, dummy_setting):
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_setting_singleton_update(api_request, dummy_setting):
|
||||
with dummy_setting('FOO_BAR', field_class=fields.IntegerField, category='FooBar', category_slug='foobar'), mock.patch(
|
||||
'awx.conf.views.handle_setting_changes'
|
||||
):
|
||||
with dummy_setting('FOO_BAR', field_class=fields.IntegerField, category='FooBar', category_slug='foobar'), mock.patch('awx.conf.views.clear_setting_cache'):
|
||||
api_request('patch', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}), data={'FOO_BAR': 3})
|
||||
response = api_request('get', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}))
|
||||
assert response.data['FOO_BAR'] == 3
|
||||
@@ -112,7 +110,7 @@ def test_setting_singleton_update_hybriddictfield_with_forbidden(api_request, du
|
||||
# sure that the _Forbidden validator doesn't get used for the
|
||||
# fields. See also https://github.com/ansible/awx/issues/4099.
|
||||
with dummy_setting('FOO_BAR', field_class=sso_fields.SAMLOrgAttrField, category='FooBar', category_slug='foobar'), mock.patch(
|
||||
'awx.conf.views.handle_setting_changes'
|
||||
'awx.conf.views.clear_setting_cache'
|
||||
):
|
||||
api_request(
|
||||
'patch',
|
||||
@@ -126,7 +124,7 @@ def test_setting_singleton_update_hybriddictfield_with_forbidden(api_request, du
|
||||
@pytest.mark.django_db
|
||||
def test_setting_singleton_update_dont_change_readonly_fields(api_request, dummy_setting):
|
||||
with dummy_setting('FOO_BAR', field_class=fields.IntegerField, read_only=True, default=4, category='FooBar', category_slug='foobar'), mock.patch(
|
||||
'awx.conf.views.handle_setting_changes'
|
||||
'awx.conf.views.clear_setting_cache'
|
||||
):
|
||||
api_request('patch', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}), data={'FOO_BAR': 5})
|
||||
response = api_request('get', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}))
|
||||
@@ -136,7 +134,7 @@ def test_setting_singleton_update_dont_change_readonly_fields(api_request, dummy
|
||||
@pytest.mark.django_db
|
||||
def test_setting_singleton_update_dont_change_encrypted_mark(api_request, dummy_setting):
|
||||
with dummy_setting('FOO_BAR', field_class=fields.CharField, encrypted=True, category='FooBar', category_slug='foobar'), mock.patch(
|
||||
'awx.conf.views.handle_setting_changes'
|
||||
'awx.conf.views.clear_setting_cache'
|
||||
):
|
||||
api_request('patch', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}), data={'FOO_BAR': 'password'})
|
||||
assert Setting.objects.get(key='FOO_BAR').value.startswith('$encrypted$')
|
||||
@@ -155,16 +153,14 @@ def test_setting_singleton_update_runs_custom_validate(api_request, dummy_settin
|
||||
|
||||
with dummy_setting('FOO_BAR', field_class=fields.IntegerField, category='FooBar', category_slug='foobar'), dummy_validate(
|
||||
'foobar', func_raising_exception
|
||||
), mock.patch('awx.conf.views.handle_setting_changes'):
|
||||
), mock.patch('awx.conf.views.clear_setting_cache'):
|
||||
response = api_request('patch', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}), data={'FOO_BAR': 23})
|
||||
assert response.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_setting_singleton_delete(api_request, dummy_setting):
|
||||
with dummy_setting('FOO_BAR', field_class=fields.IntegerField, category='FooBar', category_slug='foobar'), mock.patch(
|
||||
'awx.conf.views.handle_setting_changes'
|
||||
):
|
||||
with dummy_setting('FOO_BAR', field_class=fields.IntegerField, category='FooBar', category_slug='foobar'), mock.patch('awx.conf.views.clear_setting_cache'):
|
||||
api_request('delete', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}))
|
||||
response = api_request('get', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}))
|
||||
assert not response.data['FOO_BAR']
|
||||
@@ -173,7 +169,7 @@ def test_setting_singleton_delete(api_request, dummy_setting):
|
||||
@pytest.mark.django_db
|
||||
def test_setting_singleton_delete_no_read_only_fields(api_request, dummy_setting):
|
||||
with dummy_setting('FOO_BAR', field_class=fields.IntegerField, read_only=True, default=23, category='FooBar', category_slug='foobar'), mock.patch(
|
||||
'awx.conf.views.handle_setting_changes'
|
||||
'awx.conf.views.clear_setting_cache'
|
||||
):
|
||||
api_request('delete', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}))
|
||||
response = api_request('get', reverse('api:setting_singleton_detail', kwargs={'category_slug': 'foobar'}))
|
||||
|
||||
@@ -26,10 +26,11 @@ from awx.api.generics import APIView, GenericAPIView, ListAPIView, RetrieveUpdat
|
||||
from awx.api.permissions import IsSystemAdminOrAuditor
|
||||
from awx.api.versioning import reverse
|
||||
from awx.main.utils import camelcase_to_underscore
|
||||
from awx.main.tasks.system import handle_setting_changes
|
||||
from awx.main.tasks.system import clear_setting_cache
|
||||
from awx.conf.models import Setting
|
||||
from awx.conf.serializers import SettingCategorySerializer, SettingSingletonSerializer
|
||||
from awx.conf import settings_registry
|
||||
from awx.main.utils.external_logging import reconfigure_rsyslog
|
||||
|
||||
|
||||
SettingCategory = collections.namedtuple('SettingCategory', ('url', 'slug', 'name'))
|
||||
@@ -118,7 +119,10 @@ class SettingSingletonDetail(RetrieveUpdateDestroyAPIView):
|
||||
setting.save(update_fields=['value'])
|
||||
settings_change_list.append(key)
|
||||
if settings_change_list:
|
||||
connection.on_commit(lambda: handle_setting_changes.delay(settings_change_list))
|
||||
connection.on_commit(lambda: clear_setting_cache.delay(settings_change_list))
|
||||
if any([setting.startswith('LOG_AGGREGATOR') for setting in settings_change_list]):
|
||||
# call notify to rsyslog. no data is need so payload is empty
|
||||
reconfigure_rsyslog.delay()
|
||||
|
||||
def destroy(self, request, *args, **kwargs):
|
||||
instance = self.get_object()
|
||||
@@ -133,7 +137,10 @@ class SettingSingletonDetail(RetrieveUpdateDestroyAPIView):
|
||||
setting.delete()
|
||||
settings_change_list.append(setting.key)
|
||||
if settings_change_list:
|
||||
connection.on_commit(lambda: handle_setting_changes.delay(settings_change_list))
|
||||
connection.on_commit(lambda: clear_setting_cache.delay(settings_change_list))
|
||||
if any([setting.startswith('LOG_AGGREGATOR') for setting in settings_change_list]):
|
||||
# call notify to rsyslog. no data is need so payload is empty
|
||||
reconfigure_rsyslog.delay()
|
||||
|
||||
# When TOWER_URL_BASE is deleted from the API, reset it to the hostname
|
||||
# used to make the request as a default.
|
||||
|
||||
@@ -4,11 +4,11 @@ import logging
|
||||
# AWX
|
||||
from awx.main.analytics.subsystem_metrics import Metrics
|
||||
from awx.main.dispatch.publish import task
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
|
||||
logger = logging.getLogger('awx.main.scheduler')
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def send_subsystem_metrics():
|
||||
Metrics().send_metrics()
|
||||
|
||||
@@ -65,7 +65,7 @@ class FixedSlidingWindow:
|
||||
return sum(self.buckets.values()) or 0
|
||||
|
||||
|
||||
class BroadcastWebsocketStatsManager:
|
||||
class RelayWebsocketStatsManager:
|
||||
def __init__(self, event_loop, local_hostname):
|
||||
self._local_hostname = local_hostname
|
||||
|
||||
@@ -74,7 +74,7 @@ class BroadcastWebsocketStatsManager:
|
||||
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME
|
||||
|
||||
def new_remote_host_stats(self, remote_hostname):
|
||||
self._stats[remote_hostname] = BroadcastWebsocketStats(self._local_hostname, remote_hostname)
|
||||
self._stats[remote_hostname] = RelayWebsocketStats(self._local_hostname, remote_hostname)
|
||||
return self._stats[remote_hostname]
|
||||
|
||||
def delete_remote_host_stats(self, remote_hostname):
|
||||
@@ -107,7 +107,7 @@ class BroadcastWebsocketStatsManager:
|
||||
return parser.text_string_to_metric_families(stats_str.decode('UTF-8'))
|
||||
|
||||
|
||||
class BroadcastWebsocketStats:
|
||||
class RelayWebsocketStats:
|
||||
def __init__(self, local_hostname, remote_hostname):
|
||||
self._local_hostname = local_hostname
|
||||
self._remote_hostname = remote_hostname
|
||||
|
||||
@@ -6,7 +6,7 @@ import platform
|
||||
import distro
|
||||
|
||||
from django.db import connection
|
||||
from django.db.models import Count
|
||||
from django.db.models import Count, Min
|
||||
from django.conf import settings
|
||||
from django.contrib.sessions.models import Session
|
||||
from django.utils.timezone import now, timedelta
|
||||
@@ -35,7 +35,7 @@ data _since_ the last report date - i.e., new data in the last 24 hours)
|
||||
"""
|
||||
|
||||
|
||||
def trivial_slicing(key, since, until, last_gather):
|
||||
def trivial_slicing(key, since, until, last_gather, **kwargs):
|
||||
if since is not None:
|
||||
return [(since, until)]
|
||||
|
||||
@@ -48,7 +48,7 @@ def trivial_slicing(key, since, until, last_gather):
|
||||
return [(last_entry, until)]
|
||||
|
||||
|
||||
def four_hour_slicing(key, since, until, last_gather):
|
||||
def four_hour_slicing(key, since, until, last_gather, **kwargs):
|
||||
if since is not None:
|
||||
last_entry = since
|
||||
else:
|
||||
@@ -69,6 +69,54 @@ def four_hour_slicing(key, since, until, last_gather):
|
||||
start = end
|
||||
|
||||
|
||||
def host_metric_slicing(key, since, until, last_gather, **kwargs):
|
||||
"""
|
||||
Slicing doesn't start 4 weeks ago, but sends whole table monthly or first time
|
||||
"""
|
||||
from awx.main.models.inventory import HostMetric
|
||||
|
||||
if since is not None:
|
||||
return [(since, until)]
|
||||
|
||||
from awx.conf.models import Setting
|
||||
|
||||
# Check if full sync should be done
|
||||
full_sync_enabled = kwargs.get('full_sync_enabled', False)
|
||||
last_entry = None
|
||||
if not full_sync_enabled:
|
||||
#
|
||||
# If not, try incremental sync first
|
||||
#
|
||||
last_entries = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_ENTRIES').first()
|
||||
last_entries = json.loads((last_entries.value if last_entries is not None else '') or '{}', object_hook=datetime_hook)
|
||||
last_entry = last_entries.get(key)
|
||||
if not last_entry:
|
||||
#
|
||||
# If not done before, switch to full sync
|
||||
#
|
||||
full_sync_enabled = True
|
||||
|
||||
if full_sync_enabled:
|
||||
#
|
||||
# Find the lowest date for full sync
|
||||
#
|
||||
min_dates = HostMetric.objects.aggregate(min_last_automation=Min('last_automation'), min_last_deleted=Min('last_deleted'))
|
||||
if min_dates['min_last_automation'] and min_dates['min_last_deleted']:
|
||||
last_entry = min(min_dates['min_last_automation'], min_dates['min_last_deleted'])
|
||||
elif min_dates['min_last_automation'] or min_dates['min_last_deleted']:
|
||||
last_entry = min_dates['min_last_automation'] or min_dates['min_last_deleted']
|
||||
|
||||
if not last_entry:
|
||||
# empty table
|
||||
return []
|
||||
|
||||
start, end = last_entry, None
|
||||
while start < until:
|
||||
end = min(start + timedelta(days=30), until)
|
||||
yield (start, end)
|
||||
start = end
|
||||
|
||||
|
||||
def _identify_lower(key, since, until, last_gather):
|
||||
from awx.conf.models import Setting
|
||||
|
||||
@@ -537,3 +585,25 @@ def workflow_job_template_node_table(since, full_path, **kwargs):
|
||||
) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id
|
||||
ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''
|
||||
return _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path)
|
||||
|
||||
|
||||
@register(
|
||||
'host_metric_table', '1.0', format='csv', description=_('Host Metric data, incremental/full sync'), expensive=host_metric_slicing, full_sync_interval=30
|
||||
)
|
||||
def host_metric_table(since, full_path, until, **kwargs):
|
||||
host_metric_query = '''COPY (SELECT main_hostmetric.id,
|
||||
main_hostmetric.hostname,
|
||||
main_hostmetric.first_automation,
|
||||
main_hostmetric.last_automation,
|
||||
main_hostmetric.last_deleted,
|
||||
main_hostmetric.deleted,
|
||||
main_hostmetric.automated_counter,
|
||||
main_hostmetric.deleted_counter,
|
||||
main_hostmetric.used_in_inventories
|
||||
FROM main_hostmetric
|
||||
WHERE (main_hostmetric.last_automation > '{}' AND main_hostmetric.last_automation <= '{}') OR
|
||||
(main_hostmetric.last_deleted > '{}' AND main_hostmetric.last_deleted <= '{}')
|
||||
ORDER BY main_hostmetric.id ASC) TO STDOUT WITH CSV HEADER'''.format(
|
||||
since.isoformat(), until.isoformat(), since.isoformat(), until.isoformat()
|
||||
)
|
||||
return _copy_table(table='host_metric', query=host_metric_query, path=full_path)
|
||||
|
||||
@@ -52,7 +52,7 @@ def all_collectors():
|
||||
}
|
||||
|
||||
|
||||
def register(key, version, description=None, format='json', expensive=None):
|
||||
def register(key, version, description=None, format='json', expensive=None, full_sync_interval=None):
|
||||
"""
|
||||
A decorator used to register a function as a metric collector.
|
||||
|
||||
@@ -71,6 +71,7 @@ def register(key, version, description=None, format='json', expensive=None):
|
||||
f.__awx_analytics_description__ = description
|
||||
f.__awx_analytics_type__ = format
|
||||
f.__awx_expensive__ = expensive
|
||||
f.__awx_full_sync_interval__ = full_sync_interval
|
||||
return f
|
||||
|
||||
return decorate
|
||||
@@ -259,10 +260,19 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
|
||||
# These slicer functions may return a generator. The `since` parameter is
|
||||
# allowed to be None, and will fall back to LAST_ENTRIES[key] or to
|
||||
# LAST_GATHER (truncated appropriately to match the 4-week limit).
|
||||
#
|
||||
# Or it can force full table sync if interval is given
|
||||
kwargs = dict()
|
||||
full_sync_enabled = False
|
||||
if func.__awx_full_sync_interval__:
|
||||
last_full_sync = last_entries.get(f"{key}_full")
|
||||
full_sync_enabled = not last_full_sync or last_full_sync < now() - timedelta(days=func.__awx_full_sync_interval__)
|
||||
|
||||
kwargs['full_sync_enabled'] = full_sync_enabled
|
||||
if func.__awx_expensive__:
|
||||
slices = func.__awx_expensive__(key, since, until, last_gather)
|
||||
slices = func.__awx_expensive__(key, since, until, last_gather, **kwargs)
|
||||
else:
|
||||
slices = collectors.trivial_slicing(key, since, until, last_gather)
|
||||
slices = collectors.trivial_slicing(key, since, until, last_gather, **kwargs)
|
||||
|
||||
for start, end in slices:
|
||||
files = func(start, full_path=gather_dir, until=end)
|
||||
@@ -301,6 +311,12 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
|
||||
succeeded = False
|
||||
logger.exception("Could not generate metric {}".format(filename))
|
||||
|
||||
# update full sync timestamp if successfully shipped
|
||||
if full_sync_enabled and collection_type != 'dry-run' and succeeded:
|
||||
with disable_activity_stream():
|
||||
last_entries[f"{key}_full"] = now()
|
||||
settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
|
||||
|
||||
if collection_type != 'dry-run':
|
||||
if succeeded:
|
||||
for fpath in tarfiles:
|
||||
@@ -359,9 +375,7 @@ def ship(path):
|
||||
s.headers = get_awx_http_client_headers()
|
||||
s.headers.pop('Content-Type')
|
||||
with set_environ(**settings.AWX_TASK_ENV):
|
||||
response = s.post(
|
||||
url, files=files, verify="/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31)
|
||||
)
|
||||
response = s.post(url, files=files, verify=settings.INSIGHTS_CERT_PATH, auth=(rh_user, rh_password), headers=s.headers, timeout=(31, 31))
|
||||
# Accept 2XX status_codes
|
||||
if response.status_code >= 300:
|
||||
logger.error('Upload failed with status {}, {}'.format(response.status_code, response.text))
|
||||
|
||||
@@ -9,7 +9,7 @@ from django.apps import apps
|
||||
from awx.main.consumers import emit_channel_notification
|
||||
from awx.main.utils import is_testing
|
||||
|
||||
root_key = 'awx_metrics'
|
||||
root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
|
||||
logger = logging.getLogger('awx.main.analytics')
|
||||
|
||||
|
||||
@@ -264,13 +264,6 @@ class Metrics:
|
||||
data[field] = self.METRICS[field].decode(self.conn)
|
||||
return data
|
||||
|
||||
def store_metrics(self, data_json):
|
||||
# called when receiving metrics from other instances
|
||||
data = json.loads(data_json)
|
||||
if self.instance_name != data['instance']:
|
||||
logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}")
|
||||
self.conn.set(root_key + "_instance_" + data['instance'], data['metrics'])
|
||||
|
||||
def should_pipe_execute(self):
|
||||
if self.metrics_have_changed is False:
|
||||
return False
|
||||
@@ -309,9 +302,9 @@ class Metrics:
|
||||
'instance': self.instance_name,
|
||||
'metrics': self.serialize_local_metrics(),
|
||||
}
|
||||
# store a local copy as well
|
||||
self.store_metrics(json.dumps(payload))
|
||||
|
||||
emit_channel_notification("metrics", payload)
|
||||
|
||||
self.previous_send_metrics.set(current_time)
|
||||
self.previous_send_metrics.store_value(self.conn)
|
||||
finally:
|
||||
|
||||
@@ -822,6 +822,15 @@ register(
|
||||
category_slug='system',
|
||||
)
|
||||
|
||||
register(
|
||||
'CLEANUP_HOST_METRICS_LAST_TS',
|
||||
field_class=fields.DateTimeField,
|
||||
label=_('Last cleanup date for HostMetrics'),
|
||||
allow_null=True,
|
||||
category=_('System'),
|
||||
category_slug='system',
|
||||
)
|
||||
|
||||
|
||||
def logging_validate(serializer, attrs):
|
||||
if not serializer.instance or not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or not hasattr(serializer.instance, 'LOG_AGGREGATOR_TYPE'):
|
||||
|
||||
@@ -65,7 +65,7 @@ ENV_BLOCKLIST = frozenset(
|
||||
'INVENTORY_HOSTVARS',
|
||||
'AWX_HOST',
|
||||
'PROJECT_REVISION',
|
||||
'SUPERVISOR_WEB_CONFIG_PATH',
|
||||
'SUPERVISOR_CONFIG_PATH',
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import logging
|
||||
import time
|
||||
import hmac
|
||||
import asyncio
|
||||
import redis
|
||||
|
||||
from django.core.serializers.json import DjangoJSONEncoder
|
||||
from django.conf import settings
|
||||
@@ -80,7 +81,7 @@ class WebsocketSecretAuthHelper:
|
||||
WebsocketSecretAuthHelper.verify_secret(secret)
|
||||
|
||||
|
||||
class BroadcastConsumer(AsyncJsonWebsocketConsumer):
|
||||
class RelayConsumer(AsyncJsonWebsocketConsumer):
|
||||
async def connect(self):
|
||||
try:
|
||||
WebsocketSecretAuthHelper.is_authorized(self.scope)
|
||||
@@ -100,6 +101,21 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer):
|
||||
async def internal_message(self, event):
|
||||
await self.send(event['text'])
|
||||
|
||||
async def receive_json(self, data):
|
||||
(group, message) = unwrap_broadcast_msg(data)
|
||||
if group == "metrics":
|
||||
message = json.loads(message['text'])
|
||||
conn = redis.Redis.from_url(settings.BROKER_URL)
|
||||
conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics'])
|
||||
else:
|
||||
await self.channel_layer.group_send(group, message)
|
||||
|
||||
async def consumer_subscribe(self, event):
|
||||
await self.send_json(event)
|
||||
|
||||
async def consumer_unsubscribe(self, event):
|
||||
await self.send_json(event)
|
||||
|
||||
|
||||
class EventConsumer(AsyncJsonWebsocketConsumer):
|
||||
async def connect(self):
|
||||
@@ -128,6 +144,11 @@ class EventConsumer(AsyncJsonWebsocketConsumer):
|
||||
self.channel_name,
|
||||
)
|
||||
|
||||
await self.channel_layer.group_send(
|
||||
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
|
||||
{"type": "consumer.unsubscribe", "groups": list(current_groups), "origin_channel": self.channel_name},
|
||||
)
|
||||
|
||||
@database_sync_to_async
|
||||
def user_can_see_object_id(self, user_access, oid):
|
||||
# At this point user is a channels.auth.UserLazyObject object
|
||||
@@ -176,9 +197,20 @@ class EventConsumer(AsyncJsonWebsocketConsumer):
|
||||
self.channel_name,
|
||||
)
|
||||
|
||||
if len(old_groups):
|
||||
await self.channel_layer.group_send(
|
||||
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
|
||||
{"type": "consumer.unsubscribe", "groups": list(old_groups), "origin_channel": self.channel_name},
|
||||
)
|
||||
|
||||
new_groups_exclusive = new_groups - current_groups
|
||||
for group_name in new_groups_exclusive:
|
||||
await self.channel_layer.group_add(group_name, self.channel_name)
|
||||
|
||||
await self.channel_layer.group_send(
|
||||
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
|
||||
{"type": "consumer.subscribe", "groups": list(new_groups), "origin_channel": self.channel_name},
|
||||
)
|
||||
self.scope['session']['groups'] = new_groups
|
||||
await self.send_json({"groups_current": list(new_groups), "groups_left": list(old_groups), "groups_joined": list(new_groups_exclusive)})
|
||||
|
||||
@@ -200,9 +232,11 @@ def _dump_payload(payload):
|
||||
return None
|
||||
|
||||
|
||||
def emit_channel_notification(group, payload):
|
||||
from awx.main.wsbroadcast import wrap_broadcast_msg # noqa
|
||||
def unwrap_broadcast_msg(payload: dict):
|
||||
return (payload['group'], payload['message'])
|
||||
|
||||
|
||||
def emit_channel_notification(group, payload):
|
||||
payload_dumped = _dump_payload(payload)
|
||||
if payload_dumped is None:
|
||||
return
|
||||
@@ -212,16 +246,6 @@ def emit_channel_notification(group, payload):
|
||||
run_sync(
|
||||
channel_layer.group_send(
|
||||
group,
|
||||
{"type": "internal.message", "text": payload_dumped},
|
||||
)
|
||||
)
|
||||
|
||||
run_sync(
|
||||
channel_layer.group_send(
|
||||
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
|
||||
{
|
||||
"type": "internal.message",
|
||||
"text": wrap_broadcast_msg(group, payload_dumped),
|
||||
},
|
||||
{"type": "internal.message", "text": payload_dumped, "needs_relay": True},
|
||||
)
|
||||
)
|
||||
|
||||
@@ -63,7 +63,7 @@ class RecordedQueryLog(object):
|
||||
if not os.path.isdir(self.dest):
|
||||
os.makedirs(self.dest)
|
||||
progname = ' '.join(sys.argv)
|
||||
for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsbroadcast'):
|
||||
for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsrelay'):
|
||||
if match in progname:
|
||||
progname = match
|
||||
break
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
import psycopg2
|
||||
import select
|
||||
|
||||
@@ -6,7 +7,6 @@ from contextlib import contextmanager
|
||||
from django.conf import settings
|
||||
from django.db import connection as pg_connection
|
||||
|
||||
|
||||
NOT_READY = ([], [], [])
|
||||
|
||||
|
||||
@@ -14,6 +14,29 @@ def get_local_queuename():
|
||||
return settings.CLUSTER_HOST_ID
|
||||
|
||||
|
||||
def get_task_queuename():
|
||||
if os.getenv('AWX_COMPONENT') != 'web':
|
||||
return settings.CLUSTER_HOST_ID
|
||||
|
||||
from awx.main.models.ha import Instance
|
||||
|
||||
random_task_instance = (
|
||||
Instance.objects.filter(
|
||||
node_type__in=(Instance.Types.CONTROL, Instance.Types.HYBRID),
|
||||
node_state=Instance.States.READY,
|
||||
enabled=True,
|
||||
)
|
||||
.only('hostname')
|
||||
.order_by('?')
|
||||
.first()
|
||||
)
|
||||
|
||||
if random_task_instance is None:
|
||||
raise ValueError('No task instances are READY and Enabled.')
|
||||
|
||||
return random_task_instance.hostname
|
||||
|
||||
|
||||
class PubSub(object):
|
||||
def __init__(self, conn):
|
||||
self.conn = conn
|
||||
|
||||
@@ -6,7 +6,7 @@ from django.conf import settings
|
||||
from django.db import connection
|
||||
import redis
|
||||
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
|
||||
from . import pg_bus_conn
|
||||
|
||||
@@ -21,7 +21,7 @@ class Control(object):
|
||||
if service not in self.services:
|
||||
raise RuntimeError('{} must be in {}'.format(service, self.services))
|
||||
self.service = service
|
||||
self.queuename = host or get_local_queuename()
|
||||
self.queuename = host or get_task_queuename()
|
||||
|
||||
def status(self, *args, **kwargs):
|
||||
r = redis.Redis.from_url(settings.BROKER_URL)
|
||||
|
||||
@@ -26,8 +26,8 @@ class TaskWorker(BaseWorker):
|
||||
`awx.main.dispatch.publish`.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def resolve_callable(cls, task):
|
||||
@staticmethod
|
||||
def resolve_callable(task):
|
||||
"""
|
||||
Transform a dotted notation task into an imported, callable function, e.g.,
|
||||
|
||||
@@ -46,7 +46,8 @@ class TaskWorker(BaseWorker):
|
||||
|
||||
return _call
|
||||
|
||||
def run_callable(self, body):
|
||||
@staticmethod
|
||||
def run_callable(body):
|
||||
"""
|
||||
Given some AMQP message, import the correct Python code and run it.
|
||||
"""
|
||||
|
||||
22
awx/main/management/commands/cleanup_host_metrics.py
Normal file
22
awx/main/management/commands/cleanup_host_metrics.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from awx.main.models import HostMetric
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.conf import settings
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
"""
|
||||
Run soft-deleting of HostMetrics
|
||||
"""
|
||||
|
||||
help = 'Run soft-deleting of HostMetrics'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('--months-ago', type=int, dest='months-ago', action='store', help='Threshold in months for soft-deleting')
|
||||
|
||||
def handle(self, *args, **options):
|
||||
months_ago = options.get('months-ago') or None
|
||||
|
||||
if not months_ago:
|
||||
months_ago = getattr(settings, 'CLEANUP_HOST_METRICS_THRESHOLD', 12)
|
||||
|
||||
HostMetric.cleanup_task(months_ago)
|
||||
@@ -1,5 +1,6 @@
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from awx.main.tasks.system import clear_setting_cache
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
@@ -31,5 +32,7 @@ class Command(BaseCommand):
|
||||
else:
|
||||
raise CommandError('Please pass --enable flag to allow local auth or --disable flag to disable local auth')
|
||||
|
||||
clear_setting_cache.delay(['DISABLE_LOCAL_AUTH'])
|
||||
|
||||
def handle(self, **options):
|
||||
self._enable_disable_auth(options.get('enable'), options.get('disable'))
|
||||
|
||||
@@ -44,16 +44,18 @@ class Command(BaseCommand):
|
||||
|
||||
for x in ig.instances.all():
|
||||
color = '\033[92m'
|
||||
end_color = '\033[0m'
|
||||
if x.capacity == 0 and x.node_type != 'hop':
|
||||
color = '\033[91m'
|
||||
if not x.enabled:
|
||||
color = '\033[90m[DISABLED] '
|
||||
if no_color:
|
||||
color = ''
|
||||
end_color = ''
|
||||
|
||||
capacity = f' capacity={x.capacity}' if x.node_type != 'hop' else ''
|
||||
version = f" version={x.version or '?'}" if x.node_type != 'hop' else ''
|
||||
heartbeat = f' heartbeat="{x.last_seen:%Y-%m-%d %H:%M:%S}"' if x.capacity or x.node_type == 'hop' else ''
|
||||
print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}\033[0m')
|
||||
print(f'\t{color}{x.hostname}{capacity} node_type={x.node_type}{version}{heartbeat}{end_color}')
|
||||
|
||||
print()
|
||||
|
||||
32
awx/main/management/commands/run_cache_clear.py
Normal file
32
awx/main/management/commands/run_cache_clear.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import logging
|
||||
import json
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from awx.main.dispatch import pg_bus_conn
|
||||
from awx.main.dispatch.worker.task import TaskWorker
|
||||
|
||||
logger = logging.getLogger('awx.main.cache_clear')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
"""
|
||||
Cache Clear
|
||||
Runs as a management command and starts a daemon that listens for a pg_notify message to clear the cache.
|
||||
"""
|
||||
|
||||
help = 'Launch the cache clear daemon'
|
||||
|
||||
def handle(self, *arg, **options):
|
||||
try:
|
||||
with pg_bus_conn(new_connection=True) as conn:
|
||||
conn.listen("tower_settings_change")
|
||||
for e in conn.events(yield_timeouts=True):
|
||||
if e is not None:
|
||||
body = json.loads(e.payload)
|
||||
logger.info(f"Cache clear request received. Clearing now, payload: {e.payload}")
|
||||
TaskWorker.run_callable(body)
|
||||
|
||||
except Exception:
|
||||
# Log unanticipated exception in addition to writing to stderr to get timestamps and other metadata
|
||||
logger.exception('Encountered unhandled error in cache clear main loop')
|
||||
raise
|
||||
@@ -8,7 +8,7 @@ from django.core.cache import cache as django_cache
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db import connection as django_connection
|
||||
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
from awx.main.dispatch.control import Control
|
||||
from awx.main.dispatch.pool import AutoscalePool
|
||||
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
|
||||
@@ -76,7 +76,7 @@ class Command(BaseCommand):
|
||||
consumer = None
|
||||
|
||||
try:
|
||||
queues = ['tower_broadcast_all', get_local_queuename()]
|
||||
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
|
||||
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4))
|
||||
consumer.run()
|
||||
except KeyboardInterrupt:
|
||||
|
||||
67
awx/main/management/commands/run_heartbeet.py
Normal file
67
awx/main/management/commands/run_heartbeet.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.conf import settings
|
||||
|
||||
from awx.main.dispatch import pg_bus_conn
|
||||
|
||||
logger = logging.getLogger('awx.main.commands.run_heartbeet')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Launch the web server beacon (heartbeet)'
|
||||
|
||||
def print_banner(self):
|
||||
heartbeet = r"""
|
||||
********** **********
|
||||
************* *************
|
||||
*****************************
|
||||
***********HEART***********
|
||||
*************************
|
||||
*******************
|
||||
*************** _._
|
||||
*********** /`._ `'. __
|
||||
******* \ .\| \ _'` `)
|
||||
*** (``_) \| ).'` /`- /
|
||||
* `\ `;\_ `\\//`-'` /
|
||||
\ `'.'.| / __/`
|
||||
`'--v_|/`'`
|
||||
__||-._
|
||||
/'` `-`` `'\\
|
||||
/ .'` )
|
||||
\ BEET ' )
|
||||
\. /
|
||||
'. /'`
|
||||
`) |
|
||||
//
|
||||
'(.
|
||||
`\`.
|
||||
``"""
|
||||
print(heartbeet)
|
||||
|
||||
def construct_payload(self, action='online'):
|
||||
payload = {
|
||||
'hostname': settings.CLUSTER_HOST_ID,
|
||||
'ip': os.environ.get('MY_POD_IP'),
|
||||
'action': action,
|
||||
}
|
||||
return json.dumps(payload)
|
||||
|
||||
def do_hearbeat_loop(self):
|
||||
with pg_bus_conn(new_connection=True) as conn:
|
||||
while True:
|
||||
logger.debug('Sending heartbeat')
|
||||
conn.notify('web_heartbeet', self.construct_payload())
|
||||
time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS)
|
||||
|
||||
# TODO: Send a message with action=offline if we notice a SIGTERM or SIGINT
|
||||
# (wsrelay can use this to remove the node quicker)
|
||||
def handle(self, *arg, **options):
|
||||
self.print_banner()
|
||||
|
||||
# Note: We don't really try any reconnect logic to pg_notify here,
|
||||
# just let supervisor restart if we fail.
|
||||
self.do_hearbeat_loop()
|
||||
41
awx/main/management/commands/run_rsyslog_configurer.py
Normal file
41
awx/main/management/commands/run_rsyslog_configurer.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import logging
|
||||
import json
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
from awx.main.dispatch import pg_bus_conn
|
||||
from awx.main.dispatch.worker.task import TaskWorker
|
||||
from awx.main.utils.external_logging import reconfigure_rsyslog
|
||||
|
||||
logger = logging.getLogger('awx.main.rsyslog_configurer')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
"""
|
||||
Rsyslog Configurer
|
||||
Runs as a management command and starts rsyslog configurer daemon. Daemon listens
|
||||
for pg_notify then calls reconfigure_rsyslog
|
||||
"""
|
||||
|
||||
help = 'Launch the rsyslog_configurer daemon'
|
||||
|
||||
def handle(self, *arg, **options):
|
||||
try:
|
||||
with pg_bus_conn(new_connection=True) as conn:
|
||||
conn.listen("rsyslog_configurer")
|
||||
# reconfigure rsyslog on start up
|
||||
reconfigure_rsyslog()
|
||||
for e in conn.events(yield_timeouts=True):
|
||||
if e is not None:
|
||||
logger.info("Change in logging settings found. Restarting rsyslogd")
|
||||
# clear the cache of relevant settings then restart
|
||||
setting_keys = [k for k in dir(settings) if k.startswith('LOG_AGGREGATOR')]
|
||||
cache.delete_many(setting_keys)
|
||||
settings._awx_conf_memoizedcache.clear()
|
||||
body = json.loads(e.payload)
|
||||
TaskWorker.run_callable(body)
|
||||
except Exception:
|
||||
# Log unanticipated exception in addition to writing to stderr to get timestamps and other metadata
|
||||
logger.exception('Encountered unhandled error in rsyslog_configurer main loop')
|
||||
raise
|
||||
@@ -13,13 +13,13 @@ from django.db import connection
|
||||
from django.db.migrations.executor import MigrationExecutor
|
||||
|
||||
from awx.main.analytics.broadcast_websocket import (
|
||||
BroadcastWebsocketStatsManager,
|
||||
RelayWebsocketStatsManager,
|
||||
safe_name,
|
||||
)
|
||||
from awx.main.wsbroadcast import BroadcastWebsocketManager
|
||||
from awx.main.wsrelay import WebSocketRelayManager
|
||||
|
||||
|
||||
logger = logging.getLogger('awx.main.wsbroadcast')
|
||||
logger = logging.getLogger('awx.main.wsrelay')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
@@ -99,7 +99,7 @@ class Command(BaseCommand):
|
||||
executor = MigrationExecutor(connection)
|
||||
migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes()))
|
||||
except Exception as exc:
|
||||
logger.info(f'Error on startup of run_wsbroadcast (error: {exc}), retry in 10s...')
|
||||
logger.warning(f'Error on startup of run_wsrelay (error: {exc}), retry in 10s...')
|
||||
time.sleep(10)
|
||||
return
|
||||
|
||||
@@ -130,9 +130,9 @@ class Command(BaseCommand):
|
||||
|
||||
if options.get('status'):
|
||||
try:
|
||||
stats_all = BroadcastWebsocketStatsManager.get_stats_sync()
|
||||
stats_all = RelayWebsocketStatsManager.get_stats_sync()
|
||||
except redis.exceptions.ConnectionError as e:
|
||||
print(f"Unable to get Broadcast Websocket Status. Failed to connect to redis {e}")
|
||||
print(f"Unable to get Relay Websocket Status. Failed to connect to redis {e}")
|
||||
return
|
||||
|
||||
data = {}
|
||||
@@ -151,22 +151,19 @@ class Command(BaseCommand):
|
||||
host_stats = Command.get_connection_status(hostnames, data)
|
||||
lines = Command._format_lines(host_stats)
|
||||
|
||||
print(f'Broadcast websocket connection status from "{my_hostname}" to:')
|
||||
print(f'Relay websocket connection status from "{my_hostname}" to:')
|
||||
print('\n'.join(lines))
|
||||
|
||||
host_stats = Command.get_connection_stats(hostnames, data)
|
||||
lines = Command._format_lines(host_stats)
|
||||
|
||||
print(f'\nBroadcast websocket connection stats from "{my_hostname}" to:')
|
||||
print(f'\nRelay websocket connection stats from "{my_hostname}" to:')
|
||||
print('\n'.join(lines))
|
||||
|
||||
return
|
||||
|
||||
try:
|
||||
broadcast_websocket_mgr = BroadcastWebsocketManager()
|
||||
task = broadcast_websocket_mgr.start()
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(task)
|
||||
websocket_relay_manager = WebSocketRelayManager()
|
||||
asyncio.run(websocket_relay_manager.run())
|
||||
except KeyboardInterrupt:
|
||||
logger.debug('Terminating Websocket Broadcaster')
|
||||
logger.info('Terminating Websocket Relayer')
|
||||
@@ -9,6 +9,8 @@ import re
|
||||
import copy
|
||||
import os.path
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import dateutil.relativedelta
|
||||
import yaml
|
||||
|
||||
# Django
|
||||
@@ -17,6 +19,7 @@ from django.db import models, connection
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.db import transaction
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.urls import resolve
|
||||
from django.utils.timezone import now
|
||||
from django.db.models import Q
|
||||
|
||||
@@ -206,8 +209,14 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
|
||||
)
|
||||
|
||||
def get_absolute_url(self, request=None):
|
||||
if self.kind == 'constructed':
|
||||
return reverse('api:constructed_inventory_detail', kwargs={'pk': self.pk}, request=request)
|
||||
if request is not None:
|
||||
# circular import
|
||||
from awx.api.urls.inventory import constructed_inventory_urls
|
||||
|
||||
route = resolve(request.path_info)
|
||||
if any(route.url_name == url.name for url in constructed_inventory_urls):
|
||||
return reverse('api:constructed_inventory_detail', kwargs={'pk': self.pk}, request=request)
|
||||
|
||||
return reverse('api:inventory_detail', kwargs={'pk': self.pk}, request=request)
|
||||
|
||||
variables_dict = VarsDictProperty('variables')
|
||||
@@ -881,6 +890,23 @@ class HostMetric(models.Model):
|
||||
self.deleted = False
|
||||
self.save(update_fields=['deleted'])
|
||||
|
||||
@classmethod
|
||||
def cleanup_task(cls, months_ago):
|
||||
try:
|
||||
months_ago = int(months_ago)
|
||||
if months_ago <= 0:
|
||||
raise ValueError()
|
||||
|
||||
last_automation_before = now() - dateutil.relativedelta.relativedelta(months=months_ago)
|
||||
|
||||
logger.info(f'Cleanup [HostMetric]: soft-deleting records last automated before {last_automation_before}')
|
||||
HostMetric.active_objects.filter(last_automation__lt=last_automation_before).update(
|
||||
deleted=True, deleted_counter=models.F('deleted_counter') + 1, last_deleted=now()
|
||||
)
|
||||
settings.CLEANUP_HOST_METRICS_LAST_TS = now()
|
||||
except (TypeError, ValueError):
|
||||
logger.error(f"Cleanup [HostMetric]: months_ago({months_ago}) has to be a positive integer value")
|
||||
|
||||
|
||||
class HostMetricSummaryMonthly(models.Model):
|
||||
"""
|
||||
|
||||
@@ -32,7 +32,7 @@ from polymorphic.models import PolymorphicModel
|
||||
|
||||
# AWX
|
||||
from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel, prevent_search
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
from awx.main.dispatch.control import Control as ControlDispatcher
|
||||
from awx.main.registrar import activity_stream_registrar
|
||||
from awx.main.models.mixins import ResourceMixin, TaskManagerUnifiedJobMixin, ExecutionEnvironmentMixin
|
||||
@@ -1567,7 +1567,7 @@ class UnifiedJob(
|
||||
return r
|
||||
|
||||
def get_queue_name(self):
|
||||
return self.controller_node or self.execution_node or get_local_queuename()
|
||||
return self.controller_node or self.execution_node or get_task_queuename()
|
||||
|
||||
@property
|
||||
def is_container_group_task(self):
|
||||
|
||||
@@ -28,7 +28,7 @@ class AWXProtocolTypeRouter(ProtocolTypeRouter):
|
||||
|
||||
websocket_urlpatterns = [
|
||||
re_path(r'websocket/$', consumers.EventConsumer.as_asgi()),
|
||||
re_path(r'websocket/broadcast/$', consumers.BroadcastConsumer.as_asgi()),
|
||||
re_path(r'websocket/relay/$', consumers.RelayConsumer.as_asgi()),
|
||||
]
|
||||
|
||||
application = AWXProtocolTypeRouter(
|
||||
|
||||
@@ -8,7 +8,7 @@ from django.conf import settings
|
||||
from awx import MODE
|
||||
from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
|
||||
from awx.main.dispatch.publish import task
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
|
||||
logger = logging.getLogger('awx.main.scheduler')
|
||||
|
||||
@@ -20,16 +20,16 @@ def run_manager(manager, prefix):
|
||||
manager().schedule()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def task_manager():
|
||||
run_manager(TaskManager, "task")
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def dependency_manager():
|
||||
run_manager(DependencyManager, "dependency")
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def workflow_manager():
|
||||
run_manager(WorkflowManager, "workflow")
|
||||
|
||||
@@ -29,7 +29,7 @@ from gitdb.exc import BadName as BadGitName
|
||||
|
||||
# AWX
|
||||
from awx.main.dispatch.publish import task
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
from awx.main.constants import (
|
||||
PRIVILEGE_ESCALATION_METHODS,
|
||||
STANDARD_INVENTORY_UPDATE_ENV,
|
||||
@@ -806,7 +806,7 @@ class SourceControlMixin(BaseTask):
|
||||
self.release_lock(project)
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
class RunJob(SourceControlMixin, BaseTask):
|
||||
"""
|
||||
Run a job using ansible-playbook.
|
||||
@@ -1121,7 +1121,7 @@ class RunJob(SourceControlMixin, BaseTask):
|
||||
update_inventory_computed_fields.delay(inventory.id)
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
class RunProjectUpdate(BaseTask):
|
||||
model = ProjectUpdate
|
||||
event_model = ProjectUpdateEvent
|
||||
@@ -1443,7 +1443,7 @@ class RunProjectUpdate(BaseTask):
|
||||
return params
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
class RunInventoryUpdate(SourceControlMixin, BaseTask):
|
||||
model = InventoryUpdate
|
||||
event_model = InventoryUpdateEvent
|
||||
@@ -1706,7 +1706,7 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
|
||||
raise PostRunError('Error occured while saving inventory data, see traceback or server logs', status='error', tb=traceback.format_exc())
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
class RunAdHocCommand(BaseTask):
|
||||
"""
|
||||
Run an ad hoc command using ansible.
|
||||
@@ -1859,7 +1859,7 @@ class RunAdHocCommand(BaseTask):
|
||||
return d
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
class RunSystemJob(BaseTask):
|
||||
model = SystemJob
|
||||
event_model = SystemJobEvent
|
||||
|
||||
@@ -28,7 +28,7 @@ from awx.main.utils.common import (
|
||||
from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER
|
||||
from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
|
||||
from awx.main.models import Instance, InstanceLink, UnifiedJob
|
||||
from awx.main.dispatch import get_local_queuename
|
||||
from awx.main.dispatch import get_task_queuename
|
||||
from awx.main.dispatch.publish import task
|
||||
|
||||
# Receptorctl
|
||||
@@ -668,6 +668,7 @@ RECEPTOR_CONFIG_STARTER = (
|
||||
'rootcas': '/etc/receptor/tls/ca/receptor-ca.crt',
|
||||
'cert': '/etc/receptor/tls/receptor.crt',
|
||||
'key': '/etc/receptor/tls/receptor.key',
|
||||
'mintls13': False,
|
||||
}
|
||||
},
|
||||
)
|
||||
@@ -712,7 +713,7 @@ def write_receptor_config():
|
||||
links.update(link_state=InstanceLink.States.ESTABLISHED)
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def remove_deprovisioned_node(hostname):
|
||||
InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
|
||||
InstanceLink.objects.filter(target__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
|
||||
|
||||
@@ -47,10 +47,11 @@ from awx.main.models import (
|
||||
Inventory,
|
||||
SmartInventoryMembership,
|
||||
Job,
|
||||
HostMetric,
|
||||
)
|
||||
from awx.main.constants import ACTIVE_STATES
|
||||
from awx.main.dispatch.publish import task
|
||||
from awx.main.dispatch import get_local_queuename, reaper
|
||||
from awx.main.dispatch import get_task_queuename, reaper
|
||||
from awx.main.utils.common import (
|
||||
get_type_for_model,
|
||||
ignore_inventory_computed_fields,
|
||||
@@ -59,7 +60,6 @@ from awx.main.utils.common import (
|
||||
ScheduleTaskManager,
|
||||
)
|
||||
|
||||
from awx.main.utils.external_logging import reconfigure_rsyslog
|
||||
from awx.main.utils.reload import stop_local_services
|
||||
from awx.main.utils.pglock import advisory_lock
|
||||
from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
|
||||
@@ -115,9 +115,6 @@ def dispatch_startup():
|
||||
m = Metrics()
|
||||
m.reset_values()
|
||||
|
||||
# Update Tower's rsyslog.conf file based on loggins settings in the db
|
||||
reconfigure_rsyslog()
|
||||
|
||||
|
||||
def inform_cluster_of_shutdown():
|
||||
try:
|
||||
@@ -132,7 +129,7 @@ def inform_cluster_of_shutdown():
|
||||
logger.exception('Encountered problem with normal shutdown signal.')
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def apply_cluster_membership_policies():
|
||||
from awx.main.signals import disable_activity_stream
|
||||
|
||||
@@ -244,8 +241,10 @@ def apply_cluster_membership_policies():
|
||||
logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
|
||||
|
||||
|
||||
@task(queue='tower_broadcast_all')
|
||||
def handle_setting_changes(setting_keys):
|
||||
@task(queue='tower_settings_change')
|
||||
def clear_setting_cache(setting_keys):
|
||||
# log that cache is being cleared
|
||||
logger.info(f"clear_setting_cache of keys {setting_keys}")
|
||||
orig_len = len(setting_keys)
|
||||
for i in range(orig_len):
|
||||
for dependent_key in settings_registry.get_dependent_settings(setting_keys[i]):
|
||||
@@ -254,9 +253,6 @@ def handle_setting_changes(setting_keys):
|
||||
logger.debug('cache delete_many(%r)', cache_keys)
|
||||
cache.delete_many(cache_keys)
|
||||
|
||||
if any([setting.startswith('LOG_AGGREGATOR') for setting in setting_keys]):
|
||||
reconfigure_rsyslog()
|
||||
|
||||
|
||||
@task(queue='tower_broadcast_all')
|
||||
def delete_project_files(project_path):
|
||||
@@ -286,7 +282,7 @@ def profile_sql(threshold=1, minutes=1):
|
||||
logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def send_notifications(notification_list, job_id=None):
|
||||
if not isinstance(notification_list, list):
|
||||
raise TypeError("notification_list should be of type list")
|
||||
@@ -317,7 +313,7 @@ def send_notifications(notification_list, job_id=None):
|
||||
logger.exception('Error saving notification {} result.'.format(notification.id))
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def gather_analytics():
|
||||
from awx.conf.models import Setting
|
||||
from rest_framework.fields import DateTimeField
|
||||
@@ -330,7 +326,7 @@ def gather_analytics():
|
||||
analytics.gather()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def purge_old_stdout_files():
|
||||
nowtime = time.time()
|
||||
for f in os.listdir(settings.JOBOUTPUT_ROOT):
|
||||
@@ -378,12 +374,26 @@ def handle_removed_image(remove_images=None):
|
||||
_cleanup_images_and_files(remove_images=remove_images, file_pattern='')
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def cleanup_images_and_files():
|
||||
_cleanup_images_and_files()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def cleanup_host_metrics():
|
||||
from awx.conf.models import Setting
|
||||
from rest_framework.fields import DateTimeField
|
||||
|
||||
last_cleanup = Setting.objects.filter(key='CLEANUP_HOST_METRICS_LAST_TS').first()
|
||||
last_time = DateTimeField().to_internal_value(last_cleanup.value) if last_cleanup and last_cleanup.value else None
|
||||
|
||||
cleanup_interval_secs = getattr(settings, 'CLEANUP_HOST_METRICS_INTERVAL', 30) * 86400
|
||||
if not last_time or ((now() - last_time).total_seconds() > cleanup_interval_secs):
|
||||
months_ago = getattr(settings, 'CLEANUP_HOST_METRICS_THRESHOLD', 12)
|
||||
HostMetric.cleanup_task(months_ago)
|
||||
|
||||
|
||||
@task(queue=get_task_queuename)
|
||||
def cluster_node_health_check(node):
|
||||
"""
|
||||
Used for the health check endpoint, refreshes the status of the instance, but must be ran on target node
|
||||
@@ -402,7 +412,7 @@ def cluster_node_health_check(node):
|
||||
this_inst.local_health_check()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def execution_node_health_check(node):
|
||||
if node == '':
|
||||
logger.warning('Remote health check incorrectly called with blank string')
|
||||
@@ -496,7 +506,7 @@ def inspect_execution_nodes(instance_list):
|
||||
execution_node_health_check.apply_async([hostname])
|
||||
|
||||
|
||||
@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
|
||||
@task(queue=get_task_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
|
||||
def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
|
||||
logger.debug("Cluster node heartbeat task.")
|
||||
nowtime = now()
|
||||
@@ -586,7 +596,7 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
|
||||
reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def awx_receptor_workunit_reaper():
|
||||
"""
|
||||
When an AWX job is launched via receptor, files such as status, stdin, and stdout are created
|
||||
@@ -622,7 +632,7 @@ def awx_receptor_workunit_reaper():
|
||||
administrative_workunit_reaper(receptor_work_list)
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def awx_k8s_reaper():
|
||||
if not settings.RECEPTOR_RELEASE_WORK:
|
||||
return
|
||||
@@ -642,7 +652,7 @@ def awx_k8s_reaper():
|
||||
logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def awx_periodic_scheduler():
|
||||
with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
|
||||
if acquired is False:
|
||||
@@ -708,7 +718,7 @@ def schedule_manager_success_or_error(instance):
|
||||
ScheduleWorkflowManager().schedule()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def handle_work_success(task_actual):
|
||||
try:
|
||||
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
|
||||
@@ -720,7 +730,7 @@ def handle_work_success(task_actual):
|
||||
schedule_manager_success_or_error(instance)
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def handle_work_error(task_actual):
|
||||
try:
|
||||
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
|
||||
@@ -760,7 +770,7 @@ def handle_work_error(task_actual):
|
||||
schedule_manager_success_or_error(instance)
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def update_inventory_computed_fields(inventory_id):
|
||||
"""
|
||||
Signal handler and wrapper around inventory.update_computed_fields to
|
||||
@@ -801,7 +811,7 @@ def update_smart_memberships_for_inventory(smart_inventory):
|
||||
return False
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def update_host_smart_inventory_memberships():
|
||||
smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
|
||||
changed_inventories = set([])
|
||||
@@ -817,7 +827,7 @@ def update_host_smart_inventory_memberships():
|
||||
smart_inventory.update_computed_fields()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def delete_inventory(inventory_id, user_id, retries=5):
|
||||
# Delete inventory as user
|
||||
if user_id is None:
|
||||
@@ -882,7 +892,7 @@ def _reconstruct_relationships(copy_mapping):
|
||||
new_obj.save()
|
||||
|
||||
|
||||
@task(queue=get_local_queuename)
|
||||
@task(queue=get_task_queuename)
|
||||
def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, uuid, permission_check_func=None):
|
||||
sub_obj_list = cache.get(uuid)
|
||||
if sub_obj_list is None:
|
||||
|
||||
86
awx/main/tests/functional/api/test_analytics.py
Normal file
86
awx/main/tests/functional/api/test_analytics.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import pytest
|
||||
import requests
|
||||
from awx.api.views.analytics import AnalyticsGenericView, MissingSettings, AUTOMATION_ANALYTICS_API_URL_PATH
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from awx.main.utils import get_awx_version
|
||||
from django.utils import translation
|
||||
|
||||
|
||||
class TestAnalyticsGenericView:
|
||||
@pytest.mark.parametrize(
|
||||
"existing_headers,expected_headers",
|
||||
[
|
||||
({}, {}),
|
||||
({'Hey': 'There'}, {}), # We don't forward just any headers
|
||||
({'Content-Type': 'text/html', 'Content-Length': '12'}, {'Content-Type': 'text/html', 'Content-Length': '12'}),
|
||||
# Requests will auto-add the following headers (so we don't need to test them): 'Accept-Encoding', 'User-Agent', 'Accept'
|
||||
],
|
||||
)
|
||||
def test__request_headers(self, existing_headers, expected_headers):
|
||||
expected_headers['X-Rh-Analytics-Source'] = 'controller'
|
||||
expected_headers['X-Rh-Analytics-Source-Version'] = get_awx_version()
|
||||
expected_headers['Accept-Language'] = translation.get_language()
|
||||
|
||||
request = requests.session()
|
||||
request.headers.update(existing_headers)
|
||||
assert set(expected_headers.items()).issubset(set(AnalyticsGenericView._request_headers(request).items()))
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path,expected_path",
|
||||
[
|
||||
('A/B', f'{AUTOMATION_ANALYTICS_API_URL_PATH}/A/B'),
|
||||
('B', f'{AUTOMATION_ANALYTICS_API_URL_PATH}/B'),
|
||||
('/a/b/c/analytics/reports/my_slug', f'{AUTOMATION_ANALYTICS_API_URL_PATH}/reports/my_slug'),
|
||||
('/a/b/c/analytics/', f'{AUTOMATION_ANALYTICS_API_URL_PATH}/'),
|
||||
('/a/b/c/analytics', f'{AUTOMATION_ANALYTICS_API_URL_PATH}//a/b/c/analytics'), # Because there is no ending / on analytics we get a weird condition
|
||||
('/a/b/c/analytics/', f'{AUTOMATION_ANALYTICS_API_URL_PATH}/'),
|
||||
],
|
||||
)
|
||||
@pytest.mark.django_db
|
||||
def test__get_analytics_path(self, path, expected_path):
|
||||
assert AnalyticsGenericView._get_analytics_path(path) == expected_path
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test__get_analytics_url_no_url(self):
|
||||
with override_settings(AUTOMATION_ANALYTICS_URL=None):
|
||||
with pytest.raises(MissingSettings):
|
||||
agw = AnalyticsGenericView()
|
||||
agw._get_analytics_url('A')
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"request_path,ending_url",
|
||||
[
|
||||
('A', 'A'),
|
||||
('A/B', 'A/B'),
|
||||
('A/B/analytics/', ''), # we split on analytics but because there is nothing after
|
||||
('A/B/analytics/report', 'report'),
|
||||
('A/B/analytics/report/slug', 'report/slug'),
|
||||
],
|
||||
)
|
||||
@pytest.mark.django_db
|
||||
def test__get_analytics_url(self, request_path, ending_url):
|
||||
base_url = 'http://testing'
|
||||
with override_settings(AUTOMATION_ANALYTICS_URL=base_url):
|
||||
agw = AnalyticsGenericView()
|
||||
assert agw._get_analytics_url(request_path) == f'{base_url}{AUTOMATION_ANALYTICS_API_URL_PATH}/{ending_url}'
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"setting_name,setting_value,raises",
|
||||
[
|
||||
('INSIGHTS_TRACKING_STATE', None, True),
|
||||
('INSIGHTS_TRACKING_STATE', False, True),
|
||||
('INSIGHTS_TRACKING_STATE', True, False),
|
||||
('INSIGHTS_TRACKING_STATE', 'Steve', False),
|
||||
('INSIGHTS_TRACKING_STATE', 1, False),
|
||||
('INSIGHTS_TRACKING_STATE', '', True),
|
||||
],
|
||||
)
|
||||
@pytest.mark.django_db
|
||||
def test__get_setting(self, setting_name, setting_value, raises):
|
||||
with override_settings(**{setting_name: setting_value}):
|
||||
if raises:
|
||||
with pytest.raises(MissingSettings):
|
||||
AnalyticsGenericView._get_setting(setting_name, False, None)
|
||||
else:
|
||||
assert AnalyticsGenericView._get_setting(setting_name, False, None) == setting_value
|
||||
@@ -680,3 +680,22 @@ class TestConstructedInventory:
|
||||
inv_src = constructed_inventory.inventory_sources.first()
|
||||
assert inv_src.update_cache_timeout == 55
|
||||
assert inv_src.limit == 'foobar'
|
||||
|
||||
def test_get_absolute_url_for_constructed_inventory(self, constructed_inventory, admin_user, get):
|
||||
"""
|
||||
If we are using the normal inventory API endpoint to look at a
|
||||
constructed inventory, then we should get a normal inventory API route
|
||||
back. If we are accessing it via the special constructed inventory
|
||||
endpoint, then we should get that back.
|
||||
"""
|
||||
|
||||
url_const = reverse('api:constructed_inventory_detail', kwargs={'pk': constructed_inventory.pk})
|
||||
url_inv = reverse('api:inventory_detail', kwargs={'pk': constructed_inventory.pk})
|
||||
|
||||
const_r = get(url=url_const, user=admin_user, expect=200)
|
||||
inv_r = get(url=url_inv, user=admin_user, expect=200)
|
||||
assert const_r.data['url'] == url_const
|
||||
assert inv_r.data['url'] == url_inv
|
||||
assert inv_r.data['url'] != const_r.data['url']
|
||||
assert inv_r.data['related']['constructed_url'] == url_const
|
||||
assert const_r.data['related']['constructed_url'] == url_const
|
||||
|
||||
@@ -26,12 +26,12 @@ def test_python_and_js_licenses():
|
||||
return (is_gpl, is_lgpl)
|
||||
|
||||
def find_embedded_source_version(path, name):
|
||||
for entry in os.listdir(path):
|
||||
# Check variations of '-' and '_' in filenames due to python
|
||||
for fname in [name, name.replace('-', '_')]:
|
||||
if entry.startswith(fname) and entry.endswith('.tar.gz'):
|
||||
v = entry.split(name + '-')[1].split('.tar.gz')[0]
|
||||
return v
|
||||
files = os.listdir(path)
|
||||
tgz_files = [f for f in files if f.endswith('.tar.gz')]
|
||||
for tgz in tgz_files:
|
||||
pkg_name = tgz.split('-')[0].split('_')[0]
|
||||
if pkg_name == name:
|
||||
return tgz.split('-')[1].split('.tar.gz')[0]
|
||||
return None
|
||||
|
||||
list = {}
|
||||
|
||||
@@ -50,6 +50,7 @@ class TestApiRootView:
|
||||
'activity_stream',
|
||||
'workflow_job_templates',
|
||||
'workflow_jobs',
|
||||
'analytics',
|
||||
]
|
||||
view = ApiVersionRootView()
|
||||
ret = view.get(mocker.MagicMock())
|
||||
|
||||
@@ -6,6 +6,7 @@ import urllib.parse as urlparse
|
||||
from django.conf import settings
|
||||
|
||||
from awx.main.utils.reload import supervisor_service_command
|
||||
from awx.main.dispatch.publish import task
|
||||
|
||||
|
||||
def construct_rsyslog_conf_template(settings=settings):
|
||||
@@ -114,6 +115,7 @@ def construct_rsyslog_conf_template(settings=settings):
|
||||
return tmpl
|
||||
|
||||
|
||||
@task(queue='rsyslog_configurer')
|
||||
def reconfigure_rsyslog():
|
||||
tmpl = construct_rsyslog_conf_template()
|
||||
# Write config to a temp file then move it to preserve atomicity
|
||||
|
||||
@@ -388,9 +388,13 @@ class Licenser(object):
|
||||
if subscription_model == SUBSCRIPTION_USAGE_MODEL_UNIQUE_HOSTS:
|
||||
automated_instances = HostMetric.active_objects.count()
|
||||
first_host = HostMetric.active_objects.only('first_automation').order_by('first_automation').first()
|
||||
attrs['deleted_instances'] = HostMetric.objects.filter(deleted=True).count()
|
||||
attrs['reactivated_instances'] = HostMetric.active_objects.filter(deleted_counter__gte=1).count()
|
||||
else:
|
||||
automated_instances = HostMetric.objects.count()
|
||||
automated_instances = 0
|
||||
first_host = HostMetric.objects.only('first_automation').order_by('first_automation').first()
|
||||
attrs['deleted_instances'] = 0
|
||||
attrs['reactivated_instances'] = 0
|
||||
|
||||
if first_host:
|
||||
automated_since = int(first_host.first_automation.timestamp())
|
||||
|
||||
@@ -17,7 +17,7 @@ def supervisor_service_command(command, service='*', communicate=True):
|
||||
"""
|
||||
args = ['supervisorctl']
|
||||
|
||||
supervisor_config_path = os.getenv('SUPERVISOR_WEB_CONFIG_PATH', None)
|
||||
supervisor_config_path = os.getenv('SUPERVISOR_CONFIG_PATH', None)
|
||||
if supervisor_config_path:
|
||||
args.extend(['-c', supervisor_config_path])
|
||||
|
||||
|
||||
@@ -1,208 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import client_exceptions
|
||||
from asgiref.sync import sync_to_async
|
||||
|
||||
from channels.layers import get_channel_layer
|
||||
|
||||
from django.conf import settings
|
||||
from django.apps import apps
|
||||
from django.core.serializers.json import DjangoJSONEncoder
|
||||
|
||||
from awx.main.analytics.broadcast_websocket import (
|
||||
BroadcastWebsocketStats,
|
||||
BroadcastWebsocketStatsManager,
|
||||
)
|
||||
import awx.main.analytics.subsystem_metrics as s_metrics
|
||||
|
||||
logger = logging.getLogger('awx.main.wsbroadcast')
|
||||
|
||||
|
||||
def wrap_broadcast_msg(group, message: str):
|
||||
# TODO: Maybe wrap as "group","message" so that we don't need to
|
||||
# encode/decode as json.
|
||||
return json.dumps(dict(group=group, message=message), cls=DjangoJSONEncoder)
|
||||
|
||||
|
||||
def unwrap_broadcast_msg(payload: dict):
|
||||
return (payload['group'], payload['message'])
|
||||
|
||||
|
||||
@sync_to_async
|
||||
def get_broadcast_hosts():
|
||||
Instance = apps.get_model('main', 'Instance')
|
||||
instances = (
|
||||
Instance.objects.exclude(hostname=Instance.objects.my_hostname())
|
||||
.exclude(node_type='execution')
|
||||
.exclude(node_type='hop')
|
||||
.order_by('hostname')
|
||||
.values('hostname', 'ip_address')
|
||||
.distinct()
|
||||
)
|
||||
return {i['hostname']: i['ip_address'] or i['hostname'] for i in instances}
|
||||
|
||||
|
||||
def get_local_host():
|
||||
Instance = apps.get_model('main', 'Instance')
|
||||
return Instance.objects.my_hostname()
|
||||
|
||||
|
||||
class WebsocketTask:
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
event_loop,
|
||||
stats: BroadcastWebsocketStats,
|
||||
remote_host: str,
|
||||
remote_port: int = settings.BROADCAST_WEBSOCKET_PORT,
|
||||
protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL,
|
||||
verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
|
||||
endpoint: str = 'broadcast',
|
||||
):
|
||||
self.name = name
|
||||
self.event_loop = event_loop
|
||||
self.stats = stats
|
||||
self.remote_host = remote_host
|
||||
self.remote_port = remote_port
|
||||
self.endpoint = endpoint
|
||||
self.protocol = protocol
|
||||
self.verify_ssl = verify_ssl
|
||||
self.channel_layer = None
|
||||
self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
|
||||
|
||||
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
|
||||
raise RuntimeError("Implement me")
|
||||
|
||||
async def connect(self, attempt):
|
||||
from awx.main.consumers import WebsocketSecretAuthHelper # noqa
|
||||
|
||||
logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.")
|
||||
|
||||
'''
|
||||
Can not put get_channel_layer() in the init code because it is in the init
|
||||
path of channel layers i.e. RedisChannelLayer() calls our init code.
|
||||
'''
|
||||
if not self.channel_layer:
|
||||
self.channel_layer = get_channel_layer()
|
||||
|
||||
try:
|
||||
if attempt > 0:
|
||||
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS)
|
||||
except asyncio.CancelledError:
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled")
|
||||
raise
|
||||
|
||||
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/"
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
|
||||
secret_val = WebsocketSecretAuthHelper.construct_secret()
|
||||
try:
|
||||
async with aiohttp.ClientSession(headers={'secret': secret_val}, timeout=timeout) as session:
|
||||
async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket:
|
||||
logger.info(f"Connection from {self.name} to {self.remote_host} established.")
|
||||
self.stats.record_connection_established()
|
||||
attempt = 0
|
||||
await self.run_loop(websocket)
|
||||
except asyncio.CancelledError:
|
||||
# TODO: Check if connected and disconnect
|
||||
# Possibly use run_until_complete() if disconnect is async
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.")
|
||||
self.stats.record_connection_lost()
|
||||
raise
|
||||
except client_exceptions.ClientConnectorError as e:
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} timed out.")
|
||||
except Exception as e:
|
||||
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
|
||||
logger.exception(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.")
|
||||
else:
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} list.")
|
||||
|
||||
self.stats.record_connection_lost()
|
||||
self.start(attempt=attempt + 1)
|
||||
|
||||
def start(self, attempt=0):
|
||||
self.async_task = self.event_loop.create_task(self.connect(attempt=attempt))
|
||||
|
||||
def cancel(self):
|
||||
self.async_task.cancel()
|
||||
|
||||
|
||||
class BroadcastWebsocketTask(WebsocketTask):
|
||||
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
|
||||
async for msg in websocket:
|
||||
self.stats.record_message_received()
|
||||
|
||||
if msg.type == aiohttp.WSMsgType.ERROR:
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.TEXT:
|
||||
try:
|
||||
payload = json.loads(msg.data)
|
||||
except json.JSONDecodeError:
|
||||
logmsg = "Failed to decode broadcast message"
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logmsg = "{} {}".format(logmsg, payload)
|
||||
logger.warning(logmsg)
|
||||
continue
|
||||
(group, message) = unwrap_broadcast_msg(payload)
|
||||
if group == "metrics":
|
||||
self.subsystem_metrics.store_metrics(message)
|
||||
continue
|
||||
await self.channel_layer.group_send(group, {"type": "internal.message", "text": message})
|
||||
|
||||
|
||||
class BroadcastWebsocketManager(object):
|
||||
def __init__(self):
|
||||
self.event_loop = asyncio.get_event_loop()
|
||||
'''
|
||||
{
|
||||
'hostname1': BroadcastWebsocketTask(),
|
||||
'hostname2': BroadcastWebsocketTask(),
|
||||
'hostname3': BroadcastWebsocketTask(),
|
||||
}
|
||||
'''
|
||||
self.broadcast_tasks = dict()
|
||||
self.local_hostname = get_local_host()
|
||||
self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname)
|
||||
|
||||
async def run_per_host_websocket(self):
|
||||
while True:
|
||||
known_hosts = await get_broadcast_hosts()
|
||||
future_remote_hosts = known_hosts.keys()
|
||||
current_remote_hosts = self.broadcast_tasks.keys()
|
||||
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
|
||||
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
|
||||
|
||||
remote_addresses = {k: v.remote_host for k, v in self.broadcast_tasks.items()}
|
||||
for hostname, address in known_hosts.items():
|
||||
if hostname in self.broadcast_tasks and address != remote_addresses[hostname]:
|
||||
deleted_remote_hosts.add(hostname)
|
||||
new_remote_hosts.add(hostname)
|
||||
|
||||
if deleted_remote_hosts:
|
||||
logger.warning(f"Removing {deleted_remote_hosts} from websocket broadcast list")
|
||||
if new_remote_hosts:
|
||||
logger.warning(f"Adding {new_remote_hosts} to websocket broadcast list")
|
||||
|
||||
for h in deleted_remote_hosts:
|
||||
self.broadcast_tasks[h].cancel()
|
||||
del self.broadcast_tasks[h]
|
||||
self.stats_mgr.delete_remote_host_stats(h)
|
||||
|
||||
for h in new_remote_hosts:
|
||||
stats = self.stats_mgr.new_remote_host_stats(h)
|
||||
broadcast_task = BroadcastWebsocketTask(name=self.local_hostname, event_loop=self.event_loop, stats=stats, remote_host=known_hosts[h])
|
||||
broadcast_task.start()
|
||||
self.broadcast_tasks[h] = broadcast_task
|
||||
|
||||
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
|
||||
|
||||
def start(self):
|
||||
self.stats_mgr.start()
|
||||
|
||||
self.async_task = self.event_loop.create_task(self.run_per_host_websocket())
|
||||
return self.async_task
|
||||
305
awx/main/wsrelay.py
Normal file
305
awx/main/wsrelay.py
Normal file
@@ -0,0 +1,305 @@
|
||||
import json
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Dict
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import client_exceptions
|
||||
|
||||
from channels.layers import get_channel_layer
|
||||
|
||||
from django.conf import settings
|
||||
from django.apps import apps
|
||||
|
||||
import psycopg
|
||||
|
||||
from awx.main.analytics.broadcast_websocket import (
|
||||
RelayWebsocketStats,
|
||||
RelayWebsocketStatsManager,
|
||||
)
|
||||
import awx.main.analytics.subsystem_metrics as s_metrics
|
||||
|
||||
logger = logging.getLogger('awx.main.wsrelay')
|
||||
|
||||
|
||||
def wrap_broadcast_msg(group, message: str):
|
||||
# TODO: Maybe wrap as "group","message" so that we don't need to
|
||||
# encode/decode as json.
|
||||
return dict(group=group, message=message)
|
||||
|
||||
|
||||
def get_local_host():
|
||||
Instance = apps.get_model('main', 'Instance')
|
||||
return Instance.objects.my_hostname()
|
||||
|
||||
|
||||
class WebsocketRelayConnection:
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
stats: RelayWebsocketStats,
|
||||
remote_host: str,
|
||||
remote_port: int = settings.BROADCAST_WEBSOCKET_PORT,
|
||||
protocol: str = settings.BROADCAST_WEBSOCKET_PROTOCOL,
|
||||
verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
|
||||
):
|
||||
self.name = name
|
||||
self.event_loop = asyncio.get_event_loop()
|
||||
self.stats = stats
|
||||
self.remote_host = remote_host
|
||||
self.remote_port = remote_port
|
||||
self.protocol = protocol
|
||||
self.verify_ssl = verify_ssl
|
||||
self.channel_layer = None
|
||||
self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
|
||||
self.producers = dict()
|
||||
self.connected = False
|
||||
|
||||
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
|
||||
raise RuntimeError("Implement me")
|
||||
|
||||
async def connect(self):
|
||||
from awx.main.consumers import WebsocketSecretAuthHelper # noqa
|
||||
|
||||
logger.debug(f"Connection attempt from {self.name} to {self.remote_host}")
|
||||
|
||||
'''
|
||||
Can not put get_channel_layer() in the init code because it is in the init
|
||||
path of channel layers i.e. RedisChannelLayer() calls our init code.
|
||||
'''
|
||||
if not self.channel_layer:
|
||||
self.channel_layer = get_channel_layer()
|
||||
|
||||
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/"
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
|
||||
secret_val = WebsocketSecretAuthHelper.construct_secret()
|
||||
try:
|
||||
async with aiohttp.ClientSession(headers={'secret': secret_val}, timeout=timeout) as session:
|
||||
async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket:
|
||||
logger.info(f"Connection from {self.name} to {self.remote_host} established.")
|
||||
self.stats.record_connection_established()
|
||||
self.connected = True
|
||||
await self.run_connection(websocket)
|
||||
except asyncio.CancelledError:
|
||||
# TODO: Check if connected and disconnect
|
||||
# Possibly use run_until_complete() if disconnect is async
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.")
|
||||
except client_exceptions.ClientConnectorError as e:
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} timed out.")
|
||||
except Exception as e:
|
||||
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
|
||||
logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True)
|
||||
else:
|
||||
logger.debug(f"Connection from {self.name} to {self.remote_host} lost, but no exception was raised.")
|
||||
finally:
|
||||
self.connected = False
|
||||
self.stats.record_connection_lost()
|
||||
|
||||
def start(self):
|
||||
self.async_task = self.event_loop.create_task(self.connect())
|
||||
return self.async_task
|
||||
|
||||
def cancel(self):
|
||||
self.async_task.cancel()
|
||||
|
||||
async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse):
|
||||
# create a dedicated subsystem metric producer to handle local subsystem
|
||||
# metrics messages
|
||||
# the "metrics" group is not subscribed to in the typical fashion, so we
|
||||
# just explicitly create it
|
||||
producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics"))
|
||||
self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}}
|
||||
async for msg in websocket:
|
||||
self.stats.record_message_received()
|
||||
|
||||
if msg.type == aiohttp.WSMsgType.ERROR:
|
||||
break
|
||||
elif msg.type == aiohttp.WSMsgType.TEXT:
|
||||
try:
|
||||
payload = json.loads(msg.data)
|
||||
except json.JSONDecodeError:
|
||||
logmsg = "Failed to decode message from web node"
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logmsg = "{} {}".format(logmsg, payload)
|
||||
logger.warning(logmsg)
|
||||
continue
|
||||
|
||||
if payload.get("type") == "consumer.subscribe":
|
||||
for group in payload['groups']:
|
||||
name = f"{self.remote_host}-{group}"
|
||||
origin_channel = payload['origin_channel']
|
||||
if not self.producers.get(name):
|
||||
producer = self.event_loop.create_task(self.run_producer(name, websocket, group))
|
||||
self.producers[name] = {"task": producer, "subscriptions": {origin_channel}}
|
||||
logger.debug(f"Producer {name} started.")
|
||||
else:
|
||||
self.producers[name]["subscriptions"].add(origin_channel)
|
||||
logger.debug(f"Connection from {self.name} to {self.remote_host} added subscription to {group}.")
|
||||
|
||||
if payload.get("type") == "consumer.unsubscribe":
|
||||
for group in payload['groups']:
|
||||
name = f"{self.remote_host}-{group}"
|
||||
origin_channel = payload['origin_channel']
|
||||
try:
|
||||
self.producers[name]["subscriptions"].remove(origin_channel)
|
||||
logger.debug(f"Unsubscribed {origin_channel} from {name}")
|
||||
except KeyError:
|
||||
logger.warning(f"Producer {name} not found.")
|
||||
|
||||
async def run_producer(self, name, websocket, group):
|
||||
try:
|
||||
logger.info(f"Starting producer for {name}")
|
||||
|
||||
consumer_channel = await self.channel_layer.new_channel()
|
||||
await self.channel_layer.group_add(group, consumer_channel)
|
||||
logger.debug(f"Producer {name} added to group {group} and is now awaiting messages.")
|
||||
|
||||
while True:
|
||||
try:
|
||||
msg = await asyncio.wait_for(self.channel_layer.receive(consumer_channel), timeout=10)
|
||||
if not msg.get("needs_relay"):
|
||||
# This is added in by emit_channel_notification(). It prevents us from looping
|
||||
# in the event that we are sharing a redis with a web instance. We'll see the
|
||||
# message once (it'll have needs_relay=True), we'll delete that, and then forward
|
||||
# the message along. The web instance will add it back to the same channels group,
|
||||
# but it won't have needs_relay=True, so we'll ignore it.
|
||||
continue
|
||||
|
||||
# We need to copy the message because we're going to delete the needs_relay key
|
||||
# and we don't want to modify the original message because other producers may
|
||||
# still need to act on it. It seems weird, but it's necessary.
|
||||
msg = dict(msg)
|
||||
del msg["needs_relay"]
|
||||
except asyncio.TimeoutError:
|
||||
current_subscriptions = self.producers[name]["subscriptions"]
|
||||
if len(current_subscriptions) == 0:
|
||||
logger.info(f"Producer {name} has no subscribers, shutting down.")
|
||||
return
|
||||
|
||||
continue
|
||||
|
||||
await websocket.send_json(wrap_broadcast_msg(group, msg))
|
||||
except ConnectionResetError:
|
||||
# This can be hit when a web node is scaling down and we try to write to it.
|
||||
# There's really nothing to do in this case and it's a fairly typical thing to happen.
|
||||
# We'll log it as debug, but it's not really a problem.
|
||||
logger.debug(f"Producer {name} connection reset.")
|
||||
pass
|
||||
except Exception:
|
||||
# Note, this is very intentional and important since we do not otherwise
|
||||
# ever check the result of this future. Without this line you will not see an error if
|
||||
# something goes wrong in here.
|
||||
logger.exception(f"Event relay producer {name} crashed")
|
||||
finally:
|
||||
await self.channel_layer.group_discard(group, consumer_channel)
|
||||
del self.producers[name]
|
||||
|
||||
|
||||
class WebSocketRelayManager(object):
|
||||
def __init__(self):
|
||||
self.local_hostname = get_local_host()
|
||||
self.relay_connections = dict()
|
||||
# hostname -> ip
|
||||
self.known_hosts: Dict[str, str] = dict()
|
||||
|
||||
async def pg_consumer(self, conn):
|
||||
try:
|
||||
await conn.execute("LISTEN web_heartbeet")
|
||||
async for notif in conn.notifies():
|
||||
if notif is not None and notif.channel == "web_heartbeet":
|
||||
try:
|
||||
payload = json.loads(notif.payload)
|
||||
except json.JSONDecodeError:
|
||||
logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`"
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logmsg = "{} {}".format(logmsg, payload)
|
||||
logger.warning(logmsg)
|
||||
continue
|
||||
|
||||
# Skip if the message comes from the same host we are running on
|
||||
# In this case, we'll be sharing a redis, no need to relay.
|
||||
if payload.get("hostname") == self.local_hostname:
|
||||
continue
|
||||
|
||||
if payload.get("action") == "online":
|
||||
hostname = payload["hostname"]
|
||||
ip = payload["ip"]
|
||||
if ip is None:
|
||||
# If we don't get an IP, just try the hostname, maybe it resolves
|
||||
ip = hostname
|
||||
self.known_hosts[hostname] = ip
|
||||
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
|
||||
elif payload.get("action") == "offline":
|
||||
hostname = payload["hostname"]
|
||||
del self.known_hosts[hostname]
|
||||
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
|
||||
except Exception as e:
|
||||
# This catch-all is the same as the one above. asyncio will eat the exception
|
||||
# but we want to know about it.
|
||||
logger.exception(f"pg_consumer exception: {e}")
|
||||
|
||||
async def run(self):
|
||||
event_loop = asyncio.get_running_loop()
|
||||
|
||||
stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname)
|
||||
stats_mgr.start()
|
||||
|
||||
# Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully.
|
||||
database_conf = settings.DATABASES['default']
|
||||
async_conn = await psycopg.AsyncConnection.connect(
|
||||
dbname=database_conf['NAME'],
|
||||
host=database_conf['HOST'],
|
||||
user=database_conf['USER'],
|
||||
password=database_conf['PASSWORD'],
|
||||
port=database_conf['PORT'],
|
||||
**database_conf.get("OPTIONS", {}),
|
||||
)
|
||||
await async_conn.set_autocommit(True)
|
||||
event_loop.create_task(self.pg_consumer(async_conn))
|
||||
|
||||
# Establishes a websocket connection to /websocket/relay on all API servers
|
||||
while True:
|
||||
# logger.info("Current known hosts: {}".format(self.known_hosts))
|
||||
future_remote_hosts = self.known_hosts.keys()
|
||||
current_remote_hosts = self.relay_connections.keys()
|
||||
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
|
||||
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
|
||||
|
||||
# This loop handles if we get an advertisement from a host we already know about but
|
||||
# the advertisement has a different IP than we are currently connected to.
|
||||
for hostname, address in self.known_hosts.items():
|
||||
if hostname not in self.relay_connections:
|
||||
# We've picked up a new hostname that we don't know about yet.
|
||||
continue
|
||||
|
||||
if address != self.relay_connections[hostname].remote_host:
|
||||
deleted_remote_hosts.add(hostname)
|
||||
new_remote_hosts.add(hostname)
|
||||
|
||||
# Delete any hosts with closed connections
|
||||
for hostname, relay_conn in self.relay_connections.items():
|
||||
if not relay_conn.connected:
|
||||
deleted_remote_hosts.add(hostname)
|
||||
|
||||
if deleted_remote_hosts:
|
||||
logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list")
|
||||
|
||||
if new_remote_hosts:
|
||||
logger.info(f"Adding {new_remote_hosts} to websocket broadcast list")
|
||||
|
||||
for h in deleted_remote_hosts:
|
||||
self.relay_connections[h].cancel()
|
||||
del self.relay_connections[h]
|
||||
del self.known_hosts[h]
|
||||
stats_mgr.delete_remote_host_stats(h)
|
||||
|
||||
for h in new_remote_hosts:
|
||||
stats = stats_mgr.new_remote_host_stats(h)
|
||||
relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])
|
||||
relay_connection.start()
|
||||
self.relay_connections[h] = relay_connection
|
||||
|
||||
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
|
||||
@@ -228,6 +228,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000
|
||||
# The number of job events to migrate per-transaction when moving from int -> bigint
|
||||
JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
|
||||
|
||||
# The prefix of the redis key that stores metrics
|
||||
SUBSYSTEM_METRICS_REDIS_KEY_PREFIX = "awx_metrics"
|
||||
|
||||
# Histogram buckets for the callback_receiver_batch_events_insert_db metric
|
||||
SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000]
|
||||
|
||||
@@ -472,6 +475,7 @@ CELERYBEAT_SCHEDULE = {
|
||||
'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
|
||||
'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
|
||||
'cleanup_images': {'task': 'awx.main.tasks.system.cleanup_images_and_files', 'schedule': timedelta(hours=3)},
|
||||
'cleanup_host_metrics': {'task': 'awx.main.tasks.system.cleanup_host_metrics', 'schedule': timedelta(days=1)},
|
||||
}
|
||||
|
||||
# Django Caching Configuration
|
||||
@@ -789,6 +793,7 @@ INSIGHTS_URL_BASE = "https://example.org"
|
||||
INSIGHTS_AGENT_MIME = 'application/example'
|
||||
# See https://github.com/ansible/awx-facts-playbooks
|
||||
INSIGHTS_SYSTEM_ID_FILE = '/etc/redhat-access-insights/machine-id'
|
||||
INSIGHTS_CERT_PATH = "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"
|
||||
|
||||
TOWER_SETTINGS_MANIFEST = {}
|
||||
|
||||
@@ -866,7 +871,10 @@ LOGGING = {
|
||||
'awx.main.commands.run_callback_receiver': {'handlers': ['callback_receiver']}, # level handled by dynamic_level_filter
|
||||
'awx.main.dispatch': {'handlers': ['dispatcher']},
|
||||
'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'},
|
||||
'awx.main.wsbroadcast': {'handlers': ['wsbroadcast']},
|
||||
'awx.main.rsyslog_configurer': {'handlers': ['rsyslog_configurer']},
|
||||
'awx.main.cache_clear': {'handlers': ['cache_clear']},
|
||||
'awx.main.heartbeet': {'handlers': ['heartbeet']},
|
||||
'awx.main.wsrelay': {'handlers': ['wsrelay']},
|
||||
'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False},
|
||||
'awx.main.tasks': {'handlers': ['task_system', 'external_logger'], 'propagate': False},
|
||||
'awx.main.analytics': {'handlers': ['task_system', 'external_logger'], 'level': 'INFO', 'propagate': False},
|
||||
@@ -875,7 +883,7 @@ LOGGING = {
|
||||
'awx.main.signals': {'level': 'INFO'}, # very verbose debug-level logs
|
||||
'awx.api.permissions': {'level': 'INFO'}, # very verbose debug-level logs
|
||||
'awx.analytics': {'handlers': ['external_logger'], 'level': 'INFO', 'propagate': False},
|
||||
'awx.analytics.broadcast_websocket': {'handlers': ['console', 'file', 'wsbroadcast', 'external_logger'], 'level': 'INFO', 'propagate': False},
|
||||
'awx.analytics.broadcast_websocket': {'handlers': ['console', 'file', 'wsrelay', 'external_logger'], 'level': 'INFO', 'propagate': False},
|
||||
'awx.analytics.performance': {'handlers': ['console', 'file', 'tower_warnings', 'external_logger'], 'level': 'DEBUG', 'propagate': False},
|
||||
'awx.analytics.job_lifecycle': {'handlers': ['console', 'job_lifecycle'], 'level': 'DEBUG', 'propagate': False},
|
||||
'django_auth_ldap': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
|
||||
@@ -893,10 +901,13 @@ handler_config = {
|
||||
'tower_warnings': {'filename': 'tower.log'},
|
||||
'callback_receiver': {'filename': 'callback_receiver.log'},
|
||||
'dispatcher': {'filename': 'dispatcher.log', 'formatter': 'dispatcher'},
|
||||
'wsbroadcast': {'filename': 'wsbroadcast.log'},
|
||||
'wsrelay': {'filename': 'wsrelay.log'},
|
||||
'task_system': {'filename': 'task_system.log'},
|
||||
'rbac_migrations': {'filename': 'tower_rbac_migrations.log'},
|
||||
'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'},
|
||||
'rsyslog_configurer': {'filename': 'rsyslog_configurer.log'},
|
||||
'cache_clear': {'filename': 'cache_clear.log'},
|
||||
'heartbeet': {'filename': 'heartbeet.log'},
|
||||
}
|
||||
|
||||
# If running on a VM, we log to files. When running in a container, we log to stdout.
|
||||
@@ -1007,6 +1018,9 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10
|
||||
# How often websocket process will generate stats
|
||||
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5
|
||||
|
||||
# How often should web instances advertise themselves?
|
||||
BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS = 15
|
||||
|
||||
DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}
|
||||
|
||||
# Name of the default task queue
|
||||
@@ -1040,3 +1054,10 @@ UI_NEXT = True
|
||||
# - '': No model - Subscription not counted from Host Metrics
|
||||
# - 'unique_managed_hosts': Compliant = automated - deleted hosts (using /api/v2/host_metrics/)
|
||||
SUBSCRIPTION_USAGE_MODEL = ''
|
||||
|
||||
# Host metrics cleanup - last time of the cleanup run (soft-deleting records)
|
||||
CLEANUP_HOST_METRICS_LAST_TS = None
|
||||
# Host metrics cleanup - minimal interval between two cleanups in days
|
||||
CLEANUP_HOST_METRICS_INTERVAL = 30 # days
|
||||
# Host metrics cleanup - soft-delete HostMetric records with last_automation < [threshold] (in months)
|
||||
CLEANUP_HOST_METRICS_THRESHOLD = 12 # months
|
||||
|
||||
@@ -113,6 +113,9 @@ function ScheduleEdit({
|
||||
days: values.daysToKeep,
|
||||
});
|
||||
} else {
|
||||
if (typeof requestData.extra_data === 'string') {
|
||||
requestData.extra_data = JSON.parse(requestData.extra_data);
|
||||
}
|
||||
requestData.extra_data.days = values.daysToKeep;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ const HelperText = styled(PFHelperText)`
|
||||
`;
|
||||
|
||||
function SubscriptionDetail() {
|
||||
const { me = {}, license_info, version } = useConfig();
|
||||
const { me = {}, license_info, version, systemConfig } = useConfig();
|
||||
const baseURL = '/settings/subscription';
|
||||
const tabsArray = [
|
||||
{
|
||||
@@ -56,35 +56,38 @@ function SubscriptionDetail() {
|
||||
<RoutedTabs tabsArray={tabsArray} />
|
||||
<CardBody>
|
||||
<DetailList>
|
||||
<Detail
|
||||
dataCy="subscription-status"
|
||||
label={t`Status`}
|
||||
value={
|
||||
license_info.compliant ? (
|
||||
<>
|
||||
<Label variant="outline" color="green" icon={<CheckIcon />}>
|
||||
{t`Compliant`}
|
||||
</Label>
|
||||
<HelperText>
|
||||
<HelperTextItem>{t`The number of hosts you have automated against is below your subscription count.`}</HelperTextItem>
|
||||
</HelperText>
|
||||
</>
|
||||
) : (
|
||||
<>
|
||||
<Label
|
||||
variant="outline"
|
||||
color="red"
|
||||
icon={<ExclamationCircleIcon />}
|
||||
>
|
||||
{t`Out of compliance`}
|
||||
</Label>
|
||||
<HelperText>
|
||||
<HelperTextItem>{t`You have automated against more hosts than your subscription allows.`}</HelperTextItem>
|
||||
</HelperText>
|
||||
</>
|
||||
)
|
||||
}
|
||||
/>
|
||||
{systemConfig?.SUBSCRIPTION_USAGE_MODEL ===
|
||||
'unique_managed_hosts' && (
|
||||
<Detail
|
||||
dataCy="subscription-status"
|
||||
label={t`Status`}
|
||||
value={
|
||||
license_info.compliant ? (
|
||||
<>
|
||||
<Label variant="outline" color="green" icon={<CheckIcon />}>
|
||||
{t`Compliant`}
|
||||
</Label>
|
||||
<HelperText>
|
||||
<HelperTextItem>{t`The number of hosts you have automated against is below your subscription count.`}</HelperTextItem>
|
||||
</HelperText>
|
||||
</>
|
||||
) : (
|
||||
<>
|
||||
<Label
|
||||
variant="outline"
|
||||
color="red"
|
||||
icon={<ExclamationCircleIcon />}
|
||||
>
|
||||
{t`Out of compliance`}
|
||||
</Label>
|
||||
<HelperText>
|
||||
<HelperTextItem>{t`You have automated against more hosts than your subscription allows.`}</HelperTextItem>
|
||||
</HelperText>
|
||||
</>
|
||||
)
|
||||
}
|
||||
/>
|
||||
)}
|
||||
{typeof automatedInstancesCount !== 'undefined' &&
|
||||
automatedInstancesCount !== null && (
|
||||
<Detail
|
||||
@@ -107,11 +110,30 @@ function SubscriptionDetail() {
|
||||
label={t`Hosts imported`}
|
||||
value={license_info.current_instances}
|
||||
/>
|
||||
<Detail
|
||||
dataCy="subscription-hosts-remaining"
|
||||
label={t`Hosts remaining`}
|
||||
value={license_info.free_instances}
|
||||
/>
|
||||
{systemConfig?.SUBSCRIPTION_USAGE_MODEL ===
|
||||
'unique_managed_hosts' && (
|
||||
<Detail
|
||||
dataCy="subscription-hosts-remaining"
|
||||
label={t`Hosts remaining`}
|
||||
value={license_info.free_instances}
|
||||
/>
|
||||
)}
|
||||
{systemConfig?.SUBSCRIPTION_USAGE_MODEL ===
|
||||
'unique_managed_hosts' && (
|
||||
<Detail
|
||||
dataCy="subscription-hosts-deleted"
|
||||
label={t`Hosts deleted`}
|
||||
value={license_info.deleted_instances}
|
||||
/>
|
||||
)}
|
||||
{systemConfig?.SUBSCRIPTION_USAGE_MODEL ===
|
||||
'unique_managed_hosts' && (
|
||||
<Detail
|
||||
dataCy="subscription-hosts-reactivated"
|
||||
label={t`Active hosts previously deleted`}
|
||||
value={license_info.reactivated_instances}
|
||||
/>
|
||||
)}
|
||||
{license_info.instance_count < 9999999 && (
|
||||
<Detail
|
||||
dataCy="subscription-hosts-available"
|
||||
|
||||
@@ -31,6 +31,9 @@ const config = {
|
||||
trial: false,
|
||||
valid_key: true,
|
||||
},
|
||||
systemConfig: {
|
||||
SUBSCRIPTION_USAGE_MODEL: 'unique_managed_hosts',
|
||||
},
|
||||
};
|
||||
|
||||
describe('<SubscriptionDetail />', () => {
|
||||
|
||||
Reference in New Issue
Block a user