Files
awx/awx/api/views/root.py
Rodrigo Toshiaki Horie acf8721a09 Enhance OpenAPI schema with AI descriptions and fix method names (#16228)
* Enhance OpenAPI schema with AI descriptions and fix method names

Add x-ai-description extensions to API endpoints for better AI agent
comprehension. Fix view method names to
ensure proper drf-spectacular schema generation.

* Enhance OpenAPI schema with AI descriptions and fix method names

Add x-ai-description extensions to API endpoints for better AI agent
comprehension. Fix view method names to
ensure proper drf-spectacular schema generation.
2026-01-21 16:53:19 -03:00

426 lines
21 KiB
Python

# Copyright (c) 2018 Ansible, Inc.
# All Rights Reserved.
import base64
import json
import logging
import operator
from collections import OrderedDict
from django.conf import settings
from django.core.cache import cache
from django.db import connection
from django.utils.encoding import smart_str
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import ensure_csrf_cookie
from django.template.loader import render_to_string
from django.utils.translation import gettext_lazy as _
from django.urls import reverse as django_reverse
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
import requests
from ansible_base.lib.utils.schema import extend_schema_if_available
from awx import MODE
from awx.api.generics import APIView
from awx.conf.registry import settings_registry
from awx.main.analytics import all_collectors
from awx.main.ha import is_ha_environment
from awx.main.tasks.system import clear_setting_cache
from awx.main.utils import get_awx_version, get_custom_venv_choices
from awx.main.utils.licensing import validate_entitlement_manifest
from awx.api.versioning import URLPathVersioning, reverse, drf_reverse
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS
from awx.main.models import Project, Organization, Instance, InstanceGroup, JobTemplate
from awx.main.utils import set_environ
from awx.main.utils.analytics_proxy import TokenError
from awx.main.utils.licensing import get_licenser
# Module-level logger shared by every view in this file.
logger = logging.getLogger('awx.api.views.root')
class ApiRootView(APIView):
    # Unauthenticated entry point of the API: advertises the available API
    # versions plus the login-page customisation settings.
    permission_classes = (AllowAny,)
    name = _('REST API')
    versioning_class = URLPathVersioning
    swagger_topic = 'Versioning'
    resource_purpose = 'api root and version information'

    @method_decorator(ensure_csrf_cookie)
    @extend_schema_if_available(extensions={"x-ai-description": "List supported API versions"})
    def get(self, request, format=None):
        '''List supported API versions'''
        v2_url = reverse('api:api_v2_root_view', request=request, kwargs={'version': 'v2'})
        # Pairs are listed in the order the keys must appear in the response.
        data = OrderedDict(
            [
                ('description', _('AWX REST API')),
                ('current_version', v2_url),
                ('available_versions', {'v2': v2_url}),
                ('custom_logo', settings.CUSTOM_LOGO),
                ('custom_login_info', settings.CUSTOM_LOGIN_INFO),
                ('login_redirect_override', settings.LOGIN_REDIRECT_OVERRIDE),
            ]
        )
        # The Swagger UI link is only advertised when running in development mode.
        if MODE != 'development':
            return Response(data)
        data['docs'] = drf_reverse('api:schema-swagger-ui')
        return Response(data)
class ApiVersionRootView(APIView):
    # Index of the top-level resources of one API version; readable without
    # authentication.
    permission_classes = (AllowAny,)
    swagger_topic = 'Versioning'
    resource_purpose = 'api top-level resources'

    @extend_schema_if_available(extensions={"x-ai-description": "List top-level API resources"})
    def get(self, request, format=None):
        '''List top level resources'''
        # (response key, URL name) pairs resolved with the version-aware
        # reverse() and the current request. Order here is the response order.
        versioned_routes = (
            ('ping', 'api:api_v2_ping_view'),
            ('instances', 'api:instance_list'),
            ('instance_groups', 'api:instance_group_list'),
            ('receptor_addresses', 'api:receptor_addresses_list'),
            ('config', 'api:api_v2_config_view'),
            ('settings', 'api:setting_category_list'),
            ('me', 'api:user_me_list'),
            ('dashboard', 'api:dashboard_view'),
            ('organizations', 'api:organization_list'),
            ('users', 'api:user_list'),
            ('execution_environments', 'api:execution_environment_list'),
            ('projects', 'api:project_list'),
            ('project_updates', 'api:project_update_list'),
            ('teams', 'api:team_list'),
            ('credentials', 'api:credential_list'),
            ('credential_types', 'api:credential_type_list'),
            ('credential_input_sources', 'api:credential_input_source_list'),
            ('metrics', 'api:metrics_view'),
            ('inventory', 'api:inventory_list'),
            ('constructed_inventory', 'api:constructed_inventory_list'),
            ('inventory_sources', 'api:inventory_source_list'),
            ('inventory_updates', 'api:inventory_update_list'),
            ('groups', 'api:group_list'),
            ('hosts', 'api:host_list'),
            ('host_metrics', 'api:host_metric_list'),
            ('host_metric_summary_monthly', 'api:host_metric_summary_monthly_list'),
            ('job_templates', 'api:job_template_list'),
            ('jobs', 'api:job_list'),
            ('ad_hoc_commands', 'api:ad_hoc_command_list'),
            ('system_job_templates', 'api:system_job_template_list'),
            ('system_jobs', 'api:system_job_list'),
            ('schedules', 'api:schedule_list'),
            ('roles', 'api:role_list'),
            ('notification_templates', 'api:notification_template_list'),
            ('notifications', 'api:notification_list'),
            ('labels', 'api:label_list'),
            ('unified_job_templates', 'api:unified_job_template_list'),
            ('unified_jobs', 'api:unified_job_list'),
            ('activity_stream', 'api:activity_stream_list'),
            ('workflow_job_templates', 'api:workflow_job_template_list'),
            ('workflow_jobs', 'api:workflow_job_list'),
            ('workflow_approvals', 'api:workflow_approval_list'),
            ('workflow_job_template_nodes', 'api:workflow_job_template_node_list'),
            ('workflow_job_nodes', 'api:workflow_job_node_list'),
            ('mesh_visualizer', 'api:mesh_visualizer_view'),
            ('bulk', 'api:bulk'),
            ('analytics', 'api:analytics_root_view'),
        )
        # Entries resolved with Django's plain reverse(), without the request.
        plain_routes = (
            ('service_index', 'service-index-root'),
            ('role_definitions', 'roledefinition-list'),
            ('role_user_assignments', 'roleuserassignment-list'),
            ('role_team_assignments', 'roleteamassignment-list'),
        )

        data = OrderedDict()
        for key, url_name in versioned_routes:
            data[key] = reverse(url_name, request=request)
        for key, url_name in plain_routes:
            data[key] = django_reverse(url_name)
        return Response(data)
class ApiV2RootView(ApiVersionRootView):
    # Version 2 flavour of the resource index; only the display name and the
    # purpose string differ from the parent view.
    name = _('Version 2')
    resource_purpose = 'api v2 root'
class ApiV2PingView(APIView):
    """A simple view that reports very basic information about this
    instance, which is acceptable to be public information.
    """

    permission_classes = (AllowAny,)
    authentication_classes = ()
    name = _('Ping')
    swagger_topic = 'System Configuration'
    resource_purpose = 'basic instance information'

    @extend_schema_if_available(
        extensions={'x-ai-description': 'Return basic information about this instance'},
    )
    def get(self, request, format=None):
        """Return some basic information about this instance
        Everything returned here should be considered public / insecure, as
        this requires no auth and is intended for use by the installer process.
        """
        payload = {
            'ha': is_ha_environment(),
            'version': get_awx_version(),
            'active_node': settings.CLUSTER_HOST_ID,
            'install_uuid': settings.INSTALL_UUID,
        }

        # One entry per instance, leaving out instances whose node_type is
        # 'hop'; ordered by hostname.
        nodes = [
            {
                'node': inst.hostname,
                'node_type': inst.node_type,
                'uuid': inst.uuid,
                'heartbeat': inst.last_seen,
                'capacity': inst.capacity,
                'version': inst.version,
            }
            for inst in Instance.objects.exclude(node_type='hop')
        ]
        nodes.sort(key=operator.itemgetter('node'))
        payload['instances'] = nodes

        # One entry per instance group with its member hostnames; ordered
        # case-insensitively by group name.
        groups = [
            {
                'name': group.name,
                'capacity': group.capacity,
                'instances': [member.hostname for member in group.instances.all()],
            }
            for group in InstanceGroup.objects.prefetch_related('instances')
        ]
        groups.sort(key=lambda entry: entry['name'].lower())
        payload['instance_groups'] = groups

        return Response(payload)
class ApiV2SubscriptionView(APIView):
    # Validates subscription credentials and, when they are accepted, stores
    # them in settings.
    permission_classes = (IsAuthenticated,)
    name = _('Subscriptions')
    swagger_topic = 'System Configuration'
    resource_purpose = 'aap subscription validation'

    def check_permissions(self, request):
        # Regular permission checks run first; after that only superusers may
        # use anything other than OPTIONS/HEAD.
        super().check_permissions(request)
        if request.user.is_superuser:
            return
        if request.method.lower() in {'options', 'head'}:
            return
        self.permission_denied(request)  # Raises PermissionDenied exception.

    @extend_schema_if_available(
        extensions={'x-ai-description': 'List valid AAP subscriptions'},
    )
    def post(self, request):
        # Accepts either a client id/secret pair or a username/password pair
        # (basic auth). On success the validated subscriptions are returned
        # and the submitted pair is saved while the other pair is blanked.
        payload = request.data.copy()
        try:
            account = None
            secret = None
            basic_auth = False
            # determine if the credentials are for basic auth or not
            if payload.get('subscriptions_client_id'):
                account = payload.get('subscriptions_client_id')
                secret = payload.get('subscriptions_client_secret')
                # '$encrypted$' means "reuse the secret that is already stored".
                if secret == '$encrypted$':
                    secret = settings.SUBSCRIPTIONS_CLIENT_SECRET
            elif payload.get('subscriptions_username'):
                account = payload.get('subscriptions_username')
                secret = payload.get('subscriptions_password')
                if secret == '$encrypted$':
                    secret = settings.SUBSCRIPTIONS_PASSWORD
                basic_auth = True

            if not (account and secret):
                return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST)

            with set_environ(**settings.AWX_TASK_ENV):
                validated = get_licenser().validate_rh(account, secret, basic_auth)

            # update settings if the credentials were valid
            # (both values are known to be truthy here because of the guard above)
            if basic_auth:
                settings.SUBSCRIPTIONS_USERNAME = account
                settings.SUBSCRIPTIONS_PASSWORD = secret
                # mutual exclusion for basic auth and service account
                # only one should be set at a given time so that
                # config/attach/ knows which credentials to use
                settings.SUBSCRIPTIONS_CLIENT_ID = ""
                settings.SUBSCRIPTIONS_CLIENT_SECRET = ""
            else:
                settings.SUBSCRIPTIONS_CLIENT_ID = account
                settings.SUBSCRIPTIONS_CLIENT_SECRET = secret
                # mutual exclusion for basic auth and service account
                settings.SUBSCRIPTIONS_USERNAME = ""
                settings.SUBSCRIPTIONS_PASSWORD = ""
        except Exception as exc:
            # Any failure above becomes a 400 response; the message depends on
            # the kind of exception.
            rejected_credentials = isinstance(exc, TokenError) or (
                isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401
            )
            msg = _("Invalid Subscription")
            if rejected_credentials:
                msg = _("The provided credentials are invalid (HTTP 401).")
            elif isinstance(exc, requests.exceptions.ProxyError):
                # Tested before ConnectionError so the proxy message wins.
                msg = _("Unable to connect to proxy server.")
            elif isinstance(exc, requests.exceptions.ConnectionError):
                msg = _("Could not connect to subscription service.")
            elif isinstance(exc, (ValueError, OSError)) and exc.args:
                msg = exc.args[0]
            else:
                # Unrecognised failure: keep the generic message, log the traceback.
                logger.exception(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
            return Response({"error": msg}, status=status.HTTP_400_BAD_REQUEST)

        return Response(validated)
class ApiV2AttachView(APIView):
    # Attaches one of the subscriptions available to the stored credentials by
    # writing it to settings.LICENSE.
    permission_classes = (IsAuthenticated,)
    name = _('Attach Subscription')
    swagger_topic = 'System Configuration'
    resource_purpose = 'subscription attachment'

    def check_permissions(self, request):
        # Regular permission checks run first; after that only superusers may
        # use anything other than OPTIONS/HEAD.
        super(ApiV2AttachView, self).check_permissions(request)
        if not request.user.is_superuser and request.method.lower() not in {'options', 'head'}:
            self.permission_denied(request)  # Raises PermissionDenied exception.

    @extend_schema_if_available(
        extensions={'x-ai-description': 'Attach a subscription'},
    )
    def post(self, request):
        # Request body: {"subscription_id": <id>}.
        # Responds with the matching subscription entry (200) once it has been
        # stored as the license, or with {"error": ...} and HTTP 400 when the
        # id or the stored credentials are missing, validation fails, or no
        # returned subscription matches the id.
        data = request.data.copy()
        subscription_id = data.get('subscription_id', None)
        if not subscription_id:
            return Response({"error": _("No subscription ID provided.")}, status=status.HTTP_400_BAD_REQUEST)

        # Ensure we always use the latest subscription credentials
        cache.delete_many(['SUBSCRIPTIONS_CLIENT_ID', 'SUBSCRIPTIONS_CLIENT_SECRET', 'SUBSCRIPTIONS_USERNAME', 'SUBSCRIPTIONS_PASSWORD'])

        # Prefer the client id/secret pair; fall back to username/password
        # (basic auth) when that pair is incomplete.
        user = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None)
        pw = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None)
        basic_auth = False
        if not (user and pw):
            user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None)
            pw = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None)
            basic_auth = True
            if not (user and pw):
                return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST)

        # subscription_id, user and pw are all known to be truthy from here on.
        try:
            with set_environ(**settings.AWX_TASK_ENV):
                validated = get_licenser().validate_rh(user, pw, basic_auth)
        except Exception as exc:
            msg = _("Invalid Subscription")
            # TokenError is reported the same way as an HTTP 401, matching the
            # error handling of ApiV2SubscriptionView in this file.
            if isinstance(exc, TokenError) or (
                isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401
            ):
                msg = _("The provided credentials are invalid (HTTP 401).")
            elif isinstance(exc, requests.exceptions.ProxyError):
                # Tested before ConnectionError so the proxy message wins.
                msg = _("Unable to connect to proxy server.")
            elif isinstance(exc, requests.exceptions.ConnectionError):
                msg = _("Could not connect to subscription service.")
            elif isinstance(exc, (ValueError, OSError)) and exc.args:
                msg = exc.args[0]
            else:
                # Unrecognised failure: keep the generic message, log the traceback.
                logger.exception(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
            return Response({"error": msg}, status=status.HTTP_400_BAD_REQUEST)

        for sub in validated:
            if sub['subscription_id'] == subscription_id:
                sub['valid_key'] = True
                settings.LICENSE = sub
                # Queue the LICENSE cache clear to run on commit.
                connection.on_commit(lambda: clear_setting_cache.delay(['LICENSE']))
                return Response(sub)

        return Response({"error": _("Error processing subscription metadata.")}, status=status.HTTP_400_BAD_REQUEST)
class ApiV2ConfigView(APIView):
    # Exposes site-wide configuration (GET) and manages the installed license:
    # POST installs or re-validates it, DELETE removes it.
    permission_classes = (IsAuthenticated,)
    name = _('Configuration')
    swagger_topic = 'System Configuration'
    resource_purpose = 'system configuration and license management'

    def check_permissions(self, request):
        # Any authenticated user may GET/OPTIONS/HEAD; every other method is
        # restricted to superusers.
        super(ApiV2ConfigView, self).check_permissions(request)
        if not request.user.is_superuser and request.method.lower() not in {'options', 'head', 'get'}:
            self.permission_denied(request)  # Raises PermissionDenied exception.

    @extend_schema_if_available(
        extensions={'x-ai-description': 'Return various configuration settings'},
    )
    def get(self, request, format=None):
        '''Return various sitewide configuration settings'''
        license_data = get_licenser().validate()
        # License details are only reported when the license has a valid key.
        if not license_data.get('valid_key', False):
            license_data = {}

        # Anything other than the three known tracking states is reported as 'off'.
        pendo_state = settings.PENDO_TRACKING_STATE if settings.PENDO_TRACKING_STATE in ('off', 'anonymous', 'detailed') else 'off'

        data = dict(
            time_zone=settings.TIME_ZONE,
            license_info=license_data,
            version=get_awx_version(),
            eula=render_to_string("eula.md") if license_data.get('license_type', 'UNLICENSED') != 'open' else '',
            analytics_status=pendo_state,
            analytics_collectors=all_collectors(),
            become_methods=PRIVILEGE_ESCALATION_METHODS,
        )

        # Project paths and custom virtualenvs are only shown to superusers,
        # system auditors and users holding one of the listed organization
        # roles; job template admins only get the virtualenv list.
        if (
            request.user.is_superuser
            or request.user.is_system_auditor
            or Organization.accessible_objects(request.user, 'admin_role').exists()
            or Organization.accessible_objects(request.user, 'auditor_role').exists()
            or Organization.accessible_objects(request.user, 'project_admin_role').exists()
        ):
            data.update(
                dict(
                    project_base_dir=settings.PROJECTS_ROOT,
                    project_local_paths=Project.get_local_path_choices(),
                    custom_virtualenvs=get_custom_venv_choices(),
                )
            )
        elif JobTemplate.accessible_objects(request.user, 'admin_role').exists():
            data['custom_virtualenvs'] = get_custom_venv_choices()

        return Response(data)

    @extend_schema_if_available(extensions={"x-ai-description": "Add or update a subscription manifest license"})
    def post(self, request):
        # Request body: a JSON object, optionally carrying a 'manifest' entry.
        # With a manifest, the license is built from it; without one, the
        # currently installed license is re-validated. Responds with the
        # validated license data (200) or {"error": ...} with HTTP 400.
        if not isinstance(request.data, dict):
            return Response({"error": _("Invalid subscription data")}, status=status.HTTP_400_BAD_REQUEST)
        try:
            data_actual = json.dumps(request.data)
        except Exception:
            logger.info(smart_str(u"Invalid JSON submitted for license."), extra=dict(actor=request.user.username))
            return Response({"error": _("Invalid JSON")}, status=status.HTTP_400_BAD_REQUEST)

        license_data = json.loads(data_actual)
        if 'license_key' in license_data:
            return Response({"error": _('Legacy license submitted. A subscription manifest is now required.')}, status=status.HTTP_400_BAD_REQUEST)

        if 'manifest' in license_data:
            try:
                json_actual = json.loads(base64.b64decode(license_data['manifest']))
                if 'license_key' in json_actual:
                    return Response({"error": _('Legacy license submitted. A subscription manifest is now required.')}, status=status.HTTP_400_BAD_REQUEST)
            except Exception:
                # Deliberately best-effort: this only probes for a legacy
                # license encoded as base64 JSON. Anything that does not decode
                # that way is handed to the manifest validation below.
                pass

            try:
                license_data = validate_entitlement_manifest(license_data['manifest'])
            except ValueError as e:
                return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
            except Exception:
                # The traceback is attached by logger.exception(); the message
                # no longer carries a stray, never-filled '{}' placeholder.
                logger.exception('Invalid manifest submitted.')
                return Response({"error": _('Invalid manifest submitted.')}, status=status.HTTP_400_BAD_REQUEST)

            try:
                license_data_validated = get_licenser().license_from_manifest(license_data)
                # Queue the LICENSE cache clear to run on commit.
                connection.on_commit(lambda: clear_setting_cache.delay(['LICENSE']))
            except Exception:
                logger.warning(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
                return Response({"error": _("Invalid License")}, status=status.HTTP_400_BAD_REQUEST)
        else:
            license_data_validated = get_licenser().validate()

        # If the license is valid, write it to the database.
        # .get() mirrors the check in get(): a result without 'valid_key' is
        # treated as invalid instead of raising KeyError.
        if license_data_validated.get('valid_key', False):
            if not settings_registry.is_setting_read_only('TOWER_URL_BASE'):
                settings.TOWER_URL_BASE = "{}://{}".format(request.scheme, request.get_host())
            return Response(license_data_validated)

        logger.warning(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
        return Response({"error": _("Invalid subscription")}, status=status.HTTP_400_BAD_REQUEST)

    @extend_schema_if_available(
        extensions={'x-ai-description': 'Remove the current subscription'},
    )
    def delete(self, request):
        # Clears the stored license. Responds 204 on success, or 400 with an
        # error message if resetting the setting fails.
        try:
            settings.LICENSE = {}
            connection.on_commit(lambda: clear_setting_cache.delay(['LICENSE']))
            return Response(status=status.HTTP_204_NO_CONTENT)
        except Exception:
            # Record the failure with its traceback instead of swallowing it.
            logger.exception(smart_str(u"Failed to remove license."), extra=dict(actor=request.user.username))
            return Response({"error": _("Failed to remove license.")}, status=status.HTTP_400_BAD_REQUEST)