Tests for trust proxy and existing explicit proxy

* Integration tests to ensure the integration of the two features.
This commit is contained in:
Chris Meyers 2024-06-11 13:37:47 -04:00 committed by Chris Meyers
parent 2c8eef413b
commit 4bbfc8a946

View File

@ -2,21 +2,28 @@ import pytest
from awx.api.versioning import reverse
from django.test.utils import override_settings
from ansible_base.jwt_consumer.common.util import generate_x_trusted_proxy_header
from ansible_base.lib.testing.fixtures import rsa_keypair_factory, rsa_keypair # noqa: F401; pylint: disable=unused-import
class HeaderTrackingMiddleware(object):
def __init__(self):
self.environ = {}
def process_request(self, request):
pass
def process_response(self, request, response):
self.environ = request.environ
@pytest.mark.django_db
def test_proxy_ip_allowed(get, patch, admin):
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'system'})
patch(url, user=admin, data={'REMOTE_HOST_HEADERS': ['HTTP_X_FROM_THE_LOAD_BALANCER', 'REMOTE_ADDR', 'REMOTE_HOST']})
class HeaderTrackingMiddleware(object):
environ = {}
def process_request(self, request):
pass
def process_response(self, request, response):
self.environ = request.environ
# By default, `PROXY_IP_ALLOWED_LIST` is disabled, so custom `REMOTE_HOST_HEADERS`
# should just pass through
middleware = HeaderTrackingMiddleware()
@ -45,6 +52,64 @@ def test_proxy_ip_allowed(get, patch, admin):
assert middleware.environ['HTTP_X_FROM_THE_LOAD_BALANCER'] == 'some-actual-ip'
@pytest.mark.django_db
def test_x_trusted_proxy(get, patch, admin, rsa_keypair): # noqa: F811
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'system'})
patch(url, user=admin, data={'REMOTE_HOST_HEADERS': ['HTTP_X_FROM_THE_LOAD_BALANCER', 'REMOTE_ADDR', 'REMOTE_HOST']})
# Invalid x_trusted_proxy value SHOULD result in sensitive headers deleted
middleware = HeaderTrackingMiddleware()
headers = {
'HTTP_X_TRUSTED_PROXY': generate_x_trusted_proxy_header(rsa_keypair.private),
'HTTP_X_FROM_THE_LOAD_BALANCER': 'some-actual-ip',
}
get(url, user=admin, middleware=middleware, **headers)
assert middleware.environ['HTTP_X_FROM_THE_LOAD_BALANCER'] == 'some-actual-ip'
@pytest.mark.django_db
class TestTrustedProxyAllowListIntegration:
@pytest.fixture
def url(self, patch, admin):
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'system'})
patch(url, user=admin, data={'REMOTE_HOST_HEADERS': ['HTTP_X_FROM_THE_LOAD_BALANCER', 'REMOTE_ADDR', 'REMOTE_HOST']})
patch(url, user=admin, data={'PROXY_IP_ALLOWED_LIST': ['my.proxy.example.org']})
return url
@pytest.fixture
def middleware(self):
return HeaderTrackingMiddleware()
def test_x_trusted_proxy_valid_signature(self, get, admin, rsa_keypair, url, middleware): # noqa: F811
# Invalid x_trusted_proxy value SHOULD result in sensitive headers deleted
headers = {
'HTTP_X_TRUSTED_PROXY': generate_x_trusted_proxy_header(rsa_keypair.private),
'HTTP_X_FROM_THE_LOAD_BALANCER': 'some-actual-ip',
}
with override_settings(ANSIBLE_BASE_JWT_KEY=rsa_keypair.public):
get(url, user=admin, middleware=middleware, **headers)
assert middleware.environ['HTTP_X_FROM_THE_LOAD_BALANCER'] == 'some-actual-ip'
def test_x_trusted_proxy_invalid_signature(self, get, admin, url, middleware):
# Invalid x_trusted_proxy value SHOULD result in sensitive headers deleted
headers = {
'HTTP_X_TRUSTED_PROXY': 'DEAD-BEEF',
'HTTP_X_FROM_THE_LOAD_BALANCER': 'some-actual-ip',
}
get(url, user=admin, middleware=middleware, **headers)
assert 'HTTP_X_FROM_THE_LOAD_BALANCER' not in middleware.environ
def test_x_trusted_proxy_invalid_signature_valid_proxy(self, get, admin, url, middleware):
# A valid explicit proxy SHOULD result in sensitive headers NOT being deleted, regardless of the trusted proxy signature results
headers = {
'HTTP_X_TRUSTED_PROXY': 'DEAD-BEEF',
'REMOTE_ADDR': 'my.proxy.example.org',
'HTTP_X_FROM_THE_LOAD_BALANCER': 'some-actual-ip',
}
get(url, user=admin, middleware=middleware, **headers)
assert middleware.environ['HTTP_X_FROM_THE_LOAD_BALANCER'] == 'some-actual-ip'
@pytest.mark.django_db
class TestDeleteViews:
def test_sublist_delete_permission_check(self, inventory_source, host, rando, delete):