mirror of
https://github.com/ansible/awx.git
synced 2026-05-17 06:17:36 -02:30
add support to get ip address from HTTP_X_FORWARDED_FOR header
This commit is contained in:
@@ -186,9 +186,10 @@ class AuthToken(BaseModel):
|
|||||||
h = hashlib.sha1()
|
h = hashlib.sha1()
|
||||||
h.update(settings.SECRET_KEY)
|
h.update(settings.SECRET_KEY)
|
||||||
for header in settings.REMOTE_HOST_HEADERS:
|
for header in settings.REMOTE_HOST_HEADERS:
|
||||||
value = request.META.get(header, '').strip()
|
value = request.META.get(header, '').split(',')[0].strip()
|
||||||
if value:
|
if value:
|
||||||
h.update(value)
|
h.update(value)
|
||||||
|
break
|
||||||
h.update(request.META.get('HTTP_USER_AGENT', ''))
|
h.update(request.META.get('HTTP_USER_AGENT', ''))
|
||||||
return h.hexdigest()
|
return h.hexdigest()
|
||||||
|
|
||||||
|
|||||||
@@ -362,12 +362,12 @@ class BaseTestMixin(QueueTestMixin):
|
|||||||
|
|
||||||
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None,
|
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None,
|
||||||
data_type=None, accept=None, remote_addr=None,
|
data_type=None, accept=None, remote_addr=None,
|
||||||
return_response_object=False):
|
return_response_object=False, client_kwargs=None):
|
||||||
assert method is not None
|
assert method is not None
|
||||||
method_name = method.lower()
|
method_name = method.lower()
|
||||||
#if method_name not in ('options', 'head', 'get', 'delete'):
|
#if method_name not in ('options', 'head', 'get', 'delete'):
|
||||||
# assert data is not None
|
# assert data is not None
|
||||||
client_kwargs = {}
|
client_kwargs = client_kwargs or {}
|
||||||
if accept:
|
if accept:
|
||||||
client_kwargs['HTTP_ACCEPT'] = accept
|
client_kwargs['HTTP_ACCEPT'] = accept
|
||||||
if remote_addr is not None:
|
if remote_addr is not None:
|
||||||
@@ -447,17 +447,19 @@ class BaseTestMixin(QueueTestMixin):
|
|||||||
method='head', accept=accept,
|
method='head', accept=accept,
|
||||||
remote_addr=remote_addr)
|
remote_addr=remote_addr)
|
||||||
|
|
||||||
def get(self, url, expect=200, auth=None, accept=None, remote_addr=None):
|
def get(self, url, expect=200, auth=None, accept=None, remote_addr=None, client_kwargs={}):
|
||||||
return self._generic_rest(url, data=None, expect=expect, auth=auth,
|
return self._generic_rest(url, data=None, expect=expect, auth=auth,
|
||||||
method='get', accept=accept,
|
method='get', accept=accept,
|
||||||
remote_addr=remote_addr)
|
remote_addr=remote_addr,
|
||||||
|
client_kwargs=client_kwargs)
|
||||||
|
|
||||||
def post(self, url, data, expect=204, auth=None, data_type=None,
|
def post(self, url, data, expect=204, auth=None, data_type=None,
|
||||||
accept=None, remote_addr=None):
|
accept=None, remote_addr=None, client_kwargs={}):
|
||||||
return self._generic_rest(url, data=data, expect=expect, auth=auth,
|
return self._generic_rest(url, data=data, expect=expect, auth=auth,
|
||||||
method='post', data_type=data_type,
|
method='post', data_type=data_type,
|
||||||
accept=accept,
|
accept=accept,
|
||||||
remote_addr=remote_addr)
|
remote_addr=remote_addr,
|
||||||
|
client_kwargs=client_kwargs)
|
||||||
|
|
||||||
def put(self, url, data, expect=200, auth=None, data_type=None,
|
def put(self, url, data, expect=200, auth=None, data_type=None,
|
||||||
accept=None, remote_addr=None):
|
accept=None, remote_addr=None):
|
||||||
|
|||||||
@@ -18,7 +18,95 @@ from django.core.urlresolvers import reverse
|
|||||||
from awx.main.models import *
|
from awx.main.models import *
|
||||||
from awx.main.tests.base import BaseTest
|
from awx.main.tests.base import BaseTest
|
||||||
|
|
||||||
__all__ = ['UsersTest', 'LdapTest']
|
__all__ = ['AuthTokenProxyTest', 'UsersTest', 'LdapTest']
|
||||||
|
|
||||||
|
'''
|
||||||
|
Ensure ips from the X-Forwarded-For get honored and used in auth tokens
|
||||||
|
'''
|
||||||
|
class AuthTokenProxyTest(BaseTest):
|
||||||
|
def check_token_and_expires_exist(self, response):
|
||||||
|
self.assertTrue('token' in response)
|
||||||
|
self.assertTrue('expires' in response)
|
||||||
|
|
||||||
|
def check_me_is_admin(self, response):
|
||||||
|
self.assertEquals(response['results'][0]['username'], 'admin')
|
||||||
|
self.assertEquals(response['count'], 1)
|
||||||
|
|
||||||
|
def save_remote_host_headers(self):
|
||||||
|
self._remote_host_headers = settings.REMOTE_HOST_HEADERS[:]
|
||||||
|
|
||||||
|
def restore_remote_host_headers(self):
|
||||||
|
if getattr(self, '_remote_host_headers', None):
|
||||||
|
settings.REMOTE_HOST_HEADERS = self._remote_host_headers
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(AuthTokenProxyTest, self).setUp()
|
||||||
|
self.setup_users()
|
||||||
|
self.setup_instances()
|
||||||
|
self.organizations = self.make_organizations(self.super_django_user, 2)
|
||||||
|
self.organizations[0].admins.add(self.normal_django_user)
|
||||||
|
|
||||||
|
self.assertIn('REMOTE_ADDR', settings.REMOTE_HOST_HEADERS)
|
||||||
|
self.assertIn('REMOTE_HOST', settings.REMOTE_HOST_HEADERS)
|
||||||
|
|
||||||
|
if 'HTTP_X_FORWARDED_FOR' not in settings.REMOTE_HOST_HEADERS:
|
||||||
|
self.save_remote_host_headers()
|
||||||
|
settings.REMOTE_HOST_HEADERS.insert(0, 'HTTP_X_FORWARDED_FOR')
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super(AuthTokenProxyTest, self).tearDown()
|
||||||
|
self.restore_remote_host_headers()
|
||||||
|
|
||||||
|
def _request_auth_token(self, remote_addr):
|
||||||
|
auth_token_url = reverse('api:auth_token_view')
|
||||||
|
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr }
|
||||||
|
|
||||||
|
# Request a new auth token from the remote address specified via 'HTTP_X_FORWARDED_FOR'
|
||||||
|
data = dict(zip(('username', 'password'), self.get_super_credentials()))
|
||||||
|
response = self.post(auth_token_url, data, expect=200, auth=None, remote_addr=None, client_kwargs=client_kwargs)
|
||||||
|
self.check_token_and_expires_exist(response)
|
||||||
|
auth_token = response['token']
|
||||||
|
|
||||||
|
return auth_token
|
||||||
|
|
||||||
|
def _get_me(self, expect, auth, remote_addr, client_kwargs=None):
|
||||||
|
user_me_url = reverse('api:user_me_list')
|
||||||
|
return self.get(user_me_url, expect=expect, auth=auth, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||||
|
|
||||||
|
def test_honor_ip(self):
|
||||||
|
remote_addr = '192.168.75.1'
|
||||||
|
|
||||||
|
auth_token = self._request_auth_token(remote_addr)
|
||||||
|
|
||||||
|
# Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR
|
||||||
|
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr }
|
||||||
|
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||||
|
self.check_me_is_admin(response)
|
||||||
|
|
||||||
|
# Verify we can access our own user information, from the remote address
|
||||||
|
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr)
|
||||||
|
self.check_me_is_admin(response)
|
||||||
|
|
||||||
|
def test_honor_ip_fail(self):
|
||||||
|
remote_addr = '192.168.75.1'
|
||||||
|
remote_addr_diff = '192.168.75.2'
|
||||||
|
|
||||||
|
auth_token = self._request_auth_token(remote_addr)
|
||||||
|
|
||||||
|
# Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR
|
||||||
|
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr_diff }
|
||||||
|
response = self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||||
|
self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr_diff)
|
||||||
|
|
||||||
|
# should use ip address from other headers when HTTP_X_FORARDED_FOR is blank
|
||||||
|
def test_blank_header_fallback(self):
|
||||||
|
remote_addr = '192.168.75.1'
|
||||||
|
|
||||||
|
auth_token = self._request_auth_token(remote_addr)
|
||||||
|
|
||||||
|
client_kwargs = { 'HTTP_X_FORARDED_FOR': '' }
|
||||||
|
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||||
|
self.check_me_is_admin(response)
|
||||||
|
|
||||||
class UsersTest(BaseTest):
|
class UsersTest(BaseTest):
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user