mirror of
https://github.com/ansible/awx.git
synced 2026-01-19 13:41:28 -03:30
Merge pull request #56 from chrismeyersfsu/improvement-auth_behind_proxy
add support to get ip address from HTTP_X_FORWARDED_FOR header
This commit is contained in:
commit
6d99ed2774
@ -186,9 +186,10 @@ class AuthToken(BaseModel):
|
||||
h = hashlib.sha1()
|
||||
h.update(settings.SECRET_KEY)
|
||||
for header in settings.REMOTE_HOST_HEADERS:
|
||||
value = request.META.get(header, '').strip()
|
||||
value = request.META.get(header, '').split(',')[0].strip()
|
||||
if value:
|
||||
h.update(value)
|
||||
break
|
||||
h.update(request.META.get('HTTP_USER_AGENT', ''))
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
@ -362,12 +362,12 @@ class BaseTestMixin(QueueTestMixin):
|
||||
|
||||
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None,
|
||||
data_type=None, accept=None, remote_addr=None,
|
||||
return_response_object=False):
|
||||
return_response_object=False, client_kwargs=None):
|
||||
assert method is not None
|
||||
method_name = method.lower()
|
||||
#if method_name not in ('options', 'head', 'get', 'delete'):
|
||||
# assert data is not None
|
||||
client_kwargs = {}
|
||||
client_kwargs = client_kwargs or {}
|
||||
if accept:
|
||||
client_kwargs['HTTP_ACCEPT'] = accept
|
||||
if remote_addr is not None:
|
||||
@ -447,17 +447,19 @@ class BaseTestMixin(QueueTestMixin):
|
||||
method='head', accept=accept,
|
||||
remote_addr=remote_addr)
|
||||
|
||||
def get(self, url, expect=200, auth=None, accept=None, remote_addr=None):
|
||||
def get(self, url, expect=200, auth=None, accept=None, remote_addr=None, client_kwargs={}):
|
||||
return self._generic_rest(url, data=None, expect=expect, auth=auth,
|
||||
method='get', accept=accept,
|
||||
remote_addr=remote_addr)
|
||||
remote_addr=remote_addr,
|
||||
client_kwargs=client_kwargs)
|
||||
|
||||
def post(self, url, data, expect=204, auth=None, data_type=None,
|
||||
accept=None, remote_addr=None):
|
||||
accept=None, remote_addr=None, client_kwargs={}):
|
||||
return self._generic_rest(url, data=data, expect=expect, auth=auth,
|
||||
method='post', data_type=data_type,
|
||||
accept=accept,
|
||||
remote_addr=remote_addr)
|
||||
remote_addr=remote_addr,
|
||||
client_kwargs=client_kwargs)
|
||||
|
||||
def put(self, url, data, expect=200, auth=None, data_type=None,
|
||||
accept=None, remote_addr=None):
|
||||
|
||||
@ -18,7 +18,95 @@ from django.core.urlresolvers import reverse
|
||||
from awx.main.models import *
|
||||
from awx.main.tests.base import BaseTest
|
||||
|
||||
__all__ = ['UsersTest', 'LdapTest']
|
||||
__all__ = ['AuthTokenProxyTest', 'UsersTest', 'LdapTest']
|
||||
|
||||
'''
|
||||
Ensure ips from the X-Forwarded-For get honored and used in auth tokens
|
||||
'''
|
||||
class AuthTokenProxyTest(BaseTest):
|
||||
def check_token_and_expires_exist(self, response):
|
||||
self.assertTrue('token' in response)
|
||||
self.assertTrue('expires' in response)
|
||||
|
||||
def check_me_is_admin(self, response):
|
||||
self.assertEquals(response['results'][0]['username'], 'admin')
|
||||
self.assertEquals(response['count'], 1)
|
||||
|
||||
def save_remote_host_headers(self):
|
||||
self._remote_host_headers = settings.REMOTE_HOST_HEADERS[:]
|
||||
|
||||
def restore_remote_host_headers(self):
|
||||
if getattr(self, '_remote_host_headers', None):
|
||||
settings.REMOTE_HOST_HEADERS = self._remote_host_headers
|
||||
|
||||
def setUp(self):
|
||||
super(AuthTokenProxyTest, self).setUp()
|
||||
self.setup_users()
|
||||
self.setup_instances()
|
||||
self.organizations = self.make_organizations(self.super_django_user, 2)
|
||||
self.organizations[0].admins.add(self.normal_django_user)
|
||||
|
||||
self.assertIn('REMOTE_ADDR', settings.REMOTE_HOST_HEADERS)
|
||||
self.assertIn('REMOTE_HOST', settings.REMOTE_HOST_HEADERS)
|
||||
|
||||
if 'HTTP_X_FORWARDED_FOR' not in settings.REMOTE_HOST_HEADERS:
|
||||
self.save_remote_host_headers()
|
||||
settings.REMOTE_HOST_HEADERS.insert(0, 'HTTP_X_FORWARDED_FOR')
|
||||
|
||||
def tearDown(self):
|
||||
super(AuthTokenProxyTest, self).tearDown()
|
||||
self.restore_remote_host_headers()
|
||||
|
||||
def _request_auth_token(self, remote_addr):
|
||||
auth_token_url = reverse('api:auth_token_view')
|
||||
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr }
|
||||
|
||||
# Request a new auth token from the remote address specified via 'HTTP_X_FORWARDED_FOR'
|
||||
data = dict(zip(('username', 'password'), self.get_super_credentials()))
|
||||
response = self.post(auth_token_url, data, expect=200, auth=None, remote_addr=None, client_kwargs=client_kwargs)
|
||||
self.check_token_and_expires_exist(response)
|
||||
auth_token = response['token']
|
||||
|
||||
return auth_token
|
||||
|
||||
def _get_me(self, expect, auth, remote_addr, client_kwargs=None):
|
||||
user_me_url = reverse('api:user_me_list')
|
||||
return self.get(user_me_url, expect=expect, auth=auth, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||
|
||||
def test_honor_ip(self):
|
||||
remote_addr = '192.168.75.1'
|
||||
|
||||
auth_token = self._request_auth_token(remote_addr)
|
||||
|
||||
# Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR
|
||||
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr }
|
||||
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||
self.check_me_is_admin(response)
|
||||
|
||||
# Verify we can access our own user information, from the remote address
|
||||
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr)
|
||||
self.check_me_is_admin(response)
|
||||
|
||||
def test_honor_ip_fail(self):
|
||||
remote_addr = '192.168.75.1'
|
||||
remote_addr_diff = '192.168.75.2'
|
||||
|
||||
auth_token = self._request_auth_token(remote_addr)
|
||||
|
||||
# Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR
|
||||
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr_diff }
|
||||
response = self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||
self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr_diff)
|
||||
|
||||
# should use ip address from other headers when HTTP_X_FORARDED_FOR is blank
|
||||
def test_blank_header_fallback(self):
|
||||
remote_addr = '192.168.75.1'
|
||||
|
||||
auth_token = self._request_auth_token(remote_addr)
|
||||
|
||||
client_kwargs = { 'HTTP_X_FORARDED_FOR': '' }
|
||||
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
|
||||
self.check_me_is_admin(response)
|
||||
|
||||
class UsersTest(BaseTest):
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user