Files
awx/awx/main/tests/unit/api/test_schema.py
Rodrigo Toshiaki Horie 6a031158ce Fix OpenAPI schema enum values for CredentialType kind field (#16262)
The OpenAPI schema incorrectly showed all 12 credential type kinds as
valid for POST/PUT/PATCH operations, when only 'cloud' and 'net' are
allowed for custom credential types. This caused API clients and LLM
agents to receive HTTP 400 errors when attempting to create credential
types with invalid kind values.

Add postprocessing hook to filter CredentialTypeRequest and
PatchedCredentialTypeRequest schemas to only show 'cloud', 'net',
and null as valid enum values, matching the existing validation logic.

No API behavior changes - this is purely a documentation fix.

Co-authored-by: Claude <noreply@anthropic.com>
2026-02-04 16:39:03 -03:00

425 lines
16 KiB
Python

import copy
import warnings
from unittest.mock import Mock, patch
from rest_framework.permissions import IsAuthenticated
from awx.api.schema import (
CustomAutoSchema,
AuthenticatedSpectacularAPIView,
AuthenticatedSpectacularSwaggerView,
AuthenticatedSpectacularRedocView,
filter_credential_type_schema,
)
class TestCustomAutoSchema:
"""Unit tests for CustomAutoSchema class."""
def test_get_tags_with_swagger_topic(self):
"""Test get_tags returns swagger_topic when available."""
view = Mock()
view.swagger_topic = 'custom_topic'
view.get_serializer = Mock(return_value=Mock())
schema = CustomAutoSchema()
schema.view = view
tags = schema.get_tags()
assert tags == ['Custom_Topic']
def test_get_tags_with_serializer_meta_model(self):
"""Test get_tags returns model verbose_name_plural from serializer."""
# Create a mock model with verbose_name_plural
mock_model = Mock()
mock_model._meta.verbose_name_plural = 'test models'
# Create a mock serializer with Meta.model
mock_serializer = Mock()
mock_serializer.Meta.model = mock_model
view = Mock(spec=[]) # View without swagger_topic
view.get_serializer = Mock(return_value=mock_serializer)
schema = CustomAutoSchema()
schema.view = view
tags = schema.get_tags()
assert tags == ['Test Models']
def test_get_tags_with_view_model(self):
"""Test get_tags returns model verbose_name_plural from view."""
# Create a mock model with verbose_name_plural
mock_model = Mock()
mock_model._meta.verbose_name_plural = 'view models'
view = Mock(spec=['model']) # View without swagger_topic or get_serializer
view.model = mock_model
schema = CustomAutoSchema()
schema.view = view
tags = schema.get_tags()
assert tags == ['View Models']
def test_get_tags_without_get_serializer(self):
"""Test get_tags when view doesn't have get_serializer method."""
mock_model = Mock()
mock_model._meta.verbose_name_plural = 'test objects'
view = Mock(spec=['model'])
view.model = mock_model
schema = CustomAutoSchema()
schema.view = view
tags = schema.get_tags()
assert tags == ['Test Objects']
def test_get_tags_serializer_exception_with_warning(self):
"""Test get_tags handles exception in get_serializer with warning."""
mock_model = Mock()
mock_model._meta.verbose_name_plural = 'fallback models'
view = Mock(spec=['get_serializer', 'model', '__class__'])
view.__class__.__name__ = 'TestView'
view.get_serializer = Mock(side_effect=Exception('Serializer error'))
view.model = mock_model
schema = CustomAutoSchema()
schema.view = view
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
tags = schema.get_tags()
# Check that a warning was raised
assert len(w) == 1
assert 'TestView.get_serializer() raised an exception' in str(w[0].message)
# Should still get tags from view.model
assert tags == ['Fallback Models']
def test_get_tags_serializer_without_meta_model(self):
"""Test get_tags when serializer doesn't have Meta.model."""
mock_serializer = Mock(spec=[]) # No Meta attribute
view = Mock(spec=['get_serializer'])
view.__class__.__name__ = 'NoMetaView'
view.get_serializer = Mock(return_value=mock_serializer)
schema = CustomAutoSchema()
schema.view = view
with patch.object(CustomAutoSchema.__bases__[0], 'get_tags', return_value=['Default Tag']) as mock_super:
tags = schema.get_tags()
mock_super.assert_called_once()
assert tags == ['Default Tag']
def test_get_tags_fallback_to_super(self):
"""Test get_tags falls back to parent class method."""
view = Mock(spec=['get_serializer'])
view.get_serializer = Mock(return_value=Mock(spec=[]))
schema = CustomAutoSchema()
schema.view = view
with patch.object(CustomAutoSchema.__bases__[0], 'get_tags', return_value=['Super Tag']) as mock_super:
tags = schema.get_tags()
mock_super.assert_called_once()
assert tags == ['Super Tag']
def test_get_tags_empty_with_warning(self):
"""Test get_tags returns 'api' fallback when no tags can be determined."""
view = Mock(spec=['get_serializer'])
view.__class__.__name__ = 'EmptyView'
view.get_serializer = Mock(return_value=Mock(spec=[]))
schema = CustomAutoSchema()
schema.view = view
with patch.object(CustomAutoSchema.__bases__[0], 'get_tags', return_value=[]):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
tags = schema.get_tags()
# Check that a warning was raised
assert len(w) == 1
assert 'Could not determine tags for EmptyView' in str(w[0].message)
# Should fallback to 'api'
assert tags == ['api']
def test_get_tags_swagger_topic_title_case(self):
"""Test that swagger_topic is properly title-cased."""
view = Mock()
view.swagger_topic = 'multi_word_topic'
view.get_serializer = Mock(return_value=Mock())
schema = CustomAutoSchema()
schema.view = view
tags = schema.get_tags()
assert tags == ['Multi_Word_Topic']
def test_is_deprecated_true(self):
"""Test is_deprecated returns True when view has deprecated=True."""
view = Mock()
view.deprecated = True
schema = CustomAutoSchema()
schema.view = view
assert schema.is_deprecated() is True
def test_is_deprecated_false(self):
"""Test is_deprecated returns False when view has deprecated=False."""
view = Mock()
view.deprecated = False
schema = CustomAutoSchema()
schema.view = view
assert schema.is_deprecated() is False
def test_is_deprecated_missing_attribute(self):
"""Test is_deprecated returns False when view doesn't have deprecated attribute."""
view = Mock(spec=[])
schema = CustomAutoSchema()
schema.view = view
assert schema.is_deprecated() is False
def test_get_tags_serializer_meta_without_model(self):
"""Test get_tags when serializer has Meta but no model attribute."""
mock_serializer = Mock()
mock_serializer.Meta = Mock(spec=[]) # Meta exists but no model
mock_model = Mock()
mock_model._meta.verbose_name_plural = 'backup models'
view = Mock(spec=['get_serializer', 'model'])
view.get_serializer = Mock(return_value=mock_serializer)
view.model = mock_model
schema = CustomAutoSchema()
schema.view = view
tags = schema.get_tags()
# Should fall back to view.model
assert tags == ['Backup Models']
def test_get_tags_complex_scenario_exception_recovery(self):
"""Test complex scenario where serializer fails but view.model exists."""
mock_model = Mock()
mock_model._meta.verbose_name_plural = 'recovery models'
view = Mock(spec=['get_serializer', 'model', '__class__'])
view.__class__.__name__ = 'ComplexView'
view.get_serializer = Mock(side_effect=ValueError('Invalid serializer'))
view.model = mock_model
schema = CustomAutoSchema()
schema.view = view
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
tags = schema.get_tags()
# Should have warned about the exception
assert len(w) == 1
assert 'ComplexView.get_serializer() raised an exception' in str(w[0].message)
# But still recovered and got tags from view.model
assert tags == ['Recovery Models']
def test_get_tags_priority_order(self):
"""Test that get_tags respects priority: swagger_topic > serializer.Meta.model > view.model."""
# Set up a view with all three options
mock_model_view = Mock()
mock_model_view._meta.verbose_name_plural = 'view models'
mock_model_serializer = Mock()
mock_model_serializer._meta.verbose_name_plural = 'serializer models'
mock_serializer = Mock()
mock_serializer.Meta.model = mock_model_serializer
view = Mock()
view.swagger_topic = 'priority_topic'
view.get_serializer = Mock(return_value=mock_serializer)
view.model = mock_model_view
schema = CustomAutoSchema()
schema.view = view
tags = schema.get_tags()
# swagger_topic should take priority
assert tags == ['Priority_Topic']
class TestAuthenticatedSchemaViews:
"""Unit tests for authenticated schema view classes."""
def test_authenticated_spectacular_api_view_requires_authentication(self):
"""Test that AuthenticatedSpectacularAPIView requires authentication."""
assert IsAuthenticated in AuthenticatedSpectacularAPIView.permission_classes
def test_authenticated_spectacular_swagger_view_requires_authentication(self):
"""Test that AuthenticatedSpectacularSwaggerView requires authentication."""
assert IsAuthenticated in AuthenticatedSpectacularSwaggerView.permission_classes
def test_authenticated_spectacular_redoc_view_requires_authentication(self):
"""Test that AuthenticatedSpectacularRedocView requires authentication."""
assert IsAuthenticated in AuthenticatedSpectacularRedocView.permission_classes
class TestFilterCredentialTypeSchema:
"""Unit tests for filter_credential_type_schema postprocessing hook."""
def test_filters_both_schemas_correctly(self):
"""Test that both CredentialTypeRequest and PatchedCredentialTypeRequest schemas are filtered."""
result = {
'components': {
'schemas': {
'CredentialTypeRequest': {
'properties': {
'kind': {
'enum': [
'ssh',
'vault',
'net',
'scm',
'cloud',
'registry',
'token',
'insights',
'external',
'kubernetes',
'galaxy',
'cryptography',
None,
],
'type': 'string',
}
}
},
'PatchedCredentialTypeRequest': {
'properties': {
'kind': {
'enum': [
'ssh',
'vault',
'net',
'scm',
'cloud',
'registry',
'token',
'insights',
'external',
'kubernetes',
'galaxy',
'cryptography',
None,
],
'type': 'string',
}
}
},
}
}
}
returned = filter_credential_type_schema(result, None, None, None)
# POST/PUT schema: no None (required field)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['description'] == "* `cloud` - Cloud\\n* `net` - Network"
# PATCH schema: includes None (optional field)
assert result['components']['schemas']['PatchedCredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net', None]
assert result['components']['schemas']['PatchedCredentialTypeRequest']['properties']['kind']['description'] == "* `cloud` - Cloud\\n* `net` - Network"
# Other properties should be preserved
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['type'] == 'string'
# Function should return the result
assert returned is result
def test_handles_empty_result(self):
"""Test graceful handling when result dict is empty."""
result = {}
original = copy.deepcopy(result)
returned = filter_credential_type_schema(result, None, None, None)
assert result == original
assert returned is result
def test_handles_missing_enum(self):
"""Test that schemas without enum key are not modified."""
result = {'components': {'schemas': {'CredentialTypeRequest': {'properties': {'kind': {'type': 'string', 'description': 'Some description'}}}}}}
original = copy.deepcopy(result)
filter_credential_type_schema(result, None, None, None)
assert result == original
def test_filters_only_target_schemas(self):
"""Test that only CredentialTypeRequest schemas are modified, not others."""
result = {
'components': {
'schemas': {
'CredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'cloud', 'net', None]}}},
'OtherSchema': {'properties': {'kind': {'enum': ['option1', 'option2']}}},
}
}
}
other_schema_before = copy.deepcopy(result['components']['schemas']['OtherSchema'])
filter_credential_type_schema(result, None, None, None)
# CredentialTypeRequest should be filtered (no None for required field)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
# OtherSchema should be unchanged
assert result['components']['schemas']['OtherSchema'] == other_schema_before
def test_handles_only_one_schema_present(self):
"""Test that function works when only one target schema is present."""
result = {'components': {'schemas': {'CredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'cloud', 'net', None]}}}}}}
filter_credential_type_schema(result, None, None, None)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
def test_handles_missing_properties(self):
"""Test graceful handling when schema has no properties key."""
result = {'components': {'schemas': {'CredentialTypeRequest': {}}}}
original = copy.deepcopy(result)
filter_credential_type_schema(result, None, None, None)
assert result == original
def test_differentiates_required_vs_optional_fields(self):
"""Test that CredentialTypeRequest excludes None but PatchedCredentialTypeRequest includes it."""
result = {
'components': {
'schemas': {
'CredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'vault', 'net', 'scm', 'cloud', 'registry', None]}}},
'PatchedCredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'vault', 'net', 'scm', 'cloud', 'registry', None]}}},
}
}
}
filter_credential_type_schema(result, None, None, None)
# POST/PUT schema: no None (required field)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
# PATCH schema: includes None (optional field)
assert result['components']['schemas']['PatchedCredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net', None]