diff --git a/awx/main/models/organization.py b/awx/main/models/organization.py index c5fc875c7b..4d05eb01f4 100644 --- a/awx/main/models/organization.py +++ b/awx/main/models/organization.py @@ -186,9 +186,10 @@ class AuthToken(BaseModel): h = hashlib.sha1() h.update(settings.SECRET_KEY) for header in settings.REMOTE_HOST_HEADERS: - value = request.META.get(header, '').strip() + value = request.META.get(header, '').split(',')[0].strip() if value: h.update(value) + break h.update(request.META.get('HTTP_USER_AGENT', '')) return h.hexdigest() diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 0eeb845678..655bd4cd63 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -362,12 +362,12 @@ class BaseTestMixin(QueueTestMixin): def _generic_rest(self, url, data=None, expect=204, auth=None, method=None, data_type=None, accept=None, remote_addr=None, - return_response_object=False): + return_response_object=False, client_kwargs=None): assert method is not None method_name = method.lower() #if method_name not in ('options', 'head', 'get', 'delete'): # assert data is not None - client_kwargs = {} + client_kwargs = client_kwargs or {} if accept: client_kwargs['HTTP_ACCEPT'] = accept if remote_addr is not None: @@ -447,17 +447,19 @@ class BaseTestMixin(QueueTestMixin): method='head', accept=accept, remote_addr=remote_addr) - def get(self, url, expect=200, auth=None, accept=None, remote_addr=None): + def get(self, url, expect=200, auth=None, accept=None, remote_addr=None, client_kwargs={}): return self._generic_rest(url, data=None, expect=expect, auth=auth, method='get', accept=accept, - remote_addr=remote_addr) + remote_addr=remote_addr, + client_kwargs=client_kwargs) def post(self, url, data, expect=204, auth=None, data_type=None, - accept=None, remote_addr=None): + accept=None, remote_addr=None, client_kwargs={}): return self._generic_rest(url, data=data, expect=expect, auth=auth, method='post', data_type=data_type, accept=accept, - remote_addr=remote_addr) + remote_addr=remote_addr, + client_kwargs=client_kwargs) def put(self, url, data, expect=200, auth=None, data_type=None, accept=None, remote_addr=None): diff --git a/awx/main/tests/users.py b/awx/main/tests/users.py index 767a7f73f4..34aa4f4f3c 100644 --- a/awx/main/tests/users.py +++ b/awx/main/tests/users.py @@ -18,7 +18,95 @@ from django.core.urlresolvers import reverse from awx.main.models import * from awx.main.tests.base import BaseTest -__all__ = ['UsersTest', 'LdapTest'] +__all__ = ['AuthTokenProxyTest', 'UsersTest', 'LdapTest'] + +''' +Ensure ips from the X-Forwarded-For get honored and used in auth tokens +''' +class AuthTokenProxyTest(BaseTest): + def check_token_and_expires_exist(self, response): + self.assertTrue('token' in response) + self.assertTrue('expires' in response) + + def check_me_is_admin(self, response): + self.assertEquals(response['results'][0]['username'], 'admin') + self.assertEquals(response['count'], 1) + + def save_remote_host_headers(self): + self._remote_host_headers = settings.REMOTE_HOST_HEADERS[:] + + def restore_remote_host_headers(self): + if getattr(self, '_remote_host_headers', None): + settings.REMOTE_HOST_HEADERS = self._remote_host_headers + + def setUp(self): + super(AuthTokenProxyTest, self).setUp() + self.setup_users() + self.setup_instances() + self.organizations = self.make_organizations(self.super_django_user, 2) + self.organizations[0].admins.add(self.normal_django_user) + + self.assertIn('REMOTE_ADDR', settings.REMOTE_HOST_HEADERS) + self.assertIn('REMOTE_HOST', settings.REMOTE_HOST_HEADERS) + + if 'HTTP_X_FORWARDED_FOR' not in settings.REMOTE_HOST_HEADERS: + self.save_remote_host_headers() + settings.REMOTE_HOST_HEADERS.insert(0, 'HTTP_X_FORWARDED_FOR') + + def tearDown(self): + super(AuthTokenProxyTest, self).tearDown() + self.restore_remote_host_headers() + + def _request_auth_token(self, remote_addr): + auth_token_url = reverse('api:auth_token_view') + client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr } + + # Request a new auth token from the remote address specified via 'HTTP_X_FORWARDED_FOR' + data = dict(zip(('username', 'password'), self.get_super_credentials())) + response = self.post(auth_token_url, data, expect=200, auth=None, remote_addr=None, client_kwargs=client_kwargs) + self.check_token_and_expires_exist(response) + auth_token = response['token'] + + return auth_token + + def _get_me(self, expect, auth, remote_addr, client_kwargs=None): + user_me_url = reverse('api:user_me_list') + return self.get(user_me_url, expect=expect, auth=auth, remote_addr=remote_addr, client_kwargs=client_kwargs) + + def test_honor_ip(self): + remote_addr = '192.168.75.1' + + auth_token = self._request_auth_token(remote_addr) + + # Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR + client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr } + response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs) + self.check_me_is_admin(response) + + # Verify we can access our own user information, from the remote address + response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr) + self.check_me_is_admin(response) + + def test_honor_ip_fail(self): + remote_addr = '192.168.75.1' + remote_addr_diff = '192.168.75.2' + + auth_token = self._request_auth_token(remote_addr) + + # Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR + client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr_diff } + response = self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs) + self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr_diff) + + # should use ip address from other headers when HTTP_X_FORARDED_FOR is blank + def test_blank_header_fallback(self): + remote_addr = '192.168.75.1' + + auth_token = self._request_auth_token(remote_addr) + + client_kwargs = { 'HTTP_X_FORARDED_FOR': '' } + response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs) + self.check_me_is_admin(response) class UsersTest(BaseTest):