mirror of
https://github.com/ansible/awx.git
synced 2026-01-10 15:32:07 -03:30
[4.6] Fix Cert base authentication for OPA (#6909)
* Remove unused setting * Fix mTLS auth to OPA server - Workaround https://github.com/Turall/OPA-python-client/issues/32 - Add tests for `opa_cert_file` context manager
This commit is contained in:
parent
4532c627e3
commit
ae0a8a80eb
@ -274,38 +274,91 @@ class OPA_AUTH_TYPES:
|
||||
|
||||
@contextlib.contextmanager
|
||||
def opa_cert_file():
|
||||
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.CERTIFICATE:
|
||||
with tempfile.NamedTemporaryFile(delete=True, mode='w', suffix=".pem") as cert_temp:
|
||||
cert_temp.write(settings.OPA_AUTH_CA_CERT)
|
||||
cert_temp.write("\n")
|
||||
cert_temp.write(settings.OPA_AUTH_CLIENT_CERT)
|
||||
cert_temp.write("\n")
|
||||
cert_temp.write(settings.OPA_AUTH_CLIENT_KEY)
|
||||
cert_temp.write("\n")
|
||||
cert_temp.flush()
|
||||
yield cert_temp.name
|
||||
elif settings.OPA_SSL and settings.OPA_AUTH_CA_CERT:
|
||||
with tempfile.NamedTemporaryFile(delete=True, mode='w', suffix=".pem") as cert_temp:
|
||||
cert_temp.write(settings.OPA_AUTH_CA_CERT)
|
||||
cert_temp.write("\n")
|
||||
cert_temp.flush()
|
||||
yield cert_temp.name
|
||||
else:
|
||||
yield None
|
||||
"""
|
||||
Context manager that creates temporary certificate files for OPA authentication.
|
||||
|
||||
For mTLS (mutual TLS), we need:
|
||||
- Client certificate and key for client authentication
|
||||
- CA certificate (optional) for server verification
|
||||
|
||||
Returns:
|
||||
tuple: (client_cert_path, verify_path)
|
||||
- client_cert_path: Path to client cert file or None if not using client cert
|
||||
- verify_path: Path to CA cert file, True to use system CA store, or False for no verification
|
||||
"""
|
||||
client_cert_temp = None
|
||||
ca_temp = None
|
||||
|
||||
try:
|
||||
# Case 1: Full mTLS with client cert and optional CA cert
|
||||
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.CERTIFICATE:
|
||||
# Create client certificate file (required for mTLS)
|
||||
client_cert_temp = tempfile.NamedTemporaryFile(delete=True, mode='w', suffix=".pem")
|
||||
client_cert_temp.write(settings.OPA_AUTH_CLIENT_CERT)
|
||||
client_cert_temp.write("\n")
|
||||
client_cert_temp.write(settings.OPA_AUTH_CLIENT_KEY)
|
||||
client_cert_temp.write("\n")
|
||||
client_cert_temp.flush()
|
||||
|
||||
# If CA cert is provided, use it for server verification
|
||||
# Otherwise, use system CA store (True)
|
||||
if settings.OPA_AUTH_CA_CERT:
|
||||
ca_temp = tempfile.NamedTemporaryFile(delete=True, mode='w', suffix=".pem")
|
||||
ca_temp.write(settings.OPA_AUTH_CA_CERT)
|
||||
ca_temp.write("\n")
|
||||
ca_temp.flush()
|
||||
verify_path = ca_temp.name
|
||||
else:
|
||||
verify_path = True # Use system CA store
|
||||
|
||||
yield (client_cert_temp.name, verify_path)
|
||||
|
||||
# Case 2: TLS with only server verification (no client cert)
|
||||
elif settings.OPA_SSL:
|
||||
# If CA cert is provided, use it for server verification
|
||||
# Otherwise, use system CA store (True)
|
||||
if settings.OPA_AUTH_CA_CERT:
|
||||
ca_temp = tempfile.NamedTemporaryFile(delete=True, mode='w', suffix=".pem")
|
||||
ca_temp.write(settings.OPA_AUTH_CA_CERT)
|
||||
ca_temp.write("\n")
|
||||
ca_temp.flush()
|
||||
verify_path = ca_temp.name
|
||||
else:
|
||||
verify_path = True # Use system CA store
|
||||
|
||||
yield (None, verify_path)
|
||||
|
||||
# Case 3: No TLS
|
||||
else:
|
||||
yield (None, False)
|
||||
|
||||
finally:
|
||||
# Clean up temporary files
|
||||
if client_cert_temp:
|
||||
client_cert_temp.close()
|
||||
if ca_temp:
|
||||
ca_temp.close()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def opa_client(headers=None):
|
||||
with opa_cert_file() as cert_temp_file_name:
|
||||
with opa_cert_file() as cert_files:
|
||||
cert, verify = cert_files
|
||||
|
||||
with OpaClient(
|
||||
host=settings.OPA_HOST,
|
||||
port=settings.OPA_PORT,
|
||||
headers=headers,
|
||||
ssl=settings.OPA_SSL,
|
||||
cert=cert_temp_file_name,
|
||||
cert=cert,
|
||||
timeout=settings.OPA_REQUEST_TIMEOUT,
|
||||
retries=settings.OPA_REQUEST_RETRIES,
|
||||
) as client:
|
||||
# Workaround for https://github.com/Turall/OPA-python-client/issues/32
|
||||
# by directly setting cert and verify on requests.session
|
||||
client._session.cert = cert
|
||||
client._session.verify = verify
|
||||
|
||||
yield client
|
||||
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import json
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
@ -22,7 +23,7 @@ from awx.main.models import (
|
||||
)
|
||||
from awx.main.exceptions import PolicyEvaluationError
|
||||
from awx.main.tasks import policy
|
||||
from awx.main.tasks.policy import JobSerializer
|
||||
from awx.main.tasks.policy import JobSerializer, OPA_AUTH_TYPES
|
||||
|
||||
|
||||
def _parse_exception_message(exception: PolicyEvaluationError):
|
||||
@ -371,3 +372,262 @@ def test_evaluate_policy_failed_exception(opa_client, job):
|
||||
assert exception["Errors"]["Job template"] == opa_failed_exception
|
||||
|
||||
assert opa_client.query_rule.call_count == 3
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize(
|
||||
"settings_kwargs, expected_client_cert, expected_verify, verify_content",
|
||||
[
|
||||
# Case 1: Certificate-based authentication (mTLS)
|
||||
(
|
||||
{
|
||||
"OPA_HOST": "opa.example.com",
|
||||
"OPA_SSL": True,
|
||||
"OPA_AUTH_TYPE": OPA_AUTH_TYPES.CERTIFICATE,
|
||||
"OPA_AUTH_CLIENT_CERT": "-----BEGIN CERTIFICATE-----\nMIICert\n-----END CERTIFICATE-----",
|
||||
"OPA_AUTH_CLIENT_KEY": "-----BEGIN PRIVATE KEY-----\nMIIKey\n-----END PRIVATE KEY-----",
|
||||
"OPA_AUTH_CA_CERT": "-----BEGIN CERTIFICATE-----\nMIICACert\n-----END CERTIFICATE-----",
|
||||
},
|
||||
True, # Client cert should be created
|
||||
"file", # Verify path should be a file
|
||||
"-----BEGIN CERTIFICATE-----", # Expected content in verify file
|
||||
),
|
||||
# Case 2: SSL with server verification only
|
||||
(
|
||||
{
|
||||
"OPA_HOST": "opa.example.com",
|
||||
"OPA_SSL": True,
|
||||
"OPA_AUTH_TYPE": OPA_AUTH_TYPES.NONE,
|
||||
"OPA_AUTH_CA_CERT": "-----BEGIN CERTIFICATE-----\nMIICACert\n-----END CERTIFICATE-----",
|
||||
},
|
||||
False, # No client cert should be created
|
||||
"file", # Verify path should be a file
|
||||
"-----BEGIN CERTIFICATE-----", # Expected content in verify file
|
||||
),
|
||||
# Case 3: SSL with system CA store
|
||||
(
|
||||
{
|
||||
"OPA_HOST": "opa.example.com",
|
||||
"OPA_SSL": True,
|
||||
"OPA_AUTH_TYPE": OPA_AUTH_TYPES.NONE,
|
||||
"OPA_AUTH_CA_CERT": "", # No custom CA cert
|
||||
},
|
||||
False, # No client cert should be created
|
||||
True, # Verify path should be True (system CA store)
|
||||
None, # No file to check content
|
||||
),
|
||||
# Case 4: No SSL
|
||||
(
|
||||
{
|
||||
"OPA_HOST": "opa.example.com",
|
||||
"OPA_SSL": False,
|
||||
"OPA_AUTH_TYPE": OPA_AUTH_TYPES.NONE,
|
||||
},
|
||||
False, # No client cert should be created
|
||||
False, # Verify path should be False (no verification)
|
||||
None, # No file to check content
|
||||
),
|
||||
],
|
||||
ids=[
|
||||
"certificate_auth",
|
||||
"ssl_server_verification",
|
||||
"ssl_system_ca_store",
|
||||
"no_ssl",
|
||||
],
|
||||
)
|
||||
def test_opa_cert_file(settings_kwargs, expected_client_cert, expected_verify, verify_content):
|
||||
"""Parameterized test for the opa_cert_file context manager.
|
||||
|
||||
Tests different configurations:
|
||||
- Certificate-based authentication (mTLS)
|
||||
- SSL with server verification only
|
||||
- SSL with system CA store
|
||||
- No SSL
|
||||
"""
|
||||
with override_settings(**settings_kwargs):
|
||||
client_cert_path = None
|
||||
verify_path = None
|
||||
|
||||
with policy.opa_cert_file() as cert_files:
|
||||
client_cert_path, verify_path = cert_files
|
||||
|
||||
# Check client cert based on expected_client_cert
|
||||
if expected_client_cert:
|
||||
assert client_cert_path is not None
|
||||
with open(client_cert_path, 'r') as f:
|
||||
content = f.read()
|
||||
assert "-----BEGIN CERTIFICATE-----" in content
|
||||
assert "-----BEGIN PRIVATE KEY-----" in content
|
||||
else:
|
||||
assert client_cert_path is None
|
||||
|
||||
# Check verify path based on expected_verify
|
||||
if expected_verify == "file":
|
||||
assert verify_path is not None
|
||||
assert os.path.isfile(verify_path)
|
||||
with open(verify_path, 'r') as f:
|
||||
content = f.read()
|
||||
assert verify_content in content
|
||||
else:
|
||||
assert verify_path is expected_verify
|
||||
|
||||
# Verify files are deleted after context manager exits
|
||||
if expected_client_cert:
|
||||
assert not os.path.exists(client_cert_path), "Client cert file was not deleted"
|
||||
|
||||
if expected_verify == "file":
|
||||
assert not os.path.exists(verify_path), "CA cert file was not deleted"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(
|
||||
OPA_HOST='opa.example.com',
|
||||
OPA_SSL=False, # SSL disabled
|
||||
OPA_AUTH_TYPE=OPA_AUTH_TYPES.CERTIFICATE, # But cert auth enabled
|
||||
OPA_AUTH_CLIENT_CERT="-----BEGIN CERTIFICATE-----\nMIICert\n-----END CERTIFICATE-----",
|
||||
OPA_AUTH_CLIENT_KEY="-----BEGIN PRIVATE KEY-----\nMIIKey\n-----END PRIVATE KEY-----",
|
||||
)
|
||||
def test_evaluate_policy_cert_auth_requires_ssl():
|
||||
"""Test that policy evaluation raises an error when certificate auth is used without SSL."""
|
||||
project = Project.objects.create(name='proj1')
|
||||
inventory = Inventory.objects.create(name='inv1', opa_query_path="inventory/response")
|
||||
org = Organization.objects.create(name="org1", opa_query_path="organization/response")
|
||||
jt = JobTemplate.objects.create(name="jt1", opa_query_path="job_template/response")
|
||||
job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project, organization=org, job_template=jt)
|
||||
|
||||
with pytest.raises(PolicyEvaluationError) as pe:
|
||||
policy.evaluate_policy(job)
|
||||
|
||||
assert "OPA_AUTH_TYPE=Certificate requires OPA_SSL to be enabled" in str(pe.value)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(
|
||||
OPA_HOST='opa.example.com',
|
||||
OPA_SSL=True,
|
||||
OPA_AUTH_TYPE=OPA_AUTH_TYPES.CERTIFICATE,
|
||||
OPA_AUTH_CLIENT_CERT="", # Missing client cert
|
||||
OPA_AUTH_CLIENT_KEY="", # Missing client key
|
||||
OPA_AUTH_CA_CERT="", # Missing CA cert
|
||||
)
|
||||
def test_evaluate_policy_missing_cert_settings():
|
||||
"""Test that policy evaluation raises an error when certificate settings are missing."""
|
||||
project = Project.objects.create(name='proj1')
|
||||
inventory = Inventory.objects.create(name='inv1', opa_query_path="inventory/response")
|
||||
org = Organization.objects.create(name="org1", opa_query_path="organization/response")
|
||||
jt = JobTemplate.objects.create(name="jt1", opa_query_path="job_template/response")
|
||||
job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project, organization=org, job_template=jt)
|
||||
|
||||
with pytest.raises(PolicyEvaluationError) as pe:
|
||||
policy.evaluate_policy(job)
|
||||
|
||||
error_msg = str(pe.value)
|
||||
assert "Following certificate settings are missing for OPA_AUTH_TYPE=Certificate:" in error_msg
|
||||
assert "OPA_AUTH_CLIENT_CERT" in error_msg
|
||||
assert "OPA_AUTH_CLIENT_KEY" in error_msg
|
||||
assert "OPA_AUTH_CA_CERT" in error_msg
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(
|
||||
OPA_HOST='opa.example.com',
|
||||
OPA_PORT=8181,
|
||||
OPA_SSL=True,
|
||||
OPA_AUTH_TYPE=OPA_AUTH_TYPES.CERTIFICATE,
|
||||
OPA_AUTH_CLIENT_CERT="-----BEGIN CERTIFICATE-----\nMIICert\n-----END CERTIFICATE-----",
|
||||
OPA_AUTH_CLIENT_KEY="-----BEGIN PRIVATE KEY-----\nMIIKey\n-----END PRIVATE KEY-----",
|
||||
OPA_AUTH_CA_CERT="-----BEGIN CERTIFICATE-----\nMIICACert\n-----END CERTIFICATE-----",
|
||||
OPA_REQUEST_TIMEOUT=2.5,
|
||||
OPA_REQUEST_RETRIES=3,
|
||||
)
|
||||
def test_opa_client_context_manager_mtls():
|
||||
"""Test that opa_client context manager correctly initializes the OPA client."""
|
||||
# Mock the OpaClient class
|
||||
with mock.patch('awx.main.tasks.policy.OpaClient') as mock_opa_client:
|
||||
# Setup the mock
|
||||
mock_instance = mock_opa_client.return_value
|
||||
mock_instance.__enter__.return_value = mock_instance
|
||||
mock_instance._session = mock.MagicMock()
|
||||
|
||||
# Use the context manager
|
||||
with policy.opa_client(headers={'Custom-Header': 'Value'}) as client:
|
||||
# Verify the client was initialized with the correct parameters
|
||||
mock_opa_client.assert_called_once_with(
|
||||
host='opa.example.com',
|
||||
port=8181,
|
||||
headers={'Custom-Header': 'Value'},
|
||||
ssl=True,
|
||||
cert=mock.ANY, # We can't check the exact value as it's a temporary file
|
||||
timeout=2.5,
|
||||
retries=3,
|
||||
)
|
||||
|
||||
# Verify the session properties were set correctly
|
||||
assert client._session.cert is not None
|
||||
assert client._session.verify is not None
|
||||
|
||||
# Check the content of the cert file
|
||||
cert_file_path = client._session.cert
|
||||
assert os.path.isfile(cert_file_path)
|
||||
with open(cert_file_path, 'r') as f:
|
||||
cert_content = f.read()
|
||||
assert "-----BEGIN CERTIFICATE-----" in cert_content
|
||||
assert "MIICert" in cert_content
|
||||
assert "-----BEGIN PRIVATE KEY-----" in cert_content
|
||||
assert "MIIKey" in cert_content
|
||||
|
||||
# Check the content of the verify file
|
||||
verify_file_path = client._session.verify
|
||||
assert os.path.isfile(verify_file_path)
|
||||
with open(verify_file_path, 'r') as f:
|
||||
verify_content = f.read()
|
||||
assert "-----BEGIN CERTIFICATE-----" in verify_content
|
||||
assert "MIICACert" in verify_content
|
||||
|
||||
# Verify the client is the mocked instance
|
||||
assert client is mock_instance
|
||||
|
||||
# Store file paths for checking after context exit
|
||||
cert_path = client._session.cert
|
||||
verify_path = client._session.verify
|
||||
|
||||
# Verify files are deleted after context manager exits
|
||||
assert not os.path.exists(cert_path), "Client cert file was not deleted"
|
||||
assert not os.path.exists(verify_path), "CA cert file was not deleted"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(
|
||||
OPA_HOST='opa.example.com',
|
||||
OPA_SSL=True,
|
||||
OPA_AUTH_TYPE=OPA_AUTH_TYPES.TOKEN,
|
||||
OPA_AUTH_TOKEN='secret-token',
|
||||
OPA_AUTH_CUSTOM_HEADERS={'X-Custom': 'Header'},
|
||||
)
|
||||
def test_opa_client_token_auth():
|
||||
"""Test that token authentication correctly adds the Authorization header."""
|
||||
# Create a job for testing
|
||||
project = Project.objects.create(name='proj1')
|
||||
inventory = Inventory.objects.create(name='inv1', opa_query_path="inventory/response")
|
||||
org = Organization.objects.create(name="org1", opa_query_path="organization/response")
|
||||
jt = JobTemplate.objects.create(name="jt1", opa_query_path="job_template/response")
|
||||
job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project, organization=org, job_template=jt)
|
||||
|
||||
# Mock the OpaClient class
|
||||
with mock.patch('awx.main.tasks.policy.opa_client') as mock_opa_client_cm:
|
||||
# Setup the mock
|
||||
mock_client = mock.MagicMock()
|
||||
mock_opa_client_cm.return_value.__enter__.return_value = mock_client
|
||||
mock_client.query_rule.return_value = {
|
||||
"result": {
|
||||
"allowed": True,
|
||||
"violations": [],
|
||||
}
|
||||
}
|
||||
|
||||
# Call evaluate_policy
|
||||
policy.evaluate_policy(job)
|
||||
|
||||
# Verify opa_client was called with the correct headers
|
||||
expected_headers = {'X-Custom': 'Header', 'Authorization': 'Bearer secret-token'}
|
||||
mock_opa_client_cm.assert_called_once_with(headers=expected_headers)
|
||||
|
||||
@ -1242,7 +1242,6 @@ INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS = 7
|
||||
# setting for Policy as Code feature
|
||||
FEATURE_POLICY_AS_CODE_ENABLED = False
|
||||
|
||||
OPA_POLICY_EVALUATION_DEFAULT_RESULT = {'allowed': True} # Default policy enforcement decision if policy evaluation fail for any reason.
|
||||
OPA_HOST = '' # The hostname used to connect to the OPA server. If empty, policy enforcement will be disabled.
|
||||
OPA_PORT = 8181 # The port used to connect to the OPA server. Defaults to 8181.
|
||||
OPA_SSL = False # Enable or disable the use of SSL to connect to the OPA server. Defaults to false.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user