From 096a0a262ac021d406a889c5cb01a03ec64ccc66 Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Tue, 28 Jun 2016 15:42:56 -0400 Subject: [PATCH 01/11] Ported old/ha.py test --- awx/main/tests/old/ha.py | 24 ------------------------ awx/main/tests/unit/test_ha.py | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 24 deletions(-) delete mode 100644 awx/main/tests/old/ha.py create mode 100644 awx/main/tests/unit/test_ha.py diff --git a/awx/main/tests/old/ha.py b/awx/main/tests/old/ha.py deleted file mode 100644 index 4fd0ae1c40..0000000000 --- a/awx/main/tests/old/ha.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. - -# Python -import mock - -# Django -from django.test import SimpleTestCase - -# AWX -from awx.main.models import * # noqa -from awx.main.ha import * # noqa - -__all__ = ['HAUnitTest',] - -class HAUnitTest(SimpleTestCase): - - @mock.patch('awx.main.models.Instance.objects.count', return_value=2) - def test_multiple_instances(self, ignore): - self.assertTrue(is_ha_environment()) - - @mock.patch('awx.main.models.Instance.objects.count', return_value=1) - def test_db_localhost(self, ignore): - self.assertFalse(is_ha_environment()) - diff --git a/awx/main/tests/unit/test_ha.py b/awx/main/tests/unit/test_ha.py new file mode 100644 index 0000000000..07249a67fb --- /dev/null +++ b/awx/main/tests/unit/test_ha.py @@ -0,0 +1,15 @@ +# Copyright (c) 2016 Ansible, Inc. + +# Python +import mock + +# AWX +from awx.main.ha import is_ha_environment + +@mock.patch('awx.main.models.Instance.objects.count', lambda: 2) +def test_multiple_instances(): + assert is_ha_environment() + +@mock.patch('awx.main.models.Instance.objects.count', lambda: 1) +def test_db_localhost(): + assert is_ha_environment() is False From 66f86de3fa29d57fd4da32ffb41867a18ccef0fc Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Tue, 28 Jun 2016 16:58:14 -0400 Subject: [PATCH 02/11] Ported old test_licences.py tests --- .../tests/functional/core/test_licenses.py | 132 ++++++++++++++++ awx/main/tests/old/licenses.py | 147 ------------------ 2 files changed, 132 insertions(+), 147 deletions(-) create mode 100644 awx/main/tests/functional/core/test_licenses.py delete mode 100644 awx/main/tests/old/licenses.py diff --git a/awx/main/tests/functional/core/test_licenses.py b/awx/main/tests/functional/core/test_licenses.py new file mode 100644 index 0000000000..37f3c63fa9 --- /dev/null +++ b/awx/main/tests/functional/core/test_licenses.py @@ -0,0 +1,132 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved. + +import json +import mock +import os +import tempfile +import time +import pytest +from datetime import datetime + +from awx.main.models import Host +from awx.main.task_engine import TaskSerializer, TaskEngager + + +@pytest.mark.django_db +def test_license_writer(inventory, admin): + writer = TaskEngager( + company_name='acmecorp', + contact_name='Michael DeHaan', + contact_email='michael@ansibleworks.com', + license_date=25000, # seconds since epoch + instance_count=500) + + data = writer.get_data() + + Host.objects.bulk_create( + [ + Host( + name='host.%d' % n, + inventory=inventory, + created_by=admin, + modified=datetime.now(), + created=datetime.now()) + for n in range(12) + ] + ) + + assert data['instance_count'] == 500 + assert data['contact_name'] == 'Michael DeHaan' + assert data['contact_email'] == 'michael@ansibleworks.com' + assert data['license_date'] == 25000 + assert data['license_key'] == "11bae31f31c6a6cdcb483a278cdbe98bd8ac5761acd7163a50090b0f098b3a13" + + strdata = writer.get_string() + strdata_loaded = json.loads(strdata) + assert strdata_loaded == data + + reader = TaskSerializer() + + vdata = reader.from_string(strdata) + + assert vdata['available_instances'] == 500 + assert vdata['current_instances'] == 12 + assert vdata['free_instances'] == 488 + assert vdata['date_warning'] is True + assert vdata['date_expired'] is True + assert vdata['license_date'] == 25000 + assert vdata['time_remaining'] < 0 + assert vdata['valid_key'] is True + assert vdata['compliant'] is False + assert vdata['subscription_name'] + +@pytest.mark.django_db +def test_expired_licenses(): + reader = TaskSerializer() + writer = TaskEngager( + company_name='Tower', + contact_name='Tower Admin', + contact_email='tower@ansible.com', + license_date=int(time.time() - 3600), + instance_count=100, + trial=True) + strdata = writer.get_string() + vdata = reader.from_string(strdata) + + assert vdata['compliant'] is False + assert vdata['grace_period_remaining'] < 0 + + writer = TaskEngager( + company_name='Tower', + contact_name='Tower Admin', + contact_email='tower@ansible.com', + license_date=int(time.time() - 2592001), + instance_count=100, + trial=False) + strdata = writer.get_string() + vdata = reader.from_string(strdata) + + assert vdata['compliant'] is False + assert vdata['grace_period_remaining'] < 0 + + writer = TaskEngager( + company_name='Tower', + contact_name='Tower Admin', + contact_email='tower@ansible.com', + license_date=int(time.time() - 3600), + instance_count=100, + trial=False) + strdata = writer.get_string() + vdata = reader.from_string(strdata) + + assert vdata['compliant'] is False + assert vdata['grace_period_remaining'] > 0 + +@pytest.mark.django_db +def test_aws_license(): + os.environ['AWX_LICENSE_FILE'] = 'non-existent-license-file.json' + + h, path = tempfile.mkstemp() + with os.fdopen(h, 'w') as f: + json.dump({'instance_count': 100}, f) + + def fetch_ami(_self): + _self.attributes['ami-id'] = 'ami-00000000' + return True + + def fetch_instance(_self): + _self.attributes['instance-id'] = 'i-00000000' + return True + + with mock.patch('awx.main.task_engine.TEMPORARY_TASK_FILE', path): + with mock.patch('awx.main.task_engine.TemporaryTaskEngine.fetch_ami', fetch_ami): + with mock.patch('awx.main.task_engine.TemporaryTaskEngine.fetch_instance', fetch_instance): + reader = TaskSerializer() + license = reader.from_file() + assert license['is_aws'] + assert license['time_remaining'] + assert license['free_instances'] > 0 + assert license['grace_period_remaining'] > 0 + + os.unlink(path) diff --git a/awx/main/tests/old/licenses.py b/awx/main/tests/old/licenses.py deleted file mode 100644 index 136c3ce8a5..0000000000 --- a/awx/main/tests/old/licenses.py +++ /dev/null @@ -1,147 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved. - -import json -import os -import tempfile - -from awx.main.models import Host, Inventory, Organization -from awx.main.tests.base import BaseTest -import awx.main.task_engine -from awx.main.task_engine import * # noqa - -class LicenseTests(BaseTest): - - def setUp(self): - self.start_redis() - self.setup_instances() - super(LicenseTests, self).setUp() - self.setup_users() - u = self.super_django_user - org = Organization.objects.create(name='o1', created_by=u) - org.admin_role.members.add(self.normal_django_user) - self.inventory = Inventory.objects.create(name='hi', organization=org, created_by=u) - Host.objects.create(name='a1', inventory=self.inventory, created_by=u) - Host.objects.create(name='a2', inventory=self.inventory, created_by=u) - Host.objects.create(name='a3', inventory=self.inventory, created_by=u) - Host.objects.create(name='a4', inventory=self.inventory, created_by=u) - Host.objects.create(name='a5', inventory=self.inventory, created_by=u) - Host.objects.create(name='a6', inventory=self.inventory, created_by=u) - Host.objects.create(name='a7', inventory=self.inventory, created_by=u) - Host.objects.create(name='a8', inventory=self.inventory, created_by=u) - Host.objects.create(name='a9', inventory=self.inventory, created_by=u) - Host.objects.create(name='a10', inventory=self.inventory, created_by=u) - Host.objects.create(name='a11', inventory=self.inventory, created_by=u) - Host.objects.create(name='a12', inventory=self.inventory, created_by=u) - self._temp_task_file = awx.main.task_engine.TEMPORARY_TASK_FILE - self._temp_task_fetch_ami = awx.main.task_engine.TemporaryTaskEngine.fetch_ami - self._temp_task_fetch_instance = awx.main.task_engine.TemporaryTaskEngine.fetch_instance - - def tearDown(self): - awx.main.task_engine.TEMPORARY_TASK_FILE = self._temp_task_file - awx.main.task_engine.TemporaryTaskEngine.fetch_ami = self._temp_task_fetch_ami - awx.main.task_engine.TemporaryTaskEngine.fetch_instance = self._temp_task_fetch_instance - super(LicenseTests, self).tearDown() - self.stop_redis() - - def test_license_writer(self): - - writer = TaskEngager( - company_name='acmecorp', - contact_name='Michael DeHaan', - contact_email='michael@ansibleworks.com', - license_date=25000, # seconds since epoch - instance_count=500) - - data = writer.get_data() - - assert data['instance_count'] == 500 - assert data['contact_name'] == 'Michael DeHaan' - assert data['contact_email'] == 'michael@ansibleworks.com' - assert data['license_date'] == 25000 - assert data['license_key'] == "11bae31f31c6a6cdcb483a278cdbe98bd8ac5761acd7163a50090b0f098b3a13" - - strdata = writer.get_string() - strdata_loaded = json.loads(strdata) - assert strdata_loaded == data - - reader = TaskSerializer() - - vdata = reader.from_string(strdata) - - assert vdata['available_instances'] == 500 - assert vdata['current_instances'] == 12 - assert vdata['free_instances'] == 488 - assert vdata['date_warning'] is True - assert vdata['date_expired'] is True - assert vdata['license_date'] == 25000 - assert vdata['time_remaining'] < 0 - assert vdata['valid_key'] is True - assert vdata['compliant'] is False - assert vdata['subscription_name'] - - def test_expired_licenses(self): - reader = TaskSerializer() - writer = TaskEngager( - company_name='Tower', - contact_name='Tower Admin', - contact_email='tower@ansible.com', - license_date=int(time.time() - 3600), - instance_count=100, - trial=True) - strdata = writer.get_string() - vdata = reader.from_string(strdata) - - assert vdata['compliant'] is False - assert vdata['grace_period_remaining'] < 0 - - writer = TaskEngager( - company_name='Tower', - contact_name='Tower Admin', - contact_email='tower@ansible.com', - license_date=int(time.time() - 2592001), - instance_count=100, - trial=False) - strdata = writer.get_string() - vdata = reader.from_string(strdata) - - assert vdata['compliant'] is False - assert vdata['grace_period_remaining'] < 0 - - writer = TaskEngager( - company_name='Tower', - contact_name='Tower Admin', - contact_email='tower@ansible.com', - license_date=int(time.time() - 3600), - instance_count=100, - trial=False) - strdata = writer.get_string() - vdata = reader.from_string(strdata) - - assert vdata['compliant'] is False - assert vdata['grace_period_remaining'] > 0 - - def test_aws_license(self): - os.environ['AWX_LICENSE_FILE'] = 'non-existent-license-file.json' - h, path = tempfile.mkstemp() - self._temp_paths.append(path) - with os.fdopen(h, 'w') as f: - json.dump({'instance_count': 100}, f) - awx.main.task_engine.TEMPORARY_TASK_FILE = path - - def fetch_ami(_self): - _self.attributes['ami-id'] = 'ami-00000000' - return True - - def fetch_instance(_self): - _self.attributes['instance-id'] = 'i-00000000' - return True - - awx.main.task_engine.TemporaryTaskEngine.fetch_ami = fetch_ami - awx.main.task_engine.TemporaryTaskEngine.fetch_instance = fetch_instance - reader = TaskSerializer() - license = reader.from_file() - self.assertTrue(license['is_aws']) - self.assertTrue(license['time_remaining']) - self.assertTrue(license['free_instances'] > 0) - self.assertTrue(license['grace_period_remaining'] > 0) From 172b6b48b4216f6f86b3116c978d62ba2960c7a6 Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Wed, 29 Jun 2016 14:36:25 -0400 Subject: [PATCH 03/11] Ported old redact.py tests --- .../{old/redact.py => unit/test_redact.py} | 124 +++++++++++------- 1 file changed, 76 insertions(+), 48 deletions(-) rename awx/main/tests/{old/redact.py => unit/test_redact.py} (54%) diff --git a/awx/main/tests/old/redact.py b/awx/main/tests/unit/test_redact.py similarity index 54% rename from awx/main/tests/old/redact.py rename to awx/main/tests/unit/test_redact.py index 3ce38bc7ad..47d6419642 100644 --- a/awx/main/tests/old/redact.py +++ b/awx/main/tests/unit/test_redact.py @@ -1,11 +1,9 @@ - import textwrap +import re # AWX from awx.main.redact import UriCleaner -from awx.main.tests.base import BaseTest, URI - -__all__ = ['UriCleanTests'] +from awx.main.tests.base import URI TEST_URIS = [ URI('no host', scheme='https', username='myusername', password='mypass', host=None), @@ -80,59 +78,89 @@ TEST_CLEARTEXT.append({ 'host_occurrences' : 4 }) -class UriCleanTests(BaseTest): - # should redact sensitive usernames and passwords - def test_uri_scm_simple_redacted(self): - for uri in TEST_URIS: - redacted_str = UriCleaner.remove_sensitive(str(uri)) - if uri.username: - self.check_not_found(redacted_str, uri.username, uri.description) - if uri.password: - self.check_not_found(redacted_str, uri.password, uri.description) +def check_found(string, substr, count=-1, description=None, word_boundary=False): + if word_boundary: + count_actual = len(re.findall(r'\b%s\b' % re.escape(substr), string)) + else: + count_actual = string.count(substr) - # should replace secret data with safe string, UriCleaner.REPLACE_STR - def test_uri_scm_simple_replaced(self): - for uri in TEST_URIS: - redacted_str = UriCleaner.remove_sensitive(str(uri)) - self.check_found(redacted_str, UriCleaner.REPLACE_STR, uri.get_secret_count()) + msg = '' + if description: + msg = 'Test "%s".\n' % description + if count == -1: + assert count_actual > 0 + else: + msg += 'Found %d occurances of "%s" instead of %d in: "%s"' % (count_actual, substr, count, string) + if count_actual != count: + raise Exception(msg) - # should redact multiple uris in text - def test_uri_scm_multiple(self): - cleartext = '' - for uri in TEST_URIS: - cleartext += str(uri) + ' ' - for uri in TEST_URIS: - cleartext += str(uri) + '\n' +def check_not_found(string, substr, description=None, word_boundary=False): + if word_boundary: + count = len(re.findall(r'\b%s\b' % re.escape(substr), string)) + else: + count = string.find(substr) + if count == -1: + count = 0 + msg = '' + if description: + msg = 'Test "%s".\n' % description + msg += '"%s" found in: "%s"' % (substr, string) + if count != 0: + raise Exception(msg) + + +# should redact sensitive usernames and passwords +def test_uri_scm_simple_redacted(): + for uri in TEST_URIS: redacted_str = UriCleaner.remove_sensitive(str(uri)) if uri.username: - self.check_not_found(redacted_str, uri.username, uri.description) + check_not_found(redacted_str, uri.username, uri.description) if uri.password: - self.check_not_found(redacted_str, uri.password, uri.description) + check_not_found(redacted_str, uri.password, uri.description) - # should replace multiple secret data with safe string - def test_uri_scm_multiple_replaced(self): - cleartext = '' - find_count = 0 - for uri in TEST_URIS: - cleartext += str(uri) + ' ' - find_count += uri.get_secret_count() +# should replace secret data with safe string, UriCleaner.REPLACE_STR +def test_uri_scm_simple_replaced(): + for uri in TEST_URIS: + redacted_str = UriCleaner.remove_sensitive(str(uri)) + check_found(redacted_str, UriCleaner.REPLACE_STR, uri.get_secret_count()) - for uri in TEST_URIS: - cleartext += str(uri) + '\n' - find_count += uri.get_secret_count() +# should redact multiple uris in text +def test_uri_scm_multiple(): + cleartext = '' + for uri in TEST_URIS: + cleartext += str(uri) + ' ' + for uri in TEST_URIS: + cleartext += str(uri) + '\n' - redacted_str = UriCleaner.remove_sensitive(cleartext) - self.check_found(redacted_str, UriCleaner.REPLACE_STR, find_count) + redacted_str = UriCleaner.remove_sensitive(str(uri)) + if uri.username: + check_not_found(redacted_str, uri.username, uri.description) + if uri.password: + check_not_found(redacted_str, uri.password, uri.description) - # should redact and replace multiple secret data within a complex cleartext blob - def test_uri_scm_cleartext_redact_and_replace(self): - for test_data in TEST_CLEARTEXT: - uri = test_data['uri'] - redacted_str = UriCleaner.remove_sensitive(test_data['text']) - self.check_not_found(redacted_str, uri.username, uri.description) - self.check_not_found(redacted_str, uri.password, uri.description) - # Ensure the host didn't get redacted - self.check_found(redacted_str, uri.host, test_data['host_occurrences'], uri.description) +# should replace multiple secret data with safe string +def test_uri_scm_multiple_replaced(): + cleartext = '' + find_count = 0 + for uri in TEST_URIS: + cleartext += str(uri) + ' ' + find_count += uri.get_secret_count() + for uri in TEST_URIS: + cleartext += str(uri) + '\n' + find_count += uri.get_secret_count() + + redacted_str = UriCleaner.remove_sensitive(cleartext) + check_found(redacted_str, UriCleaner.REPLACE_STR, find_count) + +# should redact and replace multiple secret data within a complex cleartext blob +def test_uri_scm_cleartext_redact_and_replace(): + for test_data in TEST_CLEARTEXT: + uri = test_data['uri'] + redacted_str = UriCleaner.remove_sensitive(test_data['text']) + check_not_found(redacted_str, uri.username, uri.description) + check_not_found(redacted_str, uri.password, uri.description) + # Ensure the host didn't get redacted + check_found(redacted_str, uri.host, test_data['host_occurrences'], uri.description) From 9742d02ee8cf6516aeef5b3ba7650860ee4d2ef4 Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 30 Jun 2016 11:31:59 -0400 Subject: [PATCH 04/11] Eliminated redundant http request code in test suite --- awx/main/tests/functional/conftest.py | 135 +++++--------------------- 1 file changed, 22 insertions(+), 113 deletions(-) diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index 5e6ef333bb..c363d6f1d3 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -293,17 +293,23 @@ def permissions(): 'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,}, } -@pytest.fixture -def post(): - def rf(url, data, user=None, middleware=None, expect=None, **kwargs): - view, view_args, view_kwargs = resolve(urlparse(url)[2]) + +def _request(verb): + def rf(url, data_or_user=None, user=None, middleware=None, expect=None, **kwargs): + if type(data_or_user) is User and user is None: + user = data_or_user + elif 'data' not in kwargs: + kwargs['data'] = data_or_user if 'format' not in kwargs: kwargs['format'] = 'json' - request = APIRequestFactory().post(url, data, **kwargs) + + view, view_args, view_kwargs = resolve(urlparse(url)[2]) + request = getattr(APIRequestFactory(), verb)(url, **kwargs) if middleware: middleware.process_request(request) if user: force_authenticate(request, user=user) + response = view(request, *view_args, **view_kwargs) if middleware: middleware.process_response(request, response) @@ -311,134 +317,37 @@ def post(): if response.status_code != expect: print(response.data) assert response.status_code == expect + response.render() return response return rf +@pytest.fixture +def post(): + return _request('post') + @pytest.fixture def get(): - def rf(url, user=None, middleware=None, expect=None, **kwargs): - view, view_args, view_kwargs = resolve(urlparse(url)[2]) - if 'format' not in kwargs: - kwargs['format'] = 'json' - request = APIRequestFactory().get(url, **kwargs) - if middleware: - middleware.process_request(request) - if user: - force_authenticate(request, user=user) - response = view(request, *view_args, **view_kwargs) - if middleware: - middleware.process_response(request, response) - if expect: - if response.status_code != expect: - print(response.data) - assert response.status_code == expect - return response - return rf + return _request('get') @pytest.fixture def put(): - def rf(url, data, user=None, middleware=None, expect=None, **kwargs): - view, view_args, view_kwargs = resolve(urlparse(url)[2]) - if 'format' not in kwargs: - kwargs['format'] = 'json' - request = APIRequestFactory().put(url, data, **kwargs) - if middleware: - middleware.process_request(request) - if user: - force_authenticate(request, user=user) - response = view(request, *view_args, **view_kwargs) - if middleware: - middleware.process_response(request, response) - if expect: - if response.status_code != expect: - print(response.data) - assert response.status_code == expect - return response - return rf + return _request('put') @pytest.fixture def patch(): - def rf(url, data, user=None, middleware=None, expect=None, **kwargs): - view, view_args, view_kwargs = resolve(urlparse(url)[2]) - if 'format' not in kwargs: - kwargs['format'] = 'json' - request = APIRequestFactory().patch(url, data, **kwargs) - if middleware: - middleware.process_request(request) - if user: - force_authenticate(request, user=user) - response = view(request, *view_args, **view_kwargs) - if middleware: - middleware.process_response(request, response) - if expect: - if response.status_code != expect: - print(response.data) - assert response.status_code == expect - return response - return rf + return _request('patch') @pytest.fixture def delete(): - def rf(url, user=None, middleware=None, expect=None, **kwargs): - view, view_args, view_kwargs = resolve(urlparse(url)[2]) - if 'format' not in kwargs: - kwargs['format'] = 'json' - request = APIRequestFactory().delete(url, **kwargs) - if middleware: - middleware.process_request(request) - if user: - force_authenticate(request, user=user) - response = view(request, *view_args, **view_kwargs) - if middleware: - middleware.process_response(request, response) - if expect: - if response.status_code != expect: - print(response.data) - assert response.status_code == expect - return response - return rf + return _request('delete') @pytest.fixture def head(): - def rf(url, user=None, middleware=None, expect=None, **kwargs): - view, view_args, view_kwargs = resolve(urlparse(url)[2]) - if 'format' not in kwargs: - kwargs['format'] = 'json' - request = APIRequestFactory().head(url, **kwargs) - if middleware: - middleware.process_request(request) - if user: - force_authenticate(request, user=user) - response = view(request, *view_args, **view_kwargs) - if middleware: - middleware.process_response(request, response) - if expect: - if response.status_code != expect: - print(response.data) - assert response.status_code == expect - return response - return rf + return _request('head') @pytest.fixture def options(): - def rf(url, data, user=None, middleware=None, expect=None, **kwargs): - view, view_args, view_kwargs = resolve(urlparse(url)[2]) - if 'format' not in kwargs: - kwargs['format'] = 'json' - request = APIRequestFactory().options(url, data, **kwargs) - if middleware: - middleware.process_request(request) - if user: - force_authenticate(request, user=user) - response = view(request, *view_args, **view_kwargs) - if middleware: - middleware.process_response(request, response) - if expect: - if response.status_code != expect: - print(response.data) - assert response.status_code == expect - return response - return rf + return _request('options') From e2c9d26e0d4b7128b949906de0be2723c139bf62 Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 30 Jun 2016 11:38:06 -0400 Subject: [PATCH 05/11] Ported old/views.py --- .../functional/api/test_unified_jobs_view.py | 79 +++++++++++- awx/main/tests/functional/conftest.py | 6 + awx/main/tests/old/views.py | 113 ------------------ 3 files changed, 83 insertions(+), 115 deletions(-) delete mode 100644 awx/main/tests/old/views.py diff --git a/awx/main/tests/functional/api/test_unified_jobs_view.py b/awx/main/tests/functional/api/test_unified_jobs_view.py index ab9487bc38..174bb60080 100644 --- a/awx/main/tests/functional/api/test_unified_jobs_view.py +++ b/awx/main/tests/functional/api/test_unified_jobs_view.py @@ -1,8 +1,84 @@ import pytest -from awx.main.models import UnifiedJob from django.core.urlresolvers import reverse +from awx.main.models import UnifiedJob, ProjectUpdate +from awx.main.tests.base import URI + + +TEST_STDOUTS = [] +uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs") +TEST_STDOUTS.append({ + 'description': 'uri in a plain text document', + 'uri' : uri, + 'text' : 'hello world %s goodbye world' % uri, + 'occurrences' : 1 +}) + +uri = URI(scheme="https", username="applepie@@@", password="thatyouknow@@@@", host="github.ginger.com/theirrepo.git/info/refs") +TEST_STDOUTS.append({ + 'description': 'uri appears twice in a multiline plain text document', + 'uri' : uri, + 'text' : 'hello world %s \n\nyoyo\n\nhello\n%s' % (uri, uri), + 'occurrences' : 2 +}) + + + +@pytest.fixture +def test_cases(project): + ret = [] + for e in TEST_STDOUTS: + e['project'] = ProjectUpdate(project=project) + e['project'].result_stdout_text = e['text'] + e['project'].save() + ret.append(e) + return ret + +@pytest.fixture +def negative_test_cases(job_factory): + ret = [] + for e in TEST_STDOUTS: + e['job'] = job_factory() + e['job'].result_stdout_text = e['text'] + e['job'].save() + ret.append(e) + return ret + + +formats = [ + ('json', 'application/json'), + ('ansi', 'text/plain'), + ('txt', 'text/plain'), + ('html', 'text/html'), +] + +@pytest.mark.parametrize("format,content_type", formats) +@pytest.mark.django_db +def test_project_update_redaction_enabled(get, format, content_type, test_cases, admin): + for test_data in test_cases: + job = test_data['project'] + response = get(reverse("api:project_update_stdout", args=(job.pk,)) + "?format=" + format, user=admin, expect=200, accept=content_type) + assert content_type in response['CONTENT-TYPE'] + print(response.data) + assert response.data is not None + content = response.data['content'] if format == 'json' else response.data + assert test_data['uri'].username not in content + assert test_data['uri'].password not in content + assert content.count(test_data['uri'].host) == test_data['occurrences'] + +@pytest.mark.parametrize("format,content_type", formats) +@pytest.mark.django_db +def test_job_redaction_disabled(get, format, content_type, negative_test_cases, admin): + for test_data in negative_test_cases: + job = test_data['job'] + response = get(reverse("api:job_stdout", args=(job.pk,)) + "?format=" + format, user=admin, expect=200, format=format) + content = response.data['content'] if format == 'json' else response.data + assert response.data is not None + assert test_data['uri'].username in content + assert test_data['uri'].password in content + + @pytest.mark.django_db def test_options_fields_choices(instance, options, user): @@ -14,4 +90,3 @@ def test_options_fields_choices(instance, options, user): assert UnifiedJob.LAUNCH_TYPE_CHOICES == response.data['actions']['GET']['launch_type']['choices'] assert 'choice' == response.data['actions']['GET']['status']['type'] assert UnifiedJob.STATUS_CHOICES == response.data['actions']['GET']['status']['choices'] - diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index c363d6f1d3..65ccb9a443 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -124,6 +124,12 @@ def project_factory(organization): return prj return factory +@pytest.fixture +def job_factory(job_template, admin): + def factory(job_template=job_template, initial_state='new', created_by=admin): + return job_template.create_job(created_by=created_by, status=initial_state) + return factory + @pytest.fixture def team_factory(organization): def factory(name): diff --git a/awx/main/tests/old/views.py b/awx/main/tests/old/views.py deleted file mode 100644 index 4ccd6f64ec..0000000000 --- a/awx/main/tests/old/views.py +++ /dev/null @@ -1,113 +0,0 @@ -# Django -from django.core.urlresolvers import reverse - -# Reuse Test code -from awx.main.tests.base import ( - BaseLiveServerTest, - QueueStartStopTestMixin, - URI, -) -from awx.main.models.projects import * # noqa - -__all__ = ['UnifiedJobStdoutRedactedTests'] - - -TEST_STDOUTS = [] -uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs") -TEST_STDOUTS.append({ - 'description': 'uri in a plain text document', - 'uri' : uri, - 'text' : 'hello world %s goodbye world' % uri, - 'occurrences' : 1 -}) - -uri = URI(scheme="https", username="applepie@@@", password="thatyouknow@@@@", host="github.ginger.com/theirrepo.git/info/refs") -TEST_STDOUTS.append({ - 'description': 'uri appears twice in a multiline plain text document', - 'uri' : uri, - 'text' : 'hello world %s \n\nyoyo\n\nhello\n%s' % (uri, uri), - 'occurrences' : 2 -}) - -class UnifiedJobStdoutRedactedTests(BaseLiveServerTest, QueueStartStopTestMixin): - - def setUp(self): - super(UnifiedJobStdoutRedactedTests, self).setUp() - self.setup_instances() - self.setup_users() - self.test_cases = [] - self.negative_test_cases = [] - - proj = self.make_project() - - for e in TEST_STDOUTS: - e['project'] = ProjectUpdate(project=proj) - e['project'].result_stdout_text = e['text'] - e['project'].save() - self.test_cases.append(e) - for d in TEST_STDOUTS: - d['job'] = self.make_job() - d['job'].result_stdout_text = d['text'] - d['job'].save() - self.negative_test_cases.append(d) - - # This is more of a functional test than a unit test. - # should filter out username and password - def check_sensitive_redacted(self, test_data, response): - uri = test_data['uri'] - self.assertIsNotNone(response['content']) - self.check_not_found(response['content'], uri.username, test_data['description']) - self.check_not_found(response['content'], uri.password, test_data['description']) - # Ensure the host didn't get redacted - self.check_found(response['content'], uri.host, test_data['occurrences'], test_data['description']) - - def check_sensitive_not_redacted(self, test_data, response): - uri = test_data['uri'] - self.assertIsNotNone(response['content']) - self.check_found(response['content'], uri.username, description=test_data['description']) - self.check_found(response['content'], uri.password, description=test_data['description']) - - def _get_url_job_stdout(self, job, url_base, format='json'): - formats = { - 'json': 'application/json', - 'ansi': 'text/plain', - 'txt': 'text/plain', - 'html': 'text/html', - } - content_type = formats[format] - project_update_stdout_url = reverse(url_base, args=(job.pk,)) + "?format=" + format - return self.get(project_update_stdout_url, expect=200, auth=self.get_super_credentials(), accept=content_type) - - def _test_redaction_enabled(self, format): - for test_data in self.test_cases: - response = self._get_url_job_stdout(test_data['project'], "api:project_update_stdout", format=format) - self.check_sensitive_redacted(test_data, response) - - def _test_redaction_disabled(self, format): - for test_data in self.negative_test_cases: - response = self._get_url_job_stdout(test_data['job'], "api:job_stdout", format=format) - self.check_sensitive_not_redacted(test_data, response) - - def test_project_update_redaction_enabled_json(self): - self._test_redaction_enabled('json') - - def test_project_update_redaction_enabled_ansi(self): - self._test_redaction_enabled('ansi') - - def test_project_update_redaction_enabled_html(self): - self._test_redaction_enabled('html') - - def test_project_update_redaction_enabled_txt(self): - self._test_redaction_enabled('txt') - - def test_job_redaction_disabled_json(self): - self._test_redaction_disabled('json') - - def test_job_redaction_disabled_ansi(self): - self._test_redaction_disabled('ansi') - - def test_job_redaction_disabled_html(self): - self._test_redaction_disabled('html') - - def test_job_redaction_disabled_txt(self): - self._test_redaction_disabled('txt') From fa58ca44b13ad4462adbee90f34663f3eabd0a0a Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 30 Jun 2016 11:52:56 -0400 Subject: [PATCH 06/11] Ported old/unified_jobs.py --- .../functional/api/test_unified_jobs_view.py | 3 +- awx/main/tests/old/unified_jobs.py | 55 ------------------- awx/main/tests/unit/test_unified_jobs.py | 48 ++++++++++++++++ 3 files changed, 50 insertions(+), 56 deletions(-) delete mode 100644 awx/main/tests/old/unified_jobs.py create mode 100644 awx/main/tests/unit/test_unified_jobs.py diff --git a/awx/main/tests/functional/api/test_unified_jobs_view.py b/awx/main/tests/functional/api/test_unified_jobs_view.py index 174bb60080..ed7034a28e 100644 --- a/awx/main/tests/functional/api/test_unified_jobs_view.py +++ b/awx/main/tests/functional/api/test_unified_jobs_view.py @@ -60,7 +60,6 @@ def test_project_update_redaction_enabled(get, format, content_type, test_cases, job = test_data['project'] response = get(reverse("api:project_update_stdout", args=(job.pk,)) + "?format=" + format, user=admin, expect=200, accept=content_type) assert content_type in response['CONTENT-TYPE'] - print(response.data) assert response.data is not None content = response.data['content'] if format == 'json' else response.data assert test_data['uri'].username not in content @@ -90,3 +89,5 @@ def test_options_fields_choices(instance, options, user): assert UnifiedJob.LAUNCH_TYPE_CHOICES == response.data['actions']['GET']['launch_type']['choices'] assert 'choice' == response.data['actions']['GET']['status']['type'] assert UnifiedJob.STATUS_CHOICES == response.data['actions']['GET']['status']['choices'] + + diff --git a/awx/main/tests/old/unified_jobs.py b/awx/main/tests/old/unified_jobs.py deleted file mode 100644 index dad9acb3ea..0000000000 --- a/awx/main/tests/old/unified_jobs.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved - -# Python -import mock -from mock import Mock -from StringIO import StringIO -from django.utils.timezone import now - -# Django -from django.test import SimpleTestCase - -# AWX -from awx.main.models import * # noqa - -__all__ = ['UnifiedJobsUnitTest',] - -class UnifiedJobsUnitTest(SimpleTestCase): - - # stdout file present - @mock.patch('os.path.exists', return_value=True) - @mock.patch('codecs.open', return_value='my_file_handler') - def test_result_stdout_raw_handle_file__found(self, exists, open): - unified_job = UnifiedJob() - unified_job.result_stdout_file = 'dummy' - - with mock.patch('os.stat', return_value=Mock(st_size=1)): - result = unified_job.result_stdout_raw_handle() - - self.assertEqual(result, 'my_file_handler') - - # stdout file missing, job finished - @mock.patch('os.path.exists', return_value=False) - def test_result_stdout_raw_handle__missing(self, exists): - unified_job = UnifiedJob() - unified_job.result_stdout_file = 'dummy' - unified_job.finished = now() - - result = unified_job.result_stdout_raw_handle() - - self.assertIsInstance(result, StringIO) - self.assertEqual(result.read(), 'stdout capture is missing') - - # stdout file missing, job not finished - @mock.patch('os.path.exists', return_value=False) - def test_result_stdout_raw_handle__pending(self, exists): - unified_job = UnifiedJob() - unified_job.result_stdout_file = 'dummy' - unified_job.finished = None - - result = unified_job.result_stdout_raw_handle() - - self.assertIsInstance(result, StringIO) - self.assertEqual(result.read(), 'Waiting for results...') - diff --git a/awx/main/tests/unit/test_unified_jobs.py b/awx/main/tests/unit/test_unified_jobs.py new file mode 100644 index 0000000000..edd6978b47 --- /dev/null +++ b/awx/main/tests/unit/test_unified_jobs.py @@ -0,0 +1,48 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved + +# Python +import mock +from mock import Mock +from StringIO import StringIO +from django.utils.timezone import now + +# AWX +from awx.main.models import UnifiedJob + + +# stdout file present +@mock.patch('os.path.exists', return_value=True) +@mock.patch('codecs.open', return_value='my_file_handler') +def test_result_stdout_raw_handle_file__found(exists, open): + unified_job = UnifiedJob() + unified_job.result_stdout_file = 'dummy' + + with mock.patch('os.stat', return_value=Mock(st_size=1)): + result = unified_job.result_stdout_raw_handle() + + assert result == 'my_file_handler' + +# stdout file missing, job finished +@mock.patch('os.path.exists', return_value=False) +def test_result_stdout_raw_handle__missing(exists): + unified_job = UnifiedJob() + unified_job.result_stdout_file = 'dummy' + unified_job.finished = now() + + result = unified_job.result_stdout_raw_handle() + + assert isinstance(result, StringIO) + assert result.read() == 'stdout capture is missing' + +# stdout file missing, job not finished +@mock.patch('os.path.exists', return_value=False) +def test_result_stdout_raw_handle__pending(exists): + unified_job = UnifiedJob() + unified_job.result_stdout_file = 'dummy' + unified_job.finished = None + + result = unified_job.result_stdout_raw_handle() + + assert isinstance(result, StringIO) + assert result.read() == 'Waiting for results...' From 6e022ae183daf0c36d7b00eabb726eb9a00168e8 Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 30 Jun 2016 12:04:23 -0400 Subject: [PATCH 07/11] Broke out URI test helper from test base.py --- awx/main/tests/URI.py | 43 +++++++++++++++++++++++++++++++++++++++ awx/main/tests/base.py | 46 +----------------------------------------- 2 files changed, 44 insertions(+), 45 deletions(-) create mode 100644 awx/main/tests/URI.py diff --git a/awx/main/tests/URI.py b/awx/main/tests/URI.py new file mode 100644 index 0000000000..d04da03436 --- /dev/null +++ b/awx/main/tests/URI.py @@ -0,0 +1,43 @@ +# Helps with test cases. +# Save all components of a uri (i.e. scheme, username, password, etc.) so that +# when we construct a uri string and decompose it, we can verify the decomposition +class URI(object): + DEFAULTS = { + 'scheme' : 'http', + 'username' : 'MYUSERNAME', + 'password' : 'MYPASSWORD', + 'host' : 'host.com', + } + + def __init__(self, description='N/A', scheme=DEFAULTS['scheme'], username=DEFAULTS['username'], password=DEFAULTS['password'], host=DEFAULTS['host']): + self.description = description + self.scheme = scheme + self.username = username + self.password = password + self.host = host + + def get_uri(self): + uri = "%s://" % self.scheme + if self.username: + uri += "%s" % self.username + if self.password: + uri += ":%s" % self.password + if (self.username or self.password) and self.host is not None: + uri += "@%s" % self.host + elif self.host is not None: + uri += "%s" % self.host + return uri + + def get_secret_count(self): + secret_count = 0 + if self.username: + secret_count += 1 + if self.password: + secret_count += 1 + return secret_count + + def __string__(self): + return self.get_uri() + + def __repr__(self): + return self.get_uri() diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 6257e26438..cd3754b23f 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -35,6 +35,7 @@ from awx.main.management.commands.run_task_system import run_taskmanager from awx.main.utils import get_ansible_version from awx.main.task_engine import TaskEngager as LicenseWriter from awx.sso.backends import LDAPSettings +from awx.main.tests.URI import URI # noqa TEST_PLAYBOOK = '''- hosts: mygroup gather_facts: false @@ -732,48 +733,3 @@ class BaseJobExecutionTest(QueueStartStopTestMixin, BaseLiveServerTest): ''' Base class for celery task tests. ''' - -# Helps with test cases. -# Save all components of a uri (i.e. scheme, username, password, etc.) so that -# when we construct a uri string and decompose it, we can verify the decomposition -class URI(object): - DEFAULTS = { - 'scheme' : 'http', - 'username' : 'MYUSERNAME', - 'password' : 'MYPASSWORD', - 'host' : 'host.com', - } - - def __init__(self, description='N/A', scheme=DEFAULTS['scheme'], username=DEFAULTS['username'], password=DEFAULTS['password'], host=DEFAULTS['host']): - self.description = description - self.scheme = scheme - self.username = username - self.password = password - self.host = host - - def get_uri(self): - uri = "%s://" % self.scheme - if self.username: - uri += "%s" % self.username - if self.password: - uri += ":%s" % self.password - if (self.username or self.password) and self.host is not None: - uri += "@%s" % self.host - elif self.host is not None: - uri += "%s" % self.host - return uri - - def get_secret_count(self): - secret_count = 0 - if self.username: - secret_count += 1 - if self.password: - secret_count += 1 - return secret_count - - def __string__(self): - return self.get_uri() - - def __repr__(self): - return self.get_uri() - From d79188f865acd85826224afae1caea3d41c4bc2b Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 30 Jun 2016 12:04:51 -0400 Subject: [PATCH 08/11] Cleaned up unit/test_redact.py --- awx/main/tests/unit/test_redact.py | 52 ++++++------------------------ 1 file changed, 10 insertions(+), 42 deletions(-) diff --git a/awx/main/tests/unit/test_redact.py b/awx/main/tests/unit/test_redact.py index 47d6419642..3535869ee1 100644 --- a/awx/main/tests/unit/test_redact.py +++ b/awx/main/tests/unit/test_redact.py @@ -1,9 +1,8 @@ import textwrap -import re # AWX from awx.main.redact import UriCleaner -from awx.main.tests.base import URI +from awx.main.tests.URI import URI TEST_URIS = [ URI('no host', scheme='https', username='myusername', password='mypass', host=None), @@ -79,52 +78,21 @@ TEST_CLEARTEXT.append({ }) -def check_found(string, substr, count=-1, description=None, word_boundary=False): - if word_boundary: - count_actual = len(re.findall(r'\b%s\b' % re.escape(substr), string)) - else: - count_actual = string.count(substr) - - msg = '' - if description: - msg = 'Test "%s".\n' % description - if count == -1: - assert count_actual > 0 - else: - msg += 'Found %d occurances of "%s" instead of %d in: "%s"' % (count_actual, substr, count, string) - if count_actual != count: - raise Exception(msg) - -def check_not_found(string, substr, description=None, word_boundary=False): - if word_boundary: - count = len(re.findall(r'\b%s\b' % re.escape(substr), string)) - else: - count = string.find(substr) - if count == -1: - count = 0 - - msg = '' - if description: - msg = 'Test "%s".\n' % description - msg += '"%s" found in: "%s"' % (substr, string) - if count != 0: - raise Exception(msg) - # should redact sensitive usernames and passwords def test_uri_scm_simple_redacted(): for uri in TEST_URIS: redacted_str = UriCleaner.remove_sensitive(str(uri)) if uri.username: - check_not_found(redacted_str, uri.username, uri.description) + assert uri.username not in redacted_str if uri.password: - check_not_found(redacted_str, uri.password, uri.description) + assert uri.username not in redacted_str # should replace secret data with safe string, UriCleaner.REPLACE_STR def test_uri_scm_simple_replaced(): for uri in TEST_URIS: redacted_str = UriCleaner.remove_sensitive(str(uri)) - check_found(redacted_str, UriCleaner.REPLACE_STR, uri.get_secret_count()) + assert redacted_str.count(UriCleaner.REPLACE_STR) == uri.get_secret_count() # should redact multiple uris in text def test_uri_scm_multiple(): @@ -136,9 +104,9 @@ def test_uri_scm_multiple(): redacted_str = UriCleaner.remove_sensitive(str(uri)) if uri.username: - check_not_found(redacted_str, uri.username, uri.description) + assert uri.username not in redacted_str if uri.password: - check_not_found(redacted_str, uri.password, uri.description) + assert uri.username not in redacted_str # should replace multiple secret data with safe string def test_uri_scm_multiple_replaced(): @@ -153,14 +121,14 @@ def test_uri_scm_multiple_replaced(): find_count += uri.get_secret_count() redacted_str = UriCleaner.remove_sensitive(cleartext) - check_found(redacted_str, UriCleaner.REPLACE_STR, find_count) + assert redacted_str.count(UriCleaner.REPLACE_STR) == find_count # should redact and replace multiple secret data within a complex cleartext blob def test_uri_scm_cleartext_redact_and_replace(): for test_data in TEST_CLEARTEXT: uri = test_data['uri'] redacted_str = UriCleaner.remove_sensitive(test_data['text']) - check_not_found(redacted_str, uri.username, uri.description) - check_not_found(redacted_str, uri.password, uri.description) + assert uri.username not in redacted_str + assert uri.password not in redacted_str # Ensure the host didn't get redacted - check_found(redacted_str, uri.host, test_data['host_occurrences'], uri.description) + assert redacted_str.count(uri.host) == test_data['host_occurrences'] From 55730f873094f5d910124584d1188ea58f02adf8 Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 7 Jul 2016 11:58:38 -0400 Subject: [PATCH 09/11] Ported old/organizations.py tests --- .../functional/api/test_organizations.py | 182 ++++++++ .../tests/functional/test_auth_token_limit.py | 39 ++ awx/main/tests/old/organizations.py | 413 ------------------ 3 files changed, 221 insertions(+), 413 deletions(-) create mode 100644 awx/main/tests/functional/api/test_organizations.py create mode 100644 awx/main/tests/functional/test_auth_token_limit.py delete mode 100644 awx/main/tests/old/organizations.py diff --git a/awx/main/tests/functional/api/test_organizations.py b/awx/main/tests/functional/api/test_organizations.py new file mode 100644 index 0000000000..8a9da1c662 --- /dev/null +++ b/awx/main/tests/functional/api/test_organizations.py @@ -0,0 +1,182 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved. + +# Python +import pytest + +# Django +from django.core.urlresolvers import reverse + +# AWX +from awx.main.models import * # noqa + + +@pytest.mark.django_db +def test_organization_list_access_tests(options, head, get, admin, alice): + options(reverse('api:organization_list'), user=admin, expect=200) + head(reverse('api:organization_list'), user=admin, expect=200) + get(reverse('api:organization_list'), user=admin, expect=200) + options(reverse('api:organization_list'), user=alice, expect=200) + head(reverse('api:organization_list'), user=alice, expect=200) + get(reverse('api:organization_list'), user=alice, expect=200) + options(reverse('api:organization_list'), user=None, expect=401) + head(reverse('api:organization_list'), user=None, expect=401) + get(reverse('api:organization_list'), user=None, expect=401) + + +@pytest.mark.django_db +def test_organization_access_tests(organization, get, admin, alice, bob): + organization.member_role.members.add(alice) + get(reverse('api:organization_detail', args=(organization.id,)), user=admin, expect=200) + get(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=200) + get(reverse('api:organization_detail', args=(organization.id,)), user=bob, expect=403) + get(reverse('api:organization_detail', args=(organization.id,)), user=None, expect=401) + + +@pytest.mark.django_db +def test_organization_list_integrity(organization, get, admin, alice): + res = get(reverse('api:organization_list'), user=admin) + for field in ['id', 'url', 'name', 'description', 'created']: + assert field in res.data['results'][0] + + +@pytest.mark.django_db +def test_organization_list_visibility(organizations, get, admin, alice): + orgs = organizations(2) + + res = get(reverse('api:organization_list'), user=admin) + assert res.data['count'] == 2 + assert len(res.data['results']) == 2 + + res = get(reverse('api:organization_list'), user=alice) + assert res.data['count'] == 0 + + orgs[1].member_role.members.add(alice) + + res = get(reverse('api:organization_list'), user=alice) + assert res.data['count'] == 1 + assert len(res.data['results']) == 1 + assert res.data['results'][0]['id'] == orgs[1].id + + +@pytest.mark.django_db +def test_organization_project_list(organization, project_factory, get, alice, bob, rando): + prj1 = project_factory('project-one') + project_factory('project-two') + organization.admin_role.members.add(alice) + organization.member_role.members.add(bob) + prj1.use_role.members.add(bob) + assert get(reverse('api:organization_projects_list', args=(organization.id,)), user=alice).data['count'] == 2 + assert get(reverse('api:organization_projects_list', args=(organization.id,)), user=bob).data['count'] == 1 + assert get(reverse('api:organization_projects_list', args=(organization.id,)), user=rando).status_code == 403 + + +@pytest.mark.django_db +def test_organization_user_list(organization, get, admin, alice, bob): + organization.admin_role.members.add(alice) + organization.member_role.members.add(alice) + organization.member_role.members.add(bob) + assert get(reverse('api:organization_users_list', args=(organization.id,)), user=admin).data['count'] == 2 + assert get(reverse('api:organization_users_list', args=(organization.id,)), user=alice).data['count'] == 2 + assert get(reverse('api:organization_users_list', args=(organization.id,)), user=bob).data['count'] == 2 + assert get(reverse('api:organization_admins_list', args=(organization.id,)), user=admin).data['count'] == 1 + assert get(reverse('api:organization_admins_list', args=(organization.id,)), user=alice).data['count'] == 1 + assert get(reverse('api:organization_admins_list', args=(organization.id,)), user=bob).data['count'] == 1 + + +@pytest.mark.django_db +def test_organization_inventory_list(organization, inventory_factory, get, alice, bob, rando): + inv1 = inventory_factory('inventory-one') + inventory_factory('inventory-two') + organization.admin_role.members.add(alice) + organization.member_role.members.add(bob) + inv1.use_role.members.add(bob) + assert get(reverse('api:organization_inventories_list', args=(organization.id,)), user=alice).data['count'] == 2 + assert get(reverse('api:organization_inventories_list', args=(organization.id,)), user=bob).data['count'] == 1 + get(reverse('api:organization_inventories_list', args=(organization.id,)), user=rando, expect=403) + + +@pytest.mark.django_db +def test_create_organization(post, admin, alice): + new_org = { + 'name': 'new org', + 'description': 'my description' + } + res = post(reverse('api:organization_list'), new_org, user=admin, expect=201) + assert res.data['name'] == new_org['name'] + res = post(reverse('api:organization_list'), new_org, user=admin, expect=400) + + +@pytest.mark.django_db +def test_create_organization_xfail(post, alice): + new_org = { + 'name': 'new org', + 'description': 'my description' + } + post(reverse('api:organization_list'), new_org, user=alice, expect=403) + + +@pytest.mark.django_db +def test_add_user_to_organization(post, organization, alice, bob): + organization.admin_role.members.add(alice) + post(reverse('api:organization_users_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=204) + assert bob in organization.member_role + post(reverse('api:organization_users_list', args=(organization.id,)), {'id': bob.id, 'disassociate': True} , user=alice, expect=204) + assert bob not in organization.member_role + + +@pytest.mark.django_db +def test_add_user_to_organization_xfail(post, organization, alice, bob): + organization.member_role.members.add(alice) + post(reverse('api:organization_users_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=403) + + +@pytest.mark.django_db +def test_add_admin_to_organization(post, organization, alice, bob): + organization.admin_role.members.add(alice) + post(reverse('api:organization_admins_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=204) + assert bob in organization.admin_role + assert bob in organization.member_role + post(reverse('api:organization_admins_list', args=(organization.id,)), {'id': bob.id, 'disassociate': True} , user=alice, expect=204) + assert bob not in organization.admin_role + assert bob not in organization.member_role + + +@pytest.mark.django_db +def test_add_admin_to_organization_xfail(post, organization, alice, bob): + organization.member_role.members.add(alice) + post(reverse('api:organization_admins_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=403) + + +@pytest.mark.django_db +def test_update_organization(get, put, organization, alice, bob): + organization.admin_role.members.add(alice) + data = get(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=200).data + data['description'] = 'hi' + put(reverse('api:organization_detail', args=(organization.id,)), data, user=alice, expect=200) + organization.refresh_from_db() + assert organization.description == 'hi' + data['description'] = 'bye' + put(reverse('api:organization_detail', args=(organization.id,)), data, user=bob, expect=403) + + +@pytest.mark.django_db +def test_delete_organization(delete, organization, admin): + delete(reverse('api:organization_detail', args=(organization.id,)), user=admin, expect=204) + + +@pytest.mark.django_db +def test_delete_organization2(delete, organization, alice): + organization.admin_role.members.add(alice) + delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=204) + + +@pytest.mark.django_db +def test_delete_organization_xfail1(delete, organization, alice): + organization.member_role.members.add(alice) + delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=403) + + +@pytest.mark.django_db +def test_delete_organization_xfail2(delete, organization): + delete(reverse('api:organization_detail', args=(organization.id,)), user=None, expect=401) diff --git a/awx/main/tests/functional/test_auth_token_limit.py b/awx/main/tests/functional/test_auth_token_limit.py new file mode 100644 index 0000000000..bbe30320c4 --- /dev/null +++ b/awx/main/tests/functional/test_auth_token_limit.py @@ -0,0 +1,39 @@ +import pytest +from datetime import timedelta + +from django.utils.timezone import now as tz_now +from django.test.utils import override_settings + +from awx.main.models import AuthToken, User + + +@override_settings(AUTH_TOKEN_PER_USER=3) +@pytest.mark.django_db +def test_get_tokens_over_limit(): + now = tz_now() + # Times are relative to now + # (key, created on in seconds , expiration in seconds) + test_data = [ + # a is implicitly expired + ("a", -1000, -10), + # b's are invalid due to session limit of 3 + ("b", -100, 60), + ("bb", -100, 60), + ("c", -90, 70), + ("d", -80, 80), + ("e", -70, 90), + ] + user = User.objects.create_superuser('admin', 'foo@bar.com', 'password') + for key, t_create, t_expire in test_data: + AuthToken.objects.create( + user=user, + key=key, + request_hash='this_is_a_hash', + created=now + timedelta(seconds=t_create), + expires=now + timedelta(seconds=t_expire), + ) + invalid_tokens = AuthToken.get_tokens_over_limit(user, now=now) + invalid_keys = [x.key for x in invalid_tokens] + assert len(invalid_keys) == 2 + assert 'b' in invalid_keys + assert 'bb' in invalid_keys diff --git a/awx/main/tests/old/organizations.py b/awx/main/tests/old/organizations.py deleted file mode 100644 index 136e2603cc..0000000000 --- a/awx/main/tests/old/organizations.py +++ /dev/null @@ -1,413 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved. - -# Python -from datetime import timedelta - -# Django -from django.core.urlresolvers import reverse -from django.test.utils import override_settings -from django.contrib.auth.models import User -from django.utils.timezone import now as tz_now - -# AWX -from awx.main.models import * # noqa -from awx.main.tests.base import BaseTest - -__all__ = ['AuthTokenLimitUnitTest', 'OrganizationsTest'] - -class AuthTokenLimitUnitTest(BaseTest): - - def setUp(self): - self.now = tz_now() - # Times are relative to now - # (key, created on in seconds , expiration in seconds) - self.test_data = [ - # a is implicitly expired - ("a", -1000, -10), - # b's are invalid due to session limit of 3 - ("b", -100, 60), - ("bb", -100, 60), - ("c", -90, 70), - ("d", -80, 80), - ("e", -70, 90), - ] - self.user = User.objects.create_superuser('admin', 'foo@bar.com', 'password') - for key, t_create, t_expire in self.test_data: - AuthToken.objects.create( - user=self.user, - key=key, - request_hash='this_is_a_hash', - created=self.now + timedelta(seconds=t_create), - expires=self.now + timedelta(seconds=t_expire), - ) - super(AuthTokenLimitUnitTest, self).setUp() - - @override_settings(AUTH_TOKEN_PER_USER=3) - def test_get_tokens_over_limit(self): - invalid_tokens = AuthToken.get_tokens_over_limit(self.user, now=self.now) - invalid_keys = [x.key for x in invalid_tokens] - self.assertEqual(len(invalid_keys), 2) - self.assertIn('b', invalid_keys) - self.assertIn('bb', invalid_keys) - -class OrganizationsTest(BaseTest): - - def collection(self): - return reverse('api:organization_list') - - def setUp(self): - super(OrganizationsTest, self).setUp() - self.setup_instances() - # TODO: Test non-enterprise license - self.create_test_license_file() - self.setup_users() - - self.organizations = self.make_organizations(self.super_django_user, 10) - self.projects = self.make_projects(self.normal_django_user, 10) - - # add projects to organizations in a more or less arbitrary way - for project in self.projects[0:2]: - self.organizations[0].projects.add(project) - for project in self.projects[3:8]: - self.organizations[1].projects.add(project) - for project in self.projects[9:10]: - self.organizations[2].projects.add(project) - self.organizations[0].projects.add(self.projects[-1]) - self.organizations[9].projects.add(self.projects[-2]) - - # get the URL for various organization records - self.a_detail_url = "%s%s" % (self.collection(), self.organizations[0].pk) - self.b_detail_url = "%s%s" % (self.collection(), self.organizations[1].pk) - self.c_detail_url = "%s%s" % (self.collection(), self.organizations[2].pk) - - # configuration: - # admin_user is an admin and regular user in all organizations - # other_user is all organizations - # normal_user is a user in organization 0, and an admin of organization 1 - # nobody_user is a user not a member of any organizations - - for x in self.organizations: - x.admin_role.members.add(self.super_django_user) - x.member_role.members.add(self.super_django_user) - x.member_role.members.add(self.other_django_user) - - self.organizations[0].member_role.members.add(self.normal_django_user) - self.organizations[1].admin_role.members.add(self.normal_django_user) - - def test_get_organization_list(self): - url = reverse('api:organization_list') - - # no credentials == 401 - self.options(url, expect=401) - self.head(url, expect=401) - self.get(url, expect=401) - - # wrong credentials == 401 - with self.current_user(self.get_invalid_credentials()): - self.options(url, expect=401) - self.head(url, expect=401) - self.get(url, expect=401) - - # superuser credentials == 200, full list - with self.current_user(self.super_django_user): - self.options(url, expect=200) - self.head(url, expect=200) - response = self.get(url, expect=200) - self.check_pagination_and_size(response, 10, previous=None, next=None) - self.assertEqual(len(response['results']), - Organization.objects.count()) - for field in ['id', 'url', 'name', 'description', 'created']: - self.assertTrue(field in response['results'][0], - 'field %s not in result' % field) - - # check that the related URL functionality works - related = response['results'][0]['related'] - for x in ['projects', 'users', 'admins']: - self.assertTrue(x in related and related[x].endswith("/%s/" % x), "looking for %s in related" % x) - - # normal credentials == 200, get only organizations of which user is a member - with self.current_user(self.normal_django_user): - self.options(url, expect=200) - self.head(url, expect=200) - response = self.get(url, expect=200) - self.check_pagination_and_size(response, 2, previous=None, next=None) - - # no admin rights? get empty list - with self.current_user(self.other_django_user): - response = self.get(url, expect=200) - self.check_pagination_and_size(response, len(self.organizations), previous=None, next=None) - - # not a member of any orgs? get empty list - with self.current_user(self.nobody_django_user): - response = self.get(url, expect=200) - self.check_pagination_and_size(response, 0, previous=None, next=None) - - def test_get_item(self): - - # first get all the URLs - data = self.get(self.collection(), expect=200, auth=self.get_super_credentials()) - urls = [item['url'] for item in data['results']] - - # make sure super user can fetch records - data = self.get(urls[0], expect=200, auth=self.get_super_credentials()) - [self.assertTrue(key in data) for key in ['name', 'description', 'url']] - - # make sure invalid user cannot - data = self.get(urls[0], expect=401, auth=self.get_invalid_credentials()) - - # normal user should be able to get org 0 and org 1 but not org 9 (as he's not a user or admin of it) - data = self.get(urls[0], expect=200, auth=self.get_normal_credentials()) - data = self.get(urls[1], expect=200, auth=self.get_normal_credentials()) - data = self.get(urls[9], expect=403, auth=self.get_normal_credentials()) - - # other user is a member, but not admin, can access org - data = self.get(urls[0], expect=200, auth=self.get_other_credentials()) - - # nobody user is not a member, cannot access org - data = self.get(urls[0], expect=403, auth=self.get_nobody_credentials()) - - def test_get_item_subobjects_projects(self): - - # first get all the orgs - orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials()) - - # find projects attached to the first org - projects0_url = orgs['results'][0]['related']['projects'] - projects1_url = orgs['results'][1]['related']['projects'] - projects9_url = orgs['results'][9]['related']['projects'] - - self.get(projects0_url, expect=401, auth=None) - self.get(projects0_url, expect=401, auth=self.get_invalid_credentials()) - - # normal user is just a member of the first org, so can see all projects under the org - self.get(projects0_url, expect=200, auth=self.get_normal_credentials()) - - # however in the second org, he's an admin and should see all of them - projects1a = self.get(projects1_url, expect=200, auth=self.get_normal_credentials()) - self.assertEquals(projects1a['count'], 5) - - # but the non-admin cannot access the list of projects in the org. He should use /projects/ instead! - self.get(projects1_url, expect=200, auth=self.get_other_credentials()) - - # superuser should be able to read anything - projects9a = self.get(projects9_url, expect=200, auth=self.get_super_credentials()) - self.assertEquals(projects9a['count'], 1) - - # nobody user is not a member of any org, so can't see projects... - self.get(projects0_url, expect=403, auth=self.get_nobody_credentials()) - projects1a = self.get(projects1_url, expect=403, auth=self.get_nobody_credentials()) - - def test_get_item_subobjects_users(self): - - # see if we can list the users added to the organization - orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials()) - org1_users_url = orgs['results'][1]['related']['users'] - org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials()) - self.assertEquals(org1_users['count'], 2) - org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials()) - self.assertEquals(org1_users['count'], 2) - org1_users = self.get(org1_users_url, expect=200, auth=self.get_other_credentials()) - self.assertEquals(org1_users['count'], 2) - - def test_get_item_subobjects_admins(self): - - # see if we can list the users added to the organization - orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials()) - org1_users_url = orgs['results'][1]['related']['admins'] - org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials()) - self.assertEquals(org1_users['count'], 2) - org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials()) - self.assertEquals(org1_users['count'], 2) - - def test_get_organization_inventories_list(self): - pass - - def _test_get_item_subobjects_tags(self): - # FIXME: Update to support taggit! - - # put some tags on the org - org1 = Organization.objects.get(pk=2) - tag1 = Tag.objects.create(name='atag') - tag2 = Tag.objects.create(name='btag') - org1.tags.add(tag1) - org1.tags.add(tag2) - - # see if we can list the users added to the organization - orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials()) - org1_tags_url = orgs['results'][1]['related']['tags'] - org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_normal_credentials()) - self.assertEquals(org1_tags['count'], 2) - org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_super_credentials()) - self.assertEquals(org1_tags['count'], 2) - org1_tags = self.get(org1_tags_url, expect=403, auth=self.get_other_credentials()) - - def _test_get_item_subobjects_audit_trail(self): - # FIXME: Update to support whatever audit trail framework is used. - url = '/api/v1/organizations/2/audit_trail/' - self.get(url, expect=200, auth=self.get_normal_credentials()) - # FIXME: verify that some audit trail records are auto-created on save AND post - - def test_post_item(self): - - new_org = dict(name='magic test org', description='8675309') - - # need to be a valid user - self.post(self.collection(), new_org, expect=401, auth=None) - self.post(self.collection(), new_org, expect=401, auth=self.get_invalid_credentials()) - - # only super users can create organizations - self.post(self.collection(), new_org, expect=403, auth=self.get_normal_credentials()) - self.post(self.collection(), new_org, expect=403, auth=self.get_other_credentials()) - data1 = self.post(self.collection(), new_org, expect=201, auth=self.get_super_credentials()) - - # duplicate post results in 400 - response = self.post(self.collection(), new_org, expect=400, auth=self.get_super_credentials()) - self.assertTrue('name' in response, response) - self.assertTrue('Name' in response['name'][0], response) - - # look at what we got back from the post, make sure we added an org - last_org = Organization.objects.order_by('-pk')[0] - self.assertTrue(data1['url'].endswith("/%d/" % last_org.pk)) - - # Test that not even super users can create an organization with a basic license - self.create_basic_license_file() - cant_org = dict(name='silly user org', description='4815162342') - self.post(self.collection(), cant_org, expect=402, auth=self.get_super_credentials()) - - def test_post_item_subobjects_users(self): - - url = reverse('api:organization_users_list', args=(self.organizations[1].pk,)) - users = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(users['count'], 2) - self.post(url, dict(id=self.normal_django_user.pk), expect=204, auth=self.get_normal_credentials()) - users = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(users['count'], 3) - self.post(url, dict(id=self.normal_django_user.pk, disassociate=True), expect=204, auth=self.get_normal_credentials()) - users = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(users['count'], 2) - - # post a completely new user to verify we can add users to the subcollection directly - new_user = dict(username='NewUser9000', password='NewPassword9000') - which_org = Organization.accessible_objects(self.normal_django_user, 'admin_role')[0] - url = reverse('api:organization_users_list', args=(which_org.pk,)) - self.post(url, new_user, expect=201, auth=self.get_normal_credentials()) - - all_users = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(all_users['count'], 3) - - def test_post_item_subobjects_admins(self): - - url = reverse('api:organization_admins_list', args=(self.organizations[1].pk,)) - admins = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(admins['count'], 2) - self.post(url, dict(id=self.other_django_user.pk), expect=204, auth=self.get_normal_credentials()) - admins = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(admins['count'], 3) - self.post(url, dict(id=self.other_django_user.pk, disassociate=1), expect=204, auth=self.get_normal_credentials()) - admins = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(admins['count'], 2) - - def _test_post_item_subobjects_tags(self): - # FIXME: Update to support taggit! - - tag = Tag.objects.create(name='blippy') - url = '/api/v1/organizations/2/tags/' - tags = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(tags['count'], 0) - self.post(url, dict(id=tag.pk), expect=204, auth=self.get_normal_credentials()) - tags = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(tags['count'], 1) - self.assertEqual(tags['results'][0]['id'], tag.pk) - self.post(url, dict(id=tag.pk, disassociate=1), expect=204, auth=self.get_normal_credentials()) - tags = self.get(url, expect=200, auth=self.get_normal_credentials()) - self.assertEqual(tags['count'], 0) - - def _test_post_item_subobjects_audit_trail(self): - # FIXME: Update to support whatever audit trail framework is used. - # audit trails are system things, and no user can post to them. - url = '/api/v1/organizations/2/audit_trail/' - self.post(url, dict(id=1), expect=405, auth=self.get_super_credentials()) - - def test_put_item(self): - - # first get some urls and data to put back to them - urls = self.get_urls(self.collection(), auth=self.get_super_credentials()) - self.get(urls[0], expect=200, auth=self.get_super_credentials()) - data1 = self.get(urls[1], expect=200, auth=self.get_super_credentials()) - - # test that an unauthenticated user cannot do a put - new_data1 = data1.copy() - new_data1['description'] = 'updated description' - self.put(urls[0], new_data1, expect=401, auth=None) - self.put(urls[0], new_data1, expect=401, auth=self.get_invalid_credentials()) - - # user normal is an admin of org 0 and a member of org 1 so should be able to put only org 1 - self.put(urls[0], new_data1, expect=403, auth=self.get_normal_credentials()) - self.put(urls[1], new_data1, expect=200, auth=self.get_normal_credentials()) - - # get back org 1 and see if it changed - get_result = self.get(urls[1], expect=200, auth=self.get_normal_credentials()) - self.assertEquals(get_result['description'], 'updated description') - - # super user can also put even though they aren't added to the org users or admins list - self.put(urls[1], new_data1, expect=200, auth=self.get_super_credentials()) - - # make sure posting to this URL is not supported - self.post(urls[1], new_data1, expect=405, auth=self.get_super_credentials()) - - def test_put_item_subobjects_projects(self): - - # any attempt to put a subobject should be a 405, edit the actual resource or POST with 'disassociate' to delete - # this is against a collection URL anyway, so we really need not repeat this test for other object types - # as a PUT against a collection doesn't make much sense. - - orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials()) - projects0_url = orgs['results'][0]['related']['projects'] - sub_projects = self.get(projects0_url, expect=200, auth=self.get_super_credentials()) - self.assertEquals(sub_projects['count'], 3) - first_sub_project = sub_projects['results'][0] - self.put(projects0_url, first_sub_project, expect=405, auth=self.get_super_credentials()) - - def test_delete_item(self): - - # first get some urls - urls = self.get_urls(self.collection(), auth=self.get_super_credentials()) - urldata1 = self.get(urls[1], auth=self.get_super_credentials()) - - # check authentication -- admins of the org and superusers can delete objects only - self.delete(urls[0], expect=401, auth=None) - self.delete(urls[0], expect=401, auth=self.get_invalid_credentials()) - self.delete(urls[8], expect=403, auth=self.get_normal_credentials()) - self.delete(urls[1], expect=204, auth=self.get_normal_credentials()) - self.delete(urls[0], expect=204, auth=self.get_super_credentials()) - - # check that when we have deleted an object it comes back 404 via GET - self.get(urls[1], expect=404, auth=self.get_normal_credentials()) - assert Organization.objects.filter(pk=urldata1['id']).count() == 0 - - # also check that DELETE on the collection doesn't work - self.delete(self.collection(), expect=405, auth=self.get_super_credentials()) - - # Test that not even super users can delete an organization with a basic license - self.create_basic_license_file() - self.delete(urls[2], expect=402, auth=self.get_super_credentials()) - - def test_invalid_post_data(self): - url = reverse('api:organization_list') - # API should gracefully handle data of an invalid type. - self.post(url, expect=400, data=None, auth=self.get_super_credentials()) - self.post(url, expect=400, data=99, auth=self.get_super_credentials()) - self.post(url, expect=400, data='abcd', auth=self.get_super_credentials()) - self.post(url, expect=400, data=3.14, auth=self.get_super_credentials()) - self.post(url, expect=400, data=True, auth=self.get_super_credentials()) - self.post(url, expect=400, data=[1,2,3], auth=self.get_super_credentials()) - url = reverse('api:organization_users_list', args=(self.organizations[0].pk,)) - self.post(url, expect=400, data=None, auth=self.get_super_credentials()) - self.post(url, expect=400, data=99, auth=self.get_super_credentials()) - self.post(url, expect=400, data='abcd', auth=self.get_super_credentials()) - self.post(url, expect=400, data=3.14, auth=self.get_super_credentials()) - self.post(url, expect=400, data=True, auth=self.get_super_credentials()) - self.post(url, expect=400, data=[1,2,3], auth=self.get_super_credentials()) - -# TODO: tests for tag disassociation From 856ad80c214c9f70f3ef904daba6904a8cacc9ad Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 7 Jul 2016 13:53:21 -0400 Subject: [PATCH 10/11] Mock feature_enabled for organization tests --- awx/main/tests/functional/api/test_organizations.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/awx/main/tests/functional/api/test_organizations.py b/awx/main/tests/functional/api/test_organizations.py index 8a9da1c662..7aa266deb3 100644 --- a/awx/main/tests/functional/api/test_organizations.py +++ b/awx/main/tests/functional/api/test_organizations.py @@ -3,6 +3,8 @@ # Python import pytest +import mock + # Django from django.core.urlresolvers import reverse @@ -97,6 +99,7 @@ def test_organization_inventory_list(organization, inventory_factory, get, alice @pytest.mark.django_db +@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) def test_create_organization(post, admin, alice): new_org = { 'name': 'new org', @@ -108,6 +111,7 @@ def test_create_organization(post, admin, alice): @pytest.mark.django_db +@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) def test_create_organization_xfail(post, alice): new_org = { 'name': 'new org', @@ -161,22 +165,26 @@ def test_update_organization(get, put, organization, alice, bob): @pytest.mark.django_db +@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) def test_delete_organization(delete, organization, admin): delete(reverse('api:organization_detail', args=(organization.id,)), user=admin, expect=204) @pytest.mark.django_db +@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) def test_delete_organization2(delete, organization, alice): organization.admin_role.members.add(alice) delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=204) @pytest.mark.django_db +@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) def test_delete_organization_xfail1(delete, organization, alice): organization.member_role.members.add(alice) delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=403) @pytest.mark.django_db +@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) def test_delete_organization_xfail2(delete, organization): delete(reverse('api:organization_detail', args=(organization.id,)), user=None, expect=401) From 521fa13662ba2f5bee63f48cf39792d231538f0d Mon Sep 17 00:00:00 2001 From: Akita Noek Date: Thu, 7 Jul 2016 14:41:11 -0400 Subject: [PATCH 11/11] More mock fixes for organizations.py tests --- awx/main/tests/functional/api/test_organizations.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/awx/main/tests/functional/api/test_organizations.py b/awx/main/tests/functional/api/test_organizations.py index 7aa266deb3..d141ddd6b5 100644 --- a/awx/main/tests/functional/api/test_organizations.py +++ b/awx/main/tests/functional/api/test_organizations.py @@ -165,26 +165,26 @@ def test_update_organization(get, put, organization, alice, bob): @pytest.mark.django_db -@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) +@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True) def test_delete_organization(delete, organization, admin): delete(reverse('api:organization_detail', args=(organization.id,)), user=admin, expect=204) @pytest.mark.django_db -@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) +@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True) def test_delete_organization2(delete, organization, alice): organization.admin_role.members.add(alice) delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=204) @pytest.mark.django_db -@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) +@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True) def test_delete_organization_xfail1(delete, organization, alice): organization.member_role.members.add(alice) delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=403) @pytest.mark.django_db -@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True) +@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True) def test_delete_organization_xfail2(delete, organization): delete(reverse('api:organization_detail', args=(organization.id,)), user=None, expect=401)