functional -> old

This commit is contained in:
Wayne Witzel III
2016-02-02 09:13:16 -05:00
parent a44318f0cd
commit f12ab22591
30 changed files with 7 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
Old Tests
=========
This are the old django.TestCase / unittest.TestCase tests for Tower.
No new tests should be added to this folder. Overtime, we will be refactoring
tests out of this folder as py.test tests and into their respective functional
and unit folders.

View File

@@ -0,0 +1,55 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
class ActivityStreamTest(BaseTest):
def collection(self):
return reverse('api:activity_stream_list')
def item(self, item_id):
return reverse('api:activity_stream_detail', args=(item_id,))
def setUp(self):
super(ActivityStreamTest, self).setUp()
self.setup_instances()
self.create_test_license_file()
# TODO: Test non-enterprise license
self.setup_users()
self.org_created = self.post(reverse('api:organization_list'), dict(name='test org', description='test descr'), expect=201, auth=self.get_super_credentials())
def test_get_activity_stream_list(self):
url = self.collection()
with self.current_user(self.super_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 1, previous=None, next=None)
def test_basic_fields(self):
item_id = ActivityStream.objects.order_by('pk')[0].pk
org_item = self.item(item_id)
with self.current_user(self.super_django_user):
response = self.get(org_item, expect=200)
self.assertTrue("related" in response)
self.assertTrue("organization" in response['related'])
self.assertTrue("summary_fields" in response)
self.assertTrue("organization" in response['summary_fields'])
self.assertTrue(response['summary_fields']['organization'][0]['name'] == self.org_created['name'])
def test_changeby_user(self):
item_id = ActivityStream.objects.order_by('pk')[0].pk
org_item = self.item(item_id)
with self.current_user(self.super_django_user):
response = self.get(org_item, expect=200)
self.assertEqual(response['summary_fields']['actor']['username'], self.super_django_user.username)

1240
awx/main/tests/old/ad_hoc.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,34 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# AWX
from awx.main.tests.base import BaseTest
from command_base import BaseCommandMixin
__all__ = ['AgeDeletedCommandFunctionalTest']
class AgeDeletedCommandFunctionalTest(BaseCommandMixin, BaseTest):
def setUp(self):
super(AgeDeletedCommandFunctionalTest, self).setUp()
self.create_test_license_file()
self.setup_instances()
self.setup_users()
self.organization = self.make_organization(self.super_django_user)
self.credential = self.make_credential()
self.credential2 = self.make_credential()
self.credential.mark_inactive(True)
self.credential2.mark_inactive(True)
self.credential_active = self.make_credential()
self.super_django_user.mark_inactive(True)
def test_default(self):
result, stdout, stderr = self.run_command('age_deleted')
self.assertEqual(stdout, 'Aged %d items\n' % 3)
def test_type(self):
result, stdout, stderr = self.run_command('age_deleted', type='Credential')
self.assertEqual(stdout, 'Aged %d items\n' % 2)
def test_id_type(self):
result, stdout, stderr = self.run_command('age_deleted', type='Credential', id=self.credential.pk)
self.assertEqual(stdout, 'Aged %d items\n' % 1)

View File

@@ -0,0 +1,238 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from datetime import datetime
from dateutil.relativedelta import relativedelta
import mock
#Django
from django.core.management.base import CommandError
# AWX
from awx.main.tests.base import BaseTest
from awx.fact.tests.base import MongoDBRequired, FactScanBuilder, TEST_FACT_PACKAGES, TEST_FACT_ANSIBLE, TEST_FACT_SERVICES
from command_base import BaseCommandMixin
from awx.main.management.commands.cleanup_facts import Command, CleanupFacts
from awx.fact.models.fact import * # noqa
__all__ = ['CommandTest','CleanupFactsUnitTest', 'CleanupFactsCommandFunctionalTest']
class CleanupFactsCommandFunctionalTest(BaseCommandMixin, BaseTest, MongoDBRequired):
def setUp(self):
super(CleanupFactsCommandFunctionalTest, self).setUp()
self.create_test_license_file()
self.builder = FactScanBuilder()
self.builder.add_fact('ansible', TEST_FACT_ANSIBLE)
def test_invoke_zero_ok(self):
self.builder.set_epoch(datetime(year=2015, day=2, month=1, microsecond=0))
self.builder.build(scan_count=20, host_count=10)
result, stdout, stderr = self.run_command('cleanup_facts', granularity='2y', older_than='1d')
self.assertEqual(stdout, 'Deleted %s facts.\n' % ((200 / 2)))
def test_invoke_zero_deleted(self):
result, stdout, stderr = self.run_command('cleanup_facts', granularity='1w',older_than='5d')
self.assertEqual(stdout, 'Deleted 0 facts.\n')
def test_invoke_all_deleted(self):
self.builder.build(scan_count=20, host_count=10)
result, stdout, stderr = self.run_command('cleanup_facts', granularity='0d', older_than='0d')
self.assertEqual(stdout, 'Deleted 200 facts.\n')
def test_invoke_params_required(self):
result, stdout, stderr = self.run_command('cleanup_facts')
self.assertIsInstance(result, CommandError)
self.assertEqual(str(result), 'Both --granularity and --older_than are required.')
def test_module(self):
self.builder.add_fact('packages', TEST_FACT_PACKAGES)
self.builder.add_fact('services', TEST_FACT_SERVICES)
self.builder.build(scan_count=5, host_count=5)
result, stdout, stderr = self.run_command('cleanup_facts', granularity='0d', older_than='0d', module='packages')
self.assertEqual(stdout, 'Deleted 25 facts.\n')
class CommandTest(BaseTest):
def setUp(self):
super(CommandTest, self).setUp()
self.create_test_license_file()
@mock.patch('awx.main.management.commands.cleanup_facts.CleanupFacts.run')
def test_parameters_ok(self, run):
kv = {
'older_than': '1d',
'granularity': '1d',
'module': None,
}
cmd = Command()
cmd.handle(None, **kv)
run.assert_called_once_with(relativedelta(days=1), relativedelta(days=1), module=None)
def test_string_time_to_timestamp_ok(self):
kvs = [
{
'time': '2w',
'timestamp': relativedelta(weeks=2),
'msg': '2 weeks',
},
{
'time': '23d',
'timestamp': relativedelta(days=23),
'msg': '23 days',
},
{
'time': '11m',
'timestamp': relativedelta(months=11),
'msg': '11 months',
},
{
'time': '14y',
'timestamp': relativedelta(years=14),
'msg': '14 years',
},
]
for kv in kvs:
cmd = Command()
res = cmd.string_time_to_timestamp(kv['time'])
self.assertEqual(kv['timestamp'], res, "%s should convert to %s" % (kv['time'], kv['msg']))
def test_string_time_to_timestamp_invalid(self):
kvs = [
{
'time': '2weeks',
'msg': 'weeks instead of w',
},
{
'time': '2days',
'msg': 'days instead of d',
},
{
'time': '23',
'msg': 'no unit specified',
},
{
'time': None,
'msg': 'no value specified',
},
{
'time': 'zigzag',
'msg': 'random string specified',
},
]
for kv in kvs:
cmd = Command()
res = cmd.string_time_to_timestamp(kv['time'])
self.assertIsNone(res, kv['msg'])
# Mock run() just in case, but it should never get called because an error should be thrown
@mock.patch('awx.main.management.commands.cleanup_facts.CleanupFacts.run')
def test_parameters_fail(self, run):
kvs = [
{
'older_than': '1week',
'granularity': '1d',
'msg': 'Invalid older_than param value',
},
{
'older_than': '1d',
'granularity': '1year',
'msg': 'Invalid granularity param value',
}
]
for kv in kvs:
cmd = Command()
with self.assertRaises(CommandError):
cmd.handle(None, older_than=kv['older_than'], granularity=kv['granularity'])
class CleanupFactsUnitTest(BaseCommandMixin, BaseTest, MongoDBRequired):
def setUp(self):
super(CleanupFactsUnitTest, self).setUp()
self.builder = FactScanBuilder()
self.builder.add_fact('ansible', TEST_FACT_ANSIBLE)
self.builder.add_fact('packages', TEST_FACT_PACKAGES)
self.builder.build(scan_count=20, host_count=10)
'''
Create 10 hosts with 40 facts each. After cleanup, there should be 20 facts for each host.
Then ensure the correct facts are deleted.
'''
def test_cleanup_logic(self):
cleanup_facts = CleanupFacts()
fact_oldest = FactVersion.objects.all().order_by('timestamp').first()
granularity = relativedelta(years=2)
deleted_count = cleanup_facts.cleanup(self.builder.get_timestamp(0), granularity)
self.assertEqual(deleted_count, 2 * (self.builder.get_scan_count() * self.builder.get_host_count()) / 2)
# Check the number of facts per host
for host in self.builder.get_hosts():
count = FactVersion.objects.filter(host=host).count()
scan_count = (2 * self.builder.get_scan_count()) / 2
self.assertEqual(count, scan_count)
count = Fact.objects.filter(host=host).count()
self.assertEqual(count, scan_count)
# Ensure that only 2 facts (ansible and packages) exists per granularity time
date_pivot = self.builder.get_timestamp(0)
for host in self.builder.get_hosts():
while date_pivot > fact_oldest.timestamp:
date_pivot_next = date_pivot - granularity
kv = {
'timestamp__lte': date_pivot,
'timestamp__gt': date_pivot_next,
'host': host,
}
count = FactVersion.objects.filter(**kv).count()
self.assertEqual(count, 2, "should only be 2 FactVersion per the 2 year granularity")
count = Fact.objects.filter(**kv).count()
self.assertEqual(count, 2, "should only be 2 Fact per the 2 year granularity")
date_pivot = date_pivot_next
'''
Create 10 hosts with 40 facts each. After cleanup, there should be 30 facts for each host.
Then ensure the correct facts are deleted.
'''
def test_cleanup_module(self):
cleanup_facts = CleanupFacts()
fact_oldest = FactVersion.objects.all().order_by('timestamp').first()
granularity = relativedelta(years=2)
deleted_count = cleanup_facts.cleanup(self.builder.get_timestamp(0), granularity, module='ansible')
self.assertEqual(deleted_count, (self.builder.get_scan_count() * self.builder.get_host_count()) / 2)
# Check the number of facts per host
for host in self.builder.get_hosts():
count = FactVersion.objects.filter(host=host).count()
self.assertEqual(count, 30)
count = Fact.objects.filter(host=host).count()
self.assertEqual(count, 30)
# Ensure that only 1 ansible fact exists per granularity time
date_pivot = self.builder.get_timestamp(0)
for host in self.builder.get_hosts():
while date_pivot > fact_oldest.timestamp:
date_pivot_next = date_pivot - granularity
kv = {
'timestamp__lte': date_pivot,
'timestamp__gt': date_pivot_next,
'host': host,
'module': 'ansible',
}
count = FactVersion.objects.filter(**kv).count()
self.assertEqual(count, 1)
count = Fact.objects.filter(**kv).count()
self.assertEqual(count, 1)
date_pivot = date_pivot_next

View File

@@ -0,0 +1,86 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import StringIO
import sys
import json
# Django
from django.core.management import call_command
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
class BaseCommandMixin(BaseTestMixin):
def create_test_inventories(self):
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.projects = self.make_projects(self.normal_django_user, 2)
self.organizations[0].projects.add(self.projects[1])
self.organizations[1].projects.add(self.projects[0])
self.inventories = []
self.hosts = []
self.groups = []
for n, organization in enumerate(self.organizations):
inventory = Inventory.objects.create(name='inventory-%d' % n,
description='description for inventory %d' % n,
organization=organization,
variables=json.dumps({'n': n}) if n else '')
self.inventories.append(inventory)
hosts = []
for x in xrange(10):
if n > 0:
variables = json.dumps({'ho': 'hum-%d' % x})
else:
variables = ''
host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
inventory=inventory,
variables=variables)
hosts.append(host)
self.hosts.extend(hosts)
groups = []
for x in xrange(5):
if n > 0:
variables = json.dumps({'gee': 'whiz-%d' % x})
else:
variables = ''
group = inventory.groups.create(name='group-%d' % x,
inventory=inventory,
variables=variables)
groups.append(group)
group.hosts.add(hosts[x])
group.hosts.add(hosts[x + 5])
if n > 0 and x == 4:
group.parents.add(groups[3])
self.groups.extend(groups)
def run_command(self, name, *args, **options):
'''
Run a management command and capture its stdout/stderr along with any
exceptions.
'''
command_runner = options.pop('command_runner', call_command)
stdin_fileobj = options.pop('stdin_fileobj', None)
options.setdefault('verbosity', 1)
options.setdefault('interactive', False)
original_stdin = sys.stdin
original_stdout = sys.stdout
original_stderr = sys.stderr
if stdin_fileobj:
sys.stdin = stdin_fileobj
sys.stdout = StringIO.StringIO()
sys.stderr = StringIO.StringIO()
result = None
try:
result = command_runner(name, *args, **options)
except Exception as e:
result = e
finally:
captured_stdout = sys.stdout.getvalue()
captured_stderr = sys.stderr.getvalue()
sys.stdin = original_stdin
sys.stdout = original_stdout
sys.stderr = original_stderr
return result, captured_stdout, captured_stderr

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,39 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import uuid
# AWX
from awx.main.tests.base import BaseTest
from command_base import BaseCommandMixin
from awx.main.models import * # noqa
__all__ = ['RemoveInstanceCommandFunctionalTest']
class RemoveInstanceCommandFunctionalTest(BaseCommandMixin, BaseTest):
uuids = []
instances = []
def setup_instances(self):
self.uuids = [uuid.uuid4().hex for x in range(0, 3)]
self.instances.append(Instance(uuid=settings.SYSTEM_UUID, primary=True, hostname='127.0.0.1'))
self.instances.append(Instance(uuid=self.uuids[0], primary=False, hostname='127.0.0.2'))
self.instances.append(Instance(uuid=self.uuids[1], primary=False, hostname='127.0.0.3'))
self.instances.append(Instance(uuid=self.uuids[2], primary=False, hostname='127.0.0.4'))
for x in self.instances:
x.save()
def setUp(self):
super(RemoveInstanceCommandFunctionalTest, self).setUp()
self.create_test_license_file()
self.setup_instances()
self.setup_users()
def test_default(self):
self.assertEqual(Instance.objects.filter(hostname="127.0.0.2").count(), 1)
result, stdout, stderr = self.run_command('remove_instance', hostname='127.0.0.2')
self.assertIsNone(result)
self.assertEqual(stdout, 'Successfully removed instance (uuid="%s",hostname="127.0.0.2",role="secondary").\n' % (self.uuids[0]))
self.assertEqual(Instance.objects.filter(hostname="127.0.0.2").count(), 0)

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,116 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from mock import MagicMock, Mock
# Django
from django.test import SimpleTestCase
# AWX
from awx.fact.models.fact import * # noqa
from awx.main.management.commands.run_socketio_service import SocketSessionManager, SocketSession, SocketController
__all__ = ['SocketSessionManagerUnitTest', 'SocketControllerUnitTest',]
class WeakRefable():
pass
class SocketSessionManagerUnitTest(SimpleTestCase):
def setUp(self):
self.session_manager = SocketSessionManager()
super(SocketSessionManagerUnitTest, self).setUp()
def create_sessions(self, count, token_key=None):
self.sessions = []
self.count = count
for i in range(0, count):
self.sessions.append(SocketSession(i, token_key or i, WeakRefable()))
self.session_manager.add_session(self.sessions[i])
def test_multiple_session_diff_token(self):
self.create_sessions(10)
for s in self.sessions:
self.assertIn(s.token_key, self.session_manager.socket_session_token_key_map)
self.assertEqual(s, self.session_manager.socket_session_token_key_map[s.token_key][s.session_id])
def test_multiple_session_same_token(self):
self.create_sessions(10, token_key='foo')
sessions_dict = self.session_manager.lookup("foo")
self.assertEqual(len(sessions_dict), 10)
for s in self.sessions:
self.assertIn(s.session_id, sessions_dict)
self.assertEqual(s, sessions_dict[s.session_id])
def test_prune_sessions_max(self):
self.create_sessions(self.session_manager.SESSIONS_MAX + 10)
self.assertEqual(len(self.session_manager.socket_sessions), self.session_manager.SESSIONS_MAX)
class SocketControllerUnitTest(SimpleTestCase):
def setUp(self):
self.socket_controller = SocketController(SocketSessionManager())
server = Mock()
self.socket_controller.set_server(server)
super(SocketControllerUnitTest, self).setUp()
def create_clients(self, count, token_key=None):
self.sessions = []
self.sockets =[]
self.count = count
self.sockets_dict = {}
for i in range(0, count):
if isinstance(token_key, list):
token_key_actual = token_key[i]
else:
token_key_actual = token_key or i
socket = MagicMock(session=dict())
socket_session = SocketSession(i, token_key_actual, socket)
self.sockets.append(socket)
self.sessions.append(socket_session)
self.sockets_dict[i] = socket
self.socket_controller.add_session(socket_session)
socket.session['socket_session'] = socket_session
socket.send_packet = Mock()
self.socket_controller.server.sockets = self.sockets_dict
def test_broadcast_packet(self):
self.create_clients(10)
packet = {
"hello": "world"
}
self.socket_controller.broadcast_packet(packet)
for s in self.sockets:
s.send_packet.assert_called_with(packet)
def test_send_packet(self):
self.create_clients(5, token_key=[0, 1, 2, 3, 4])
packet = {
"hello": "world"
}
self.socket_controller.send_packet(packet, 2)
self.assertEqual(0, len(self.sockets[0].send_packet.mock_calls))
self.assertEqual(0, len(self.sockets[1].send_packet.mock_calls))
self.sockets[2].send_packet.assert_called_once_with(packet)
self.assertEqual(0, len(self.sockets[3].send_packet.mock_calls))
self.assertEqual(0, len(self.sockets[4].send_packet.mock_calls))
def test_send_packet_multiple_sessions_one_token(self):
self.create_clients(5, token_key=[0, 1, 1, 1, 2])
packet = {
"hello": "world"
}
self.socket_controller.send_packet(packet, 1)
self.assertEqual(0, len(self.sockets[0].send_packet.mock_calls))
self.sockets[1].send_packet.assert_called_once_with(packet)
self.sockets[2].send_packet.assert_called_once_with(packet)
self.sockets[3].send_packet.assert_called_once_with(packet)
self.assertEqual(0, len(self.sockets[4].send_packet.mock_calls))

View File

@@ -0,0 +1,37 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# AWX
from awx.main.tests.base import BaseTest
from command_base import BaseCommandMixin
# Django
from django.core.management.base import CommandError
__all__ = ['UpdatePasswordCommandFunctionalTest']
class UpdatePasswordCommandFunctionalTest(BaseCommandMixin, BaseTest):
def setUp(self):
super(UpdatePasswordCommandFunctionalTest, self).setUp()
self.create_test_license_file()
self.setup_instances()
self.setup_users()
def test_updated_ok(self):
result, stdout, stderr = self.run_command('update_password', username='admin', password='dingleberry')
self.assertEqual(stdout, 'Password updated\n')
def test_same_password(self):
result, stdout, stderr = self.run_command('update_password', username='admin', password='admin')
self.assertEqual(stdout, 'Password not updated\n')
def test_error_username_required(self):
result, stdout, stderr = self.run_command('update_password', password='foo')
self.assertIsInstance(result, CommandError)
self.assertEqual(str(result), 'username required')
def test_error_password_required(self):
result, stdout, stderr = self.run_command('update_password', username='admin')
self.assertIsInstance(result, CommandError)
self.assertEqual(str(result), 'password required')

View File

@@ -0,0 +1,242 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import unittest2 as unittest
# Django
from django.core.urlresolvers import reverse
# AWX
from awx.main.utils import timestamp_apiformat
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from awx.fact.models import * # noqa
from awx.fact.tests.base import BaseFactTestMixin, FactScanBuilder, TEST_FACT_ANSIBLE, TEST_FACT_PACKAGES, TEST_FACT_SERVICES
from awx.main.utils import build_url
__all__ = ['FactVersionApiTest', 'FactViewApiTest', 'SingleFactApiTest',]
class FactApiBaseTest(BaseLiveServerTest, BaseFactTestMixin):
def setUp(self):
super(FactApiBaseTest, self).setUp()
self.create_test_license_file()
self.setup_instances()
self.setup_users()
self.organization = self.make_organization(self.super_django_user)
self.organization.admins.add(self.normal_django_user)
self.inventory = self.organization.inventories.create(name='test-inventory', description='description for test-inventory')
self.host = self.inventory.hosts.create(name='host.example.com')
self.host2 = self.inventory.hosts.create(name='host2.example.com')
self.host3 = self.inventory.hosts.create(name='host3.example.com')
def setup_facts(self, scan_count):
self.builder = FactScanBuilder()
self.builder.set_inventory_id(self.inventory.pk)
self.builder.add_fact('ansible', TEST_FACT_ANSIBLE)
self.builder.add_fact('packages', TEST_FACT_PACKAGES)
self.builder.add_fact('services', TEST_FACT_SERVICES)
self.builder.add_hostname('host.example.com')
self.builder.add_hostname('host2.example.com')
self.builder.add_hostname('host3.example.com')
self.builder.build(scan_count=scan_count, host_count=3)
self.fact_host = FactHost.objects.get(hostname=self.host.name)
class FactVersionApiTest(FactApiBaseTest):
def check_equal(self, fact_versions, results):
def find(element, set1):
for e in set1:
if all([ e.get(field) == element.get(field) for field in element.keys()]):
return e
return None
self.assertEqual(len(results), len(fact_versions))
for v in fact_versions:
v_dict = {
'timestamp': timestamp_apiformat(v.timestamp),
'module': v.module
}
e = find(v_dict, results)
self.assertIsNotNone(e, "%s not found in %s" % (v_dict, results))
def get_list(self, fact_versions, params=None):
url = build_url('api:host_fact_versions_list', args=(self.host.pk,), get=params)
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
self.check_equal(fact_versions, response['results'])
return response
def test_permission_list(self):
url = reverse('api:host_fact_versions_list', args=(self.host.pk,))
with self.current_user('admin'):
self.get(url, expect=200)
with self.current_user('normal'):
self.get(url, expect=200)
with self.current_user('other'):
self.get(url, expect=403)
with self.current_user('nobody'):
self.get(url, expect=403)
with self.current_user(None):
self.get(url, expect=401)
def test_list_empty(self):
url = reverse('api:host_fact_versions_list', args=(self.host.pk,))
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
self.assertIn('results', response)
self.assertIsInstance(response['results'], list)
self.assertEqual(len(response['results']), 0)
def test_list_related_fact_view(self):
self.setup_facts(2)
url = reverse('api:host_fact_versions_list', args=(self.host.pk,))
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
for entry in response['results']:
self.assertIn('fact_view', entry['related'])
self.get(entry['related']['fact_view'], expect=200)
def test_list(self):
self.setup_facts(2)
self.get_list(FactVersion.objects.filter(host=self.fact_host))
def test_list_module(self):
self.setup_facts(10)
self.get_list(FactVersion.objects.filter(host=self.fact_host, module='packages'), dict(module='packages'))
def test_list_time_from(self):
self.setup_facts(10)
params = {
'from': timestamp_apiformat(self.builder.get_timestamp(1)),
}
# 'to': timestamp_apiformat(self.builder.get_timestamp(3))
fact_versions = FactVersion.objects.filter(host=self.fact_host, timestamp__gt=params['from'])
self.get_list(fact_versions, params)
def test_list_time_to(self):
self.setup_facts(10)
params = {
'to': timestamp_apiformat(self.builder.get_timestamp(3))
}
fact_versions = FactVersion.objects.filter(host=self.fact_host, timestamp__lte=params['to'])
self.get_list(fact_versions, params)
def test_list_time_from_to(self):
self.setup_facts(10)
params = {
'from': timestamp_apiformat(self.builder.get_timestamp(1)),
'to': timestamp_apiformat(self.builder.get_timestamp(3))
}
fact_versions = FactVersion.objects.filter(host=self.fact_host, timestamp__gt=params['from'], timestamp__lte=params['to'])
self.get_list(fact_versions, params)
class FactViewApiTest(FactApiBaseTest):
def check_equal(self, fact_obj, results):
fact_dict = {
'timestamp': timestamp_apiformat(fact_obj.timestamp),
'module': fact_obj.module,
'host': {
'hostname': fact_obj.host.hostname,
'inventory_id': fact_obj.host.inventory_id,
'id': str(fact_obj.host.id)
},
'fact': fact_obj.fact
}
self.assertEqual(fact_dict, results)
def test_permission_view(self):
url = reverse('api:host_fact_compare_view', args=(self.host.pk,))
with self.current_user('admin'):
self.get(url, expect=200)
with self.current_user('normal'):
self.get(url, expect=200)
with self.current_user('other'):
self.get(url, expect=403)
with self.current_user('nobody'):
self.get(url, expect=403)
with self.current_user(None):
self.get(url, expect=401)
def get_fact(self, fact_obj, params=None):
url = build_url('api:host_fact_compare_view', args=(self.host.pk,), get=params)
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
self.check_equal(fact_obj, response)
def test_view(self):
self.setup_facts(2)
self.get_fact(Fact.objects.filter(host=self.fact_host, module='ansible').order_by('-timestamp')[0])
def test_view_module_filter(self):
self.setup_facts(2)
self.get_fact(Fact.objects.filter(host=self.fact_host, module='services').order_by('-timestamp')[0], dict(module='services'))
def test_view_time_filter(self):
self.setup_facts(6)
ts = self.builder.get_timestamp(3)
self.get_fact(Fact.objects.filter(host=self.fact_host, module='ansible', timestamp__lte=ts).order_by('-timestamp')[0],
dict(datetime=ts))
@unittest.skip("single fact query needs to be updated to use inventory_id attribute on host document")
class SingleFactApiTest(FactApiBaseTest):
def setUp(self):
super(SingleFactApiTest, self).setUp()
self.group = self.inventory.groups.create(name='test-group')
self.group.hosts.add(self.host, self.host2, self.host3)
def test_permission_list(self):
url = reverse('api:host_fact_versions_list', args=(self.host.pk,))
with self.current_user('admin'):
self.get(url, expect=200)
with self.current_user('normal'):
self.get(url, expect=200)
with self.current_user('other'):
self.get(url, expect=403)
with self.current_user('nobody'):
self.get(url, expect=403)
with self.current_user(None):
self.get(url, expect=401)
def _test_related(self, url):
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
self.assertTrue(len(response['results']) > 0)
for entry in response['results']:
self.assertIn('single_fact', entry['related'])
# Requires fields
self.get(entry['related']['single_fact'], expect=400)
def test_related_host_list(self):
self.setup_facts(2)
self._test_related(reverse('api:host_list'))
def test_related_group_list(self):
self.setup_facts(2)
self._test_related(reverse('api:group_list'))
def test_related_inventory_list(self):
self.setup_facts(2)
self._test_related(reverse('api:inventory_list'))
def test_params(self):
self.setup_facts(2)
params = {
'module': 'packages',
'fact_key': 'name',
'fact_value': 'acpid',
}
url = build_url('api:inventory_single_fact_view', args=(self.inventory.pk,), get=params)
with self.current_user(self.super_django_user):
response = self.get(url, expect=200)
self.assertEqual(len(response['results']), 3)
for entry in response['results']:
self.assertEqual(entry['fact'][0]['name'], 'acpid')

24
awx/main/tests/old/ha.py Normal file
View File

@@ -0,0 +1,24 @@
# Copyright (c) 2015 Ansible, Inc.
# Python
import mock
# Django
from django.test import SimpleTestCase
# AWX
from awx.main.models import * # noqa
from awx.main.ha import * # noqa
__all__ = ['HAUnitTest',]
class HAUnitTest(SimpleTestCase):
@mock.patch('awx.main.models.Instance.objects.count', return_value=2)
def test_multiple_instances(self, ignore):
self.assertTrue(is_ha_environment())
@mock.patch('awx.main.models.Instance.objects.count', return_value=1)
def test_db_localhost(self, ignore):
self.assertFalse(is_ha_environment())

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,220 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from __future__ import absolute_import
# Django
import django
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.job_base import BaseJobTestMixin
import yaml
__all__ = ['JobTemplateLaunchTest', 'JobTemplateLaunchPasswordsTest']
class JobTemplateLaunchTest(BaseJobTestMixin, django.test.TestCase):
def setUp(self):
super(JobTemplateLaunchTest, self).setUp()
self.url = reverse('api:job_template_list')
self.data = dict(
name = 'launched job template',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
credential = self.cred_sue.pk,
playbook = self.proj_dev.playbooks[0],
)
self.data_no_cred = dict(
name = 'launched job template no credential',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
playbook = self.proj_dev.playbooks[0],
)
self.data_cred_ask = dict(self.data)
self.data_cred_ask['name'] = 'launched job templated with ask passwords'
self.data_cred_ask['credential'] = self.cred_sue_ask.pk
with self.current_user(self.user_sue):
response = self.post(self.url, self.data, expect=201)
self.launch_url = reverse('api:job_template_launch',
args=(response['id'],))
def test_launch_job_template(self):
with self.current_user(self.user_sue):
self.data['name'] = 'something different'
response = self.post(self.url, self.data, expect=201)
detail_url = reverse('api:job_template_detail',
args=(response['id'],))
self.assertEquals(response['url'], detail_url)
def test_no_cred_update_template(self):
# You can still post the job template without a credential, just can't launch it without one
with self.current_user(self.user_sue):
response = self.post(self.url, self.data_no_cred, expect=201)
detail_url = reverse('api:job_template_detail',
args=(response['id'],))
self.assertEquals(response['url'], detail_url)
def test_invalid_auth_unauthorized(self):
# Invalid auth can't trigger the launch endpoint
self.check_invalid_auth(self.launch_url, {}, methods=('post',))
def test_credential_implicit(self):
# Implicit, attached credentials
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {}, expect=202)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'new')
def test_launch_extra_vars_json(self):
# Sending extra_vars as a JSON string, implicit credentials
with self.current_user(self.user_sue):
data = dict(extra_vars = '{\"a\":3}')
response = self.post(self.launch_url, data, expect=202)
j = Job.objects.get(pk=response['job'])
ev_dict = yaml.load(j.extra_vars)
self.assertIn('a', ev_dict)
if 'a' in ev_dict:
self.assertEqual(ev_dict['a'], 3)
def test_launch_extra_vars_yaml(self):
# Sending extra_vars as a JSON string, implicit credentials
with self.current_user(self.user_sue):
data = dict(extra_vars = 'a: 3')
response = self.post(self.launch_url, data, expect=202)
j = Job.objects.get(pk=response['job'])
ev_dict = yaml.load(j.extra_vars)
self.assertIn('a', ev_dict)
if 'a' in ev_dict:
self.assertEqual(ev_dict['a'], 3)
def test_credential_explicit(self):
# Explicit, credential
with self.current_user(self.user_sue):
self.cred_sue.mark_inactive()
response = self.post(self.launch_url, {'credential': self.cred_doug.pk}, expect=202)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_credential_explicit_via_credential_id(self):
# Explicit, credential
with self.current_user(self.user_sue):
self.cred_sue.mark_inactive()
response = self.post(self.launch_url, {'credential_id': self.cred_doug.pk}, expect=202)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_credential_override(self):
# Explicit, credential
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'credential': self.cred_doug.pk}, expect=202)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_credential_override_via_credential_id(self):
# Explicit, credential
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'credential_id': self.cred_doug.pk}, expect=202)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
self.assertEqual(j.credential.pk, self.cred_doug.pk)
def test_bad_credential_launch_fail(self):
# Can't launch a job template without a credential defined (or if we
# pass an invalid/inactive credential value).
with self.current_user(self.user_sue):
self.cred_sue.mark_inactive()
self.post(self.launch_url, {}, expect=400)
self.post(self.launch_url, {'credential': 0}, expect=400)
self.post(self.launch_url, {'credential_id': 0}, expect=400)
self.post(self.launch_url, {'credential': 'one'}, expect=400)
self.post(self.launch_url, {'credential_id': 'one'}, expect=400)
self.cred_doug.mark_inactive()
self.post(self.launch_url, {'credential': self.cred_doug.pk}, expect=400)
self.post(self.launch_url, {'credential_id': self.cred_doug.pk}, expect=400)
def test_explicit_unowned_cred(self):
# Explicitly specify a credential that we don't have access to
with self.current_user(self.user_juan):
launch_url = reverse('api:job_template_launch',
args=(self.jt_eng_run.pk,))
self.post(launch_url, {'credential_id': self.cred_sue.pk}, expect=403)
def test_no_project_fail(self):
# Job Templates without projects can not be launched
with self.current_user(self.user_sue):
self.data['name'] = "missing proj"
response = self.post(self.url, self.data, expect=201)
jt = JobTemplate.objects.get(pk=response['id'])
jt.project = None
jt.save()
launch_url2 = reverse('api:job_template_launch',
args=(response['id'],))
self.post(launch_url2, {}, expect=400)
def test_no_inventory_fail(self):
# Job Templates without inventory can not be launched
with self.current_user(self.user_sue):
self.data['name'] = "missing inv"
response = self.post(self.url, self.data, expect=201)
jt = JobTemplate.objects.get(pk=response['id'])
jt.inventory = None
jt.save()
launch_url3 = reverse('api:job_template_launch',
args=(response['id'],))
self.post(launch_url3, {}, expect=400)
def test_deleted_credential_fail(self):
# Job Templates with deleted credentials cannot be launched.
self.cred_sue.mark_inactive()
with self.current_user(self.user_sue):
self.post(self.launch_url, {}, expect=400)
class JobTemplateLaunchPasswordsTest(BaseJobTestMixin, django.test.TestCase):
def setUp(self):
super(JobTemplateLaunchPasswordsTest, self).setUp()
self.url = reverse('api:job_template_list')
self.data = dict(
name = 'launched job template',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
credential = self.cred_sue_ask.pk,
playbook = self.proj_dev.playbooks[0],
)
with self.current_user(self.user_sue):
response = self.post(self.url, self.data, expect=201)
self.launch_url = reverse('api:job_template_launch',
args=(response['id'],))
# should return explicit credentials required passwords
def test_explicit_cred_with_ask_passwords_fail(self):
passwords_required = ['ssh_password', 'become_password', 'ssh_key_unlock']
# Job Templates with deleted credentials cannot be launched.
with self.current_user(self.user_sue):
self.cred_sue_ask.mark_inactive()
response = self.post(self.launch_url, {'credential_id': self.cred_sue_ask_many.pk}, expect=400)
for p in passwords_required:
self.assertIn(p, response['passwords_needed_to_start'])
self.assertEqual(len(passwords_required), len(response['passwords_needed_to_start']))
def test_explicit_cred_with_ask_password(self):
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'ssh_password': 'whatever'}, expect=202)
j = Job.objects.get(pk=response['job'])
self.assertEqual(j.status, 'new')
def test_explicit_cred_with_ask_password_empty_string_fail(self):
with self.current_user(self.user_sue):
response = self.post(self.launch_url, {'ssh_password': ''}, expect=400)
self.assertIn('ssh_password', response['passwords_needed_to_start'])

View File

@@ -0,0 +1,75 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from __future__ import absolute_import
import json
# Django
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from awx.main.tests.job_base import BaseJobTestMixin
__all__ = ['JobRelaunchTest',]
class JobRelaunchTest(BaseJobTestMixin, BaseLiveServerTest):
def test_job_relaunch(self):
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
url = reverse('api:job_relaunch', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.post(url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'successful')
self.assertEqual(j.launch_type, 'relaunch')
# Test with a job that prompts for SSH and sudo passwords.
job = self.make_job(self.jt_sup_run, self.user_sue, 'success')
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_password', 'become_password']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_password'] = 'sshpass'
response = self.post(url, data, expect=400)
data2 = dict(become_password='sudopass')
response = self.post(url, data2, expect=400)
data.update(data2)
response = self.post(url, data, expect=202)
job = Job.objects.get(pk=job.pk)
# Create jt with no extra_vars
# Launch j1 with runtime extra_vars
# Assign extra_vars to jt backing job
# Relaunch j1
# j2 should not contain jt extra_vars
def test_relaunch_job_does_not_inherit_jt_extra_vars(self):
jt_extra_vars = {
"hello": "world"
}
j_extra_vars = {
"goodbye": "cruel universe"
}
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success', extra_vars=j_extra_vars)
url = reverse('api:job_relaunch', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.post(url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'successful')
self.jt_ops_east_run.extra_vars = jt_extra_vars
self.jt_ops_east_run.save()
response = self.post(url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'successful')
resp_extra_vars = json.loads(response['extra_vars'])
self.assertNotIn("hello", resp_extra_vars)
self.assertEqual(resp_extra_vars, j_extra_vars)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,244 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from __future__ import absolute_import
# Django
from django.core.urlresolvers import reverse
from django.conf import settings
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from awx.main.tests.job_base import BaseJobTestMixin
__all__ = ['JobStartCancelTest',]
class JobStartCancelTest(BaseJobTestMixin, BaseLiveServerTest):
def test_job_start(self):
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
url = reverse('api:job_start', args=(job.pk,))
# Test with no auth and with invalid login.
self.check_invalid_auth(url)
self.check_invalid_auth(url, methods=('post',))
# Sue can start a job (when passwords are already saved) as long as the
# status is new. Reverse list so "new" will be last.
for status in reversed([x[0] for x in Job.STATUS_CHOICES]):
if status == 'waiting':
continue
job.status = status
job.save()
with self.current_user(self.user_sue):
response = self.get(url)
if status == 'new':
self.assertTrue(response['can_start'])
self.assertFalse(response['passwords_needed_to_start'])
# response = self.post(url, {}, expect=202)
# job = Job.objects.get(pk=job.pk)
# self.assertEqual(job.status, 'successful',
# job.result_stdout)
else:
self.assertFalse(response['can_start'])
response = self.post(url, {}, expect=405)
# Test with a job that prompts for SSH and sudo become passwords.
#job = self.job_sup_run
job = self.make_job(self.jt_sup_run, self.user_sue, 'new')
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_password', 'become_password']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_password'] = 'sshpass'
response = self.post(url, data, expect=400)
data2 = dict(become_password='sudopass')
response = self.post(url, data2, expect=400)
data.update(data2)
response = self.post(url, data, expect=202)
job = Job.objects.get(pk=job.pk)
# FIXME: Test run gets the following error in this case:
# fatal: [hostname] => sudo output closed while waiting for password prompt:
#self.assertEqual(job.status, 'successful')
# Test with a job that prompts for SSH unlock key, given the wrong key.
#job = self.jt_ops_west_run.create_job(
# credential=self.cred_greg,
# created_by=self.user_sue,
#)
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
job.credential = self.cred_greg
job.save()
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_key_unlock']))
data = dict()
response = self.post(url, data, expect=400)
# The job should start but fail.
data['ssh_key_unlock'] = 'sshunlock'
response = self.post(url, data, expect=202)
# job = Job.objects.get(pk=job.pk)
# self.assertEqual(job.status, 'failed')
# Test with a job that prompts for SSH unlock key, given the right key.
from awx.main.tests.data.ssh import TEST_SSH_KEY_DATA_UNLOCK
# job = self.jt_ops_west_run.create_job(
# credential=self.cred_greg,
# created_by=self.user_sue,
# )
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
job.credential = self.cred_greg
job.save()
url = reverse('api:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_key_unlock']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
response = self.post(url, data, expect=202)
# job = Job.objects.get(pk=job.pk)
# self.assertEqual(job.status, 'successful')
# FIXME: Test with other users, test when passwords are required.
def test_job_cancel(self):
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
url = reverse('api:job_cancel', args=(job.pk,))
# Test with no auth and with invalid login.
self.check_invalid_auth(url)
self.check_invalid_auth(url, methods=('post',))
# sue can cancel the job, but only when it is pending or running.
for status in [x[0] for x in Job.STATUS_CHOICES]:
if status == 'waiting':
continue
job.status = status
job.save()
with self.current_user(self.user_sue):
response = self.get(url)
if status in ('new', 'pending', 'waiting', 'running'):
self.assertTrue(response['can_cancel'])
response = self.post(url, {}, expect=202)
else:
self.assertFalse(response['can_cancel'])
response = self.post(url, {}, expect=405)
# FIXME: Test with other users.
def test_get_job_results(self):
# Start/run a job and then access its results via the API.
#job = self.job_ops_east_run
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
job.signal_start()
# Check that the job detail has been updated.
url = reverse('api:job_detail', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertEqual(response['status'], 'successful',
response['result_traceback'])
self.assertTrue(response['result_stdout'])
# Test job events for completed job.
url = reverse('api:job_job_events_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test individual job event detail records.
host_ids = set()
for job_event in job.job_events.all():
if job_event.host:
host_ids.add(job_event.host.pk)
url = reverse('api:job_event_detail', args=(job_event.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# Also test job event list for each host.
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('api:host_job_events_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job event list for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('api:group_job_events_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_events.all()
self.assertTrue(qs.count(), group)
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test global job event list.
url = reverse('api:job_event_list')
with self.current_user(self.user_sue):
response = self.get(url)
qs = JobEvent.objects.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job host summaries for completed job.
url = reverse('api:job_job_host_summaries_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Every host referenced by a job_event should be present as a job
# host summary record.
self.assertEqual(host_ids,
set(qs.values_list('host__pk', flat=True)))
# Test individual job host summary records.
for job_host_summary in job.job_host_summaries.all():
url = reverse('api:job_host_summary_detail',
args=(job_host_summary.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# Test job host summaries for each host.
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('api:host_job_host_summaries_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job host summaries for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('api:group_job_host_summaries_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)

View File

@@ -0,0 +1,237 @@
# Python
import json
# Django
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, QueueStartStopTestMixin
__all__ = ['SurveyPasswordRedactedTest']
PASSWORD="5m/h"
ENCRYPTED_STR='$encrypted$'
TEST_PLAYBOOK = u'''---
- name: test success
hosts: test-group
gather_facts: True
tasks:
- name: should pass
command: echo {{ %s }}
''' % ('spot_speed')
TEST_SIMPLE_SURVEY = '''
{
"name": "Simple",
"description": "Description",
"spec": [
{
"type": "password",
"question_name": "spots speed",
"question_description": "How fast can spot run?",
"variable": "%s",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "%s"
}
]
}
''' % ('spot_speed', PASSWORD)
TEST_COMPLEX_SURVEY = '''
{
"name": "Simple",
"description": "Description",
"spec": [
{
"type": "password",
"question_name": "spots speed",
"question_description": "How fast can spot run?",
"variable": "spot_speed",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "0m/h"
},
{
"type": "password",
"question_name": "ssn",
"question_description": "What's your social security number?",
"variable": "ssn",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "999-99-9999"
},
{
"type": "password",
"question_name": "bday",
"question_description": "What's your birth day?",
"variable": "bday",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "1/1/1970"
}
]
}
'''
TEST_SINGLE_PASSWORDS = [
{
'description': 'Single instance with a . after',
'text' : 'See spot. See spot run. See spot run %s. That is a fast run.' % PASSWORD,
'passwords': [PASSWORD],
'occurances': 1,
},
{
'description': 'Single instance with , after',
'text': 'Spot goes %s, at a fast pace' % PASSWORD,
'passwords': [PASSWORD],
'occurances': 1,
},
{
'description': 'Single instance with a space after',
'text': 'Is %s very fast?' % PASSWORD,
'passwords': [PASSWORD],
'occurances': 1,
},
{
'description': 'Many instances, also with newline',
'text': 'I think %s is very very fast. If I ran %s for 4 hours how many hours would I run?.\nTrick question. %s for 4 hours would result in running for 4 hours' % (PASSWORD, PASSWORD, PASSWORD),
'passwords': [PASSWORD],
'occurances': 3,
},
]
passwd = 'my!@#$%^pass&*()_+'
TEST_SINGLE_PASSWORDS.append({
'description': 'password includes characters not in a-z 0-9 range',
'passwords': [passwd],
'text': 'Text is fun yeah with passwords %s.' % passwd,
'occurances': 1
})
# 3 because 3 password fields in spec TEST_COMPLEX_SURVEY
TEST_MULTIPLE_PASSWORDS = []
passwds = [ '65km/s', '545-83-4534', '7/4/2002']
TEST_MULTIPLE_PASSWORDS.append({
'description': '3 different passwords each used once',
'text': 'Spot runs %s. John has an ss of %s and is born on %s.' % (passwds[0], passwds[1], passwds[2]),
'passwords': passwds,
'occurances': 3,
})
TESTS = {
'simple': {
'survey' : json.loads(TEST_SIMPLE_SURVEY),
'tests' : TEST_SINGLE_PASSWORDS,
},
'complex': {
'survey' : json.loads(TEST_COMPLEX_SURVEY),
'tests' : TEST_MULTIPLE_PASSWORDS,
}
}
class SurveyPasswordBaseTest(BaseTest, QueueStartStopTestMixin):
def setUp(self):
super(SurveyPasswordBaseTest, self).setUp()
self.setup_instances()
self.setup_users()
def check_passwords_redacted(self, test, response):
self.assertIsNotNone(response['content'])
for password in test['passwords']:
self.check_not_found(response['content'], password, test['description'], word_boundary=True)
self.check_found(response['content'], ENCRYPTED_STR, test['occurances'], test['description'])
# TODO: A more complete test would ensure that the variable value isn't found
def check_extra_vars_redacted(self, test, response):
self.assertIsNotNone(response)
# Ensure that all extra_vars of type password have the value '$encrypted$'
vars = []
for question in test['survey']['spec']:
if question['type'] == 'password':
vars.append(question['variable'])
extra_vars = json.loads(response['extra_vars'])
for var in vars:
self.assertIn(var, extra_vars, 'Variable "%s" should exist in "%s"' % (var, extra_vars))
self.assertEqual(extra_vars[var], ENCRYPTED_STR)
def _get_url_job_stdout(self, job):
url = reverse('api:job_stdout', args=(job.pk,))
return self.get(url, expect=200, auth=self.get_super_credentials(), accept='application/json')
def _get_url_job_details(self, job):
url = reverse('api:job_detail', args=(job.pk,))
return self.get(url, expect=200, auth=self.get_super_credentials(), accept='application/json')
class SurveyPasswordRedactedTest(SurveyPasswordBaseTest):
'''
Transpose TEST[]['tests'] to the below format. A more flat format."
[
{
'text': '...',
'description': '...',
...,
'job': '...',
'survey': '...'
},
]
'''
def setup_test(self, test_name):
blueprint = TESTS[test_name]
self.tests[test_name] = []
job_template = self.make_job_template(survey_enabled=True, survey_spec=blueprint['survey'])
for test in blueprint['tests']:
test = dict(test)
extra_vars = {}
# build extra_vars from spec variables and passwords
for x in range(0, len(blueprint['survey']['spec'])):
question = blueprint['survey']['spec'][x]
extra_vars[question['variable']] = test['passwords'][x]
job = self.make_job(job_template=job_template)
job.extra_vars = json.dumps(extra_vars)
job.result_stdout_text = test['text']
job.save()
test['job'] = job
test['survey'] = blueprint['survey']
self.tests[test_name].append(test)
def setUp(self):
super(SurveyPasswordRedactedTest, self).setUp()
self.tests = {}
self.setup_test('simple')
self.setup_test('complex')
# should redact single variable survey
def test_redact_stdout_simple_survey(self):
for test in self.tests['simple']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)
# should redact multiple variables survey
def test_redact_stdout_complex_survey(self):
for test in self.tests['complex']:
response = self._get_url_job_stdout(test['job'])
self.check_passwords_redacted(test, response)
# should redact values in extra_vars
def test_redact_job_extra_vars(self):
for test in self.tests['simple']:
response = self._get_url_job_details(test['job'])
self.check_extra_vars_redacted(test, response)

View File

@@ -0,0 +1,147 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import json
import os
import tempfile
from awx.main.models import Host, Inventory, Organization
from awx.main.tests.base import BaseTest
import awx.main.task_engine
from awx.main.task_engine import * # noqa
class LicenseTests(BaseTest):
def setUp(self):
self.start_redis()
self.setup_instances()
super(LicenseTests, self).setUp()
self.setup_users()
u = self.super_django_user
org = Organization.objects.create(name='o1', created_by=u)
org.admins.add(self.normal_django_user)
self.inventory = Inventory.objects.create(name='hi', organization=org, created_by=u)
Host.objects.create(name='a1', inventory=self.inventory, created_by=u)
Host.objects.create(name='a2', inventory=self.inventory, created_by=u)
Host.objects.create(name='a3', inventory=self.inventory, created_by=u)
Host.objects.create(name='a4', inventory=self.inventory, created_by=u)
Host.objects.create(name='a5', inventory=self.inventory, created_by=u)
Host.objects.create(name='a6', inventory=self.inventory, created_by=u)
Host.objects.create(name='a7', inventory=self.inventory, created_by=u)
Host.objects.create(name='a8', inventory=self.inventory, created_by=u)
Host.objects.create(name='a9', inventory=self.inventory, created_by=u)
Host.objects.create(name='a10', inventory=self.inventory, created_by=u)
Host.objects.create(name='a11', inventory=self.inventory, created_by=u)
Host.objects.create(name='a12', inventory=self.inventory, created_by=u)
self._temp_task_file = awx.main.task_engine.TEMPORARY_TASK_FILE
self._temp_task_fetch_ami = awx.main.task_engine.TemporaryTaskEngine.fetch_ami
self._temp_task_fetch_instance = awx.main.task_engine.TemporaryTaskEngine.fetch_instance
def tearDown(self):
awx.main.task_engine.TEMPORARY_TASK_FILE = self._temp_task_file
awx.main.task_engine.TemporaryTaskEngine.fetch_ami = self._temp_task_fetch_ami
awx.main.task_engine.TemporaryTaskEngine.fetch_instance = self._temp_task_fetch_instance
super(LicenseTests, self).tearDown()
self.stop_redis()
def test_license_writer(self):
writer = TaskEngager(
company_name='acmecorp',
contact_name='Michael DeHaan',
contact_email='michael@ansibleworks.com',
license_date=25000, # seconds since epoch
instance_count=500)
data = writer.get_data()
assert data['instance_count'] == 500
assert data['contact_name'] == 'Michael DeHaan'
assert data['contact_email'] == 'michael@ansibleworks.com'
assert data['license_date'] == 25000
assert data['license_key'] == "11bae31f31c6a6cdcb483a278cdbe98bd8ac5761acd7163a50090b0f098b3a13"
strdata = writer.get_string()
strdata_loaded = json.loads(strdata)
assert strdata_loaded == data
reader = TaskSerializer()
vdata = reader.from_string(strdata)
assert vdata['available_instances'] == 500
assert vdata['current_instances'] == 12
assert vdata['free_instances'] == 488
assert vdata['date_warning'] is True
assert vdata['date_expired'] is True
assert vdata['license_date'] == 25000
assert vdata['time_remaining'] < 0
assert vdata['valid_key'] is True
assert vdata['compliant'] is False
assert vdata['subscription_name']
def test_expired_licenses(self):
reader = TaskSerializer()
writer = TaskEngager(
company_name='Tower',
contact_name='Tower Admin',
contact_email='tower@ansible.com',
license_date=int(time.time() - 3600),
instance_count=100,
trial=True)
strdata = writer.get_string()
vdata = reader.from_string(strdata)
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] < 0
writer = TaskEngager(
company_name='Tower',
contact_name='Tower Admin',
contact_email='tower@ansible.com',
license_date=int(time.time() - 2592001),
instance_count=100,
trial=False)
strdata = writer.get_string()
vdata = reader.from_string(strdata)
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] < 0
writer = TaskEngager(
company_name='Tower',
contact_name='Tower Admin',
contact_email='tower@ansible.com',
license_date=int(time.time() - 3600),
instance_count=100,
trial=False)
strdata = writer.get_string()
vdata = reader.from_string(strdata)
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] > 0
def test_aws_license(self):
os.environ['AWX_LICENSE_FILE'] = 'non-existent-license-file.json'
h, path = tempfile.mkstemp()
self._temp_paths.append(path)
with os.fdopen(h, 'w') as f:
json.dump({'instance_count': 100}, f)
awx.main.task_engine.TEMPORARY_TASK_FILE = path
def fetch_ami(_self):
_self.attributes['ami-id'] = 'ami-00000000'
return True
def fetch_instance(_self):
_self.attributes['instance-id'] = 'i-00000000'
return True
awx.main.task_engine.TemporaryTaskEngine.fetch_ami = fetch_ami
awx.main.task_engine.TemporaryTaskEngine.fetch_instance = fetch_instance
reader = TaskSerializer()
license = reader.from_file()
self.assertTrue(license['is_aws'])
self.assertTrue(license['time_remaining'])
self.assertTrue(license['free_instances'] > 0)
self.assertTrue(license['grace_period_remaining'] > 0)

View File

@@ -0,0 +1,466 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
from datetime import timedelta
# Django
from django.core.urlresolvers import reverse
from django.test.utils import override_settings
from django.contrib.auth.models import User
from django.utils.timezone import now as tz_now
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
__all__ = ['AuthTokenLimitUnitTest', 'OrganizationsTest']
class AuthTokenLimitUnitTest(BaseTest):
def setUp(self):
self.now = tz_now()
# Times are relative to now
# (key, created on in seconds , expiration in seconds)
self.test_data = [
# a is implicitly expired
("a", -1000, -10),
# b's are invalid due to session limit of 3
("b", -100, 60),
("bb", -100, 60),
("c", -90, 70),
("d", -80, 80),
("e", -70, 90),
]
self.user = User.objects.create_superuser('admin', 'foo@bar.com', 'password')
for key, t_create, t_expire in self.test_data:
AuthToken.objects.create(
user=self.user,
key=key,
request_hash='this_is_a_hash',
created=self.now + timedelta(seconds=t_create),
expires=self.now + timedelta(seconds=t_expire),
)
super(AuthTokenLimitUnitTest, self).setUp()
@override_settings(AUTH_TOKEN_PER_USER=3)
def test_get_tokens_over_limit(self):
invalid_tokens = AuthToken.get_tokens_over_limit(self.user, now=self.now)
invalid_keys = [x.key for x in invalid_tokens]
self.assertEqual(len(invalid_keys), 2)
self.assertIn('b', invalid_keys)
self.assertIn('bb', invalid_keys)
class OrganizationsTest(BaseTest):
def collection(self):
return reverse('api:organization_list')
def setUp(self):
super(OrganizationsTest, self).setUp()
self.setup_instances()
# TODO: Test non-enterprise license
self.create_test_license_file()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 10)
self.projects = self.make_projects(self.normal_django_user, 10)
# add projects to organizations in a more or less arbitrary way
for project in self.projects[0:2]:
self.organizations[0].projects.add(project)
for project in self.projects[3:8]:
self.organizations[1].projects.add(project)
for project in self.projects[9:10]:
self.organizations[2].projects.add(project)
self.organizations[0].projects.add(self.projects[-1])
self.organizations[9].projects.add(self.projects[-2])
# get the URL for various organization records
self.a_detail_url = "%s%s" % (self.collection(), self.organizations[0].pk)
self.b_detail_url = "%s%s" % (self.collection(), self.organizations[1].pk)
self.c_detail_url = "%s%s" % (self.collection(), self.organizations[2].pk)
# configuration:
# admin_user is an admin and regular user in all organizations
# other_user is all organizations
# normal_user is a user in organization 0, and an admin of organization 1
# nobody_user is a user not a member of any organizations
for x in self.organizations:
x.admins.add(self.super_django_user)
x.users.add(self.super_django_user)
x.users.add(self.other_django_user)
self.organizations[0].users.add(self.normal_django_user)
self.organizations[1].admins.add(self.normal_django_user)
def test_get_organization_list(self):
url = reverse('api:organization_list')
# no credentials == 401
self.options(url, expect=401)
self.head(url, expect=401)
self.get(url, expect=401)
# wrong credentials == 401
with self.current_user(self.get_invalid_credentials()):
self.options(url, expect=401)
self.head(url, expect=401)
self.get(url, expect=401)
# superuser credentials == 200, full list
with self.current_user(self.super_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 10, previous=None, next=None)
self.assertEqual(len(response['results']),
Organization.objects.count())
for field in ['id', 'url', 'name', 'description', 'created']:
self.assertTrue(field in response['results'][0],
'field %s not in result' % field)
# check that the related URL functionality works
related = response['results'][0]['related']
for x in ['projects', 'users', 'admins']:
self.assertTrue(x in related and related[x].endswith("/%s/" % x), "looking for %s in related" % x)
# normal credentials == 200, get only organizations of which user is a member
with self.current_user(self.normal_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 2, previous=None, next=None)
# no admin rights? get empty list
with self.current_user(self.other_django_user):
response = self.get(url, expect=200)
self.check_pagination_and_size(response, self.other_django_user.organizations.count(), previous=None, next=None)
# not a member of any orgs? get empty list
with self.current_user(self.nobody_django_user):
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 0, previous=None, next=None)
def test_get_item(self):
# first get all the URLs
data = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
urls = [item['url'] for item in data['results']]
# make sure super user can fetch records
data = self.get(urls[0], expect=200, auth=self.get_super_credentials())
[self.assertTrue(key in data) for key in ['name', 'description', 'url']]
# make sure invalid user cannot
data = self.get(urls[0], expect=401, auth=self.get_invalid_credentials())
# normal user should be able to get org 0 and org 1 but not org 9 (as he's not a user or admin of it)
data = self.get(urls[0], expect=200, auth=self.get_normal_credentials())
data = self.get(urls[1], expect=200, auth=self.get_normal_credentials())
data = self.get(urls[9], expect=403, auth=self.get_normal_credentials())
# other user is a member, but not admin, can access org
data = self.get(urls[0], expect=200, auth=self.get_other_credentials())
# nobody user is not a member, cannot access org
data = self.get(urls[0], expect=403, auth=self.get_nobody_credentials())
def test_get_item_subobjects_projects(self):
# first get all the orgs
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
# find projects attached to the first org
projects0_url = orgs['results'][0]['related']['projects']
projects1_url = orgs['results'][1]['related']['projects']
projects9_url = orgs['results'][9]['related']['projects']
self.get(projects0_url, expect=401, auth=None)
self.get(projects0_url, expect=401, auth=self.get_invalid_credentials())
# normal user is just a member of the first org, so can see all projects under the org
self.get(projects0_url, expect=200, auth=self.get_normal_credentials())
# however in the second org, he's an admin and should see all of them
projects1a = self.get(projects1_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(projects1a['count'], 5)
# but the non-admin cannot access the list of projects in the org. He should use /projects/ instead!
self.get(projects1_url, expect=200, auth=self.get_other_credentials())
# superuser should be able to read anything
projects9a = self.get(projects9_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects9a['count'], 1)
# nobody user is not a member of any org, so can't see projects...
self.get(projects0_url, expect=403, auth=self.get_nobody_credentials())
projects1a = self.get(projects1_url, expect=403, auth=self.get_nobody_credentials())
def test_get_item_subobjects_users(self):
# see if we can list the users added to the organization
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
org1_users_url = orgs['results'][1]['related']['users']
org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(org1_users['count'], 2)
org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(org1_users['count'], 2)
org1_users = self.get(org1_users_url, expect=200, auth=self.get_other_credentials())
self.assertEquals(org1_users['count'], 2)
def test_get_item_subobjects_admins(self):
# see if we can list the users added to the organization
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
org1_users_url = orgs['results'][1]['related']['admins']
org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(org1_users['count'], 2)
org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(org1_users['count'], 2)
def test_get_organization_inventories_list(self):
pass
def _test_get_item_subobjects_tags(self):
# FIXME: Update to support taggit!
# put some tags on the org
org1 = Organization.objects.get(pk=2)
tag1 = Tag.objects.create(name='atag')
tag2 = Tag.objects.create(name='btag')
org1.tags.add(tag1)
org1.tags.add(tag2)
# see if we can list the users added to the organization
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
org1_tags_url = orgs['results'][1]['related']['tags']
org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(org1_tags['count'], 2)
org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(org1_tags['count'], 2)
org1_tags = self.get(org1_tags_url, expect=403, auth=self.get_other_credentials())
def _test_get_item_subobjects_audit_trail(self):
# FIXME: Update to support whatever audit trail framework is used.
url = '/api/v1/organizations/2/audit_trail/'
self.get(url, expect=200, auth=self.get_normal_credentials())
# FIXME: verify that some audit trail records are auto-created on save AND post
def test_post_item(self):
new_org = dict(name='magic test org', description='8675309')
# need to be a valid user
self.post(self.collection(), new_org, expect=401, auth=None)
self.post(self.collection(), new_org, expect=401, auth=self.get_invalid_credentials())
# only super users can create organizations
self.post(self.collection(), new_org, expect=403, auth=self.get_normal_credentials())
self.post(self.collection(), new_org, expect=403, auth=self.get_other_credentials())
data1 = self.post(self.collection(), new_org, expect=201, auth=self.get_super_credentials())
# duplicate post results in 400
self.post(self.collection(), new_org, expect=400, auth=self.get_super_credentials())
# look at what we got back from the post, make sure we added an org
last_org = Organization.objects.order_by('-pk')[0]
self.assertTrue(data1['url'].endswith("/%d/" % last_org.pk))
# Test that not even super users can create an organization with a basic license
self.create_basic_license_file()
cant_org = dict(name='silly user org', description='4815162342')
self.post(self.collection(), cant_org, expect=402, auth=self.get_super_credentials())
def test_post_item_subobjects_projects(self):
# first get all the orgs
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
# find projects attached to the first org
projects0_url = orgs['results'][0]['related']['projects']
projects1_url = orgs['results'][1]['related']['projects']
projects2_url = orgs['results'][2]['related']['projects']
# get all the projects on the first org
projects0 = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
a_project = projects0['results'][-1]
# attempt to add the project to the 7th org and see what happens
self.post(projects1_url, a_project, expect=204, auth=self.get_super_credentials())
projects1 = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 3)
# make sure adding a project that does not exist, or a missing pk field, results in a 400
self.post(projects1_url, dict(id=99999), expect=400, auth=self.get_super_credentials())
self.post(projects1_url, dict(asdf=1234), expect=400, auth=self.get_super_credentials())
# test that by posting a pk + disassociate: True we can remove a relationship
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 6)
a_project['disassociate'] = True
self.post(projects1_url, a_project, expect=204, auth=self.get_super_credentials())
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 5)
a_project = projects1['results'][-1]
a_project['disassociate'] = 1
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.post(projects1_url, a_project, expect=204, auth=self.get_normal_credentials())
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 4)
new_project_a = self.make_projects(self.normal_django_user, 1)[0]
new_project_b = self.make_projects(self.other_django_user, 1)[0]
# admin of org can add projects that he can read
self.post(projects1_url, dict(id=new_project_a.pk), expect=204, auth=self.get_normal_credentials())
# but not those he cannot
self.post(projects1_url, dict(id=new_project_b.pk), expect=403, auth=self.get_normal_credentials())
# and can't post a project he can read to an org he cannot
# self.post(projects2_url, dict(id=new_project_a.pk), expect=403, auth=self.get_normal_credentials())
# and can't do post a project he can read to an organization he cannot
self.post(projects2_url, dict(id=new_project_a.pk), expect=403, auth=self.get_normal_credentials())
def test_post_item_subobjects_users(self):
url = reverse('api:organization_users_list', args=(self.organizations[1].pk,))
users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(users['count'], 2)
self.post(url, dict(id=self.normal_django_user.pk), expect=204, auth=self.get_normal_credentials())
users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(users['count'], 3)
self.post(url, dict(id=self.normal_django_user.pk, disassociate=True), expect=204, auth=self.get_normal_credentials())
users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(users['count'], 2)
# post a completely new user to verify we can add users to the subcollection directly
new_user = dict(username='NewUser9000', password='NewPassword9000')
which_org = self.normal_django_user.admin_of_organizations.all()[0]
url = reverse('api:organization_users_list', args=(which_org.pk,))
self.post(url, new_user, expect=201, auth=self.get_normal_credentials())
all_users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(all_users['count'], 3)
def test_post_item_subobjects_admins(self):
url = reverse('api:organization_admins_list', args=(self.organizations[1].pk,))
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(admins['count'], 2)
self.post(url, dict(id=self.other_django_user.pk), expect=204, auth=self.get_normal_credentials())
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(admins['count'], 3)
self.post(url, dict(id=self.other_django_user.pk, disassociate=1), expect=204, auth=self.get_normal_credentials())
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(admins['count'], 2)
def _test_post_item_subobjects_tags(self):
# FIXME: Update to support taggit!
tag = Tag.objects.create(name='blippy')
url = '/api/v1/organizations/2/tags/'
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(tags['count'], 0)
self.post(url, dict(id=tag.pk), expect=204, auth=self.get_normal_credentials())
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(tags['count'], 1)
self.assertEqual(tags['results'][0]['id'], tag.pk)
self.post(url, dict(id=tag.pk, disassociate=1), expect=204, auth=self.get_normal_credentials())
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(tags['count'], 0)
def _test_post_item_subobjects_audit_trail(self):
# FIXME: Update to support whatever audit trail framework is used.
# audit trails are system things, and no user can post to them.
url = '/api/v1/organizations/2/audit_trail/'
self.post(url, dict(id=1), expect=405, auth=self.get_super_credentials())
def test_put_item(self):
# first get some urls and data to put back to them
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
self.get(urls[0], expect=200, auth=self.get_super_credentials())
data1 = self.get(urls[1], expect=200, auth=self.get_super_credentials())
# test that an unauthenticated user cannot do a put
new_data1 = data1.copy()
new_data1['description'] = 'updated description'
self.put(urls[0], new_data1, expect=401, auth=None)
self.put(urls[0], new_data1, expect=401, auth=self.get_invalid_credentials())
# user normal is an admin of org 0 and a member of org 1 so should be able to put only org 1
self.put(urls[0], new_data1, expect=403, auth=self.get_normal_credentials())
self.put(urls[1], new_data1, expect=200, auth=self.get_normal_credentials())
# get back org 1 and see if it changed
get_result = self.get(urls[1], expect=200, auth=self.get_normal_credentials())
self.assertEquals(get_result['description'], 'updated description')
# super user can also put even though they aren't added to the org users or admins list
self.put(urls[1], new_data1, expect=200, auth=self.get_super_credentials())
# make sure posting to this URL is not supported
self.post(urls[1], new_data1, expect=405, auth=self.get_super_credentials())
def test_put_item_subobjects_projects(self):
# any attempt to put a subobject should be a 405, edit the actual resource or POST with 'disassociate' to delete
# this is against a collection URL anyway, so we really need not repeat this test for other object types
# as a PUT against a collection doesn't make much sense.
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
projects0_url = orgs['results'][0]['related']['projects']
sub_projects = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(sub_projects['count'], 3)
first_sub_project = sub_projects['results'][0]
self.put(projects0_url, first_sub_project, expect=405, auth=self.get_super_credentials())
def test_delete_item(self):
# first get some urls
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
urldata1 = self.get(urls[1], auth=self.get_super_credentials())
# check authentication -- admins of the org and superusers can delete objects only
self.delete(urls[0], expect=401, auth=None)
self.delete(urls[0], expect=401, auth=self.get_invalid_credentials())
self.delete(urls[8], expect=403, auth=self.get_normal_credentials())
self.delete(urls[1], expect=204, auth=self.get_normal_credentials())
self.delete(urls[0], expect=204, auth=self.get_super_credentials())
# check that when we have deleted an object it comes back 404 via GET
# but that it's still in the database as inactive
self.get(urls[1], expect=404, auth=self.get_normal_credentials())
org1 = Organization.objects.get(pk=urldata1['id'])
self.assertEquals(org1.active, False)
# also check that DELETE on the collection doesn't work
self.delete(self.collection(), expect=405, auth=self.get_super_credentials())
# Test that not even super users can delete an organization with a basic license
self.create_basic_license_file()
self.delete(urls[2], expect=402, auth=self.get_super_credentials())
def test_invalid_post_data(self):
url = reverse('api:organization_list')
# API should gracefully handle data of an invalid type.
self.post(url, expect=400, data=None, auth=self.get_super_credentials())
self.post(url, expect=400, data=99, auth=self.get_super_credentials())
self.post(url, expect=400, data='abcd', auth=self.get_super_credentials())
self.post(url, expect=400, data=3.14, auth=self.get_super_credentials())
self.post(url, expect=400, data=True, auth=self.get_super_credentials())
self.post(url, expect=400, data=[1,2,3], auth=self.get_super_credentials())
url = reverse('api:organization_users_list', args=(self.organizations[0].pk,))
self.post(url, expect=400, data=None, auth=self.get_super_credentials())
self.post(url, expect=400, data=99, auth=self.get_super_credentials())
self.post(url, expect=400, data='abcd', auth=self.get_super_credentials())
self.post(url, expect=400, data=3.14, auth=self.get_super_credentials())
self.post(url, expect=400, data=True, auth=self.get_super_credentials())
self.post(url, expect=400, data=[1,2,3], auth=self.get_super_credentials())
# TODO: tests for tag disassociation

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,138 @@
import textwrap
# AWX
from awx.main.redact import UriCleaner
from awx.main.tests.base import BaseTest, URI
__all__ = ['UriCleanTests']
TEST_URIS = [
URI('no host', scheme='https', username='myusername', password='mypass', host=None),
URI('no host', scheme='https', username='myusername', password='mypass*********', host=None),
URI('http', scheme='http'),
URI('https', scheme='https'),
URI('no password', password=''),
URI('no host', host=''),
URI('host with port', host='host.com:22'),
URI('host with @', host='host.com:22'),
URI('host with @, password with @ at the end', password='mypasswordwith@', host='@host.com'),
URI('no host, with space', host=' '),
URI('password is a space', password='%20%20'),
URI('no password field', password=None),
URI('no username no password', username=None, password=None),
URI('password with @ at the end', password='mypasswordwitha@'),
URI('password with @@ at the end', password='mypasswordwitha@@'),
URI('password with @@@ at the end', password='mypasswordwitha@@@'),
URI('password with @@@@ at the end', password='mypasswordwitha@@@@'),
URI('password with @a@@ at the end', password='mypasswordwitha@a@@'),
URI('password with @@@a at the end', password='mypasswordwitha@@@a'),
URI('password with a @ in the middle', password='pa@ssword'),
URI('url with @', password='pa@ssword', host='googly.com/whatever@#$:stuff@.com/'),
]
TEST_CLEARTEXT = []
# Arguably, this is a regression test given the below data.
# regression data https://trello.com/c/cdUELgVY/
uri = URI(scheme="https", username="myusername", password="mypasswordwith%40", host="nonexistant.ansible.com/ansible.git/")
TEST_CLEARTEXT.append({
'uri' : uri,
'text' : textwrap.dedent("""\
PLAY [all] ********************************************************************
TASK: [delete project directory before update] ********************************
skipping: [localhost]
TASK: [update project using git and accept hostkey] ***************************
skipping: [localhost]
TASK: [update project using git] **********************************************
failed: [localhost] => {"cmd": "/usr/bin/git ls-remote https://%s:%s -h refs/heads/HEAD", "failed": true, "rc": 128}
stderr: fatal: unable to access '%s': Could not resolve host: nonexistant.ansible.com
msg: fatal: unable to access '%s': Could not resolve host: nonexistant.ansible.com
FATAL: all hosts have already failed -- aborting
PLAY RECAP ********************************************************************
to retry, use: --limit @/root/project_update.retry
localhost : ok=0 changed=0 unreachable=0 failed=1
""" % (uri.username, uri.password, str(uri), str(uri))),
'host_occurrences' : 2
})
uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs")
TEST_CLEARTEXT.append({
'uri' : uri,
'text' : textwrap.dedent("""\
TASK: [update project using git] **
failed: [localhost] => {"cmd": "/usr/bin/git ls-remote https://REDACTED:********", "failed": true, "rc": 128}
stderr: error: Couldn't resolve host '@%s' while accessing %s
fatal: HTTP request failed
msg: error: Couldn't resolve host '@%s' while accessing %s
fatal: HTTP request failed
""" % (uri.host, str(uri), uri.host, str(uri))),
'host_occurrences' : 4
})
class UriCleanTests(BaseTest):
# should redact sensitive usernames and passwords
def test_uri_scm_simple_redacted(self):
for uri in TEST_URIS:
redacted_str = UriCleaner.remove_sensitive(str(uri))
if uri.username:
self.check_not_found(redacted_str, uri.username, uri.description)
if uri.password:
self.check_not_found(redacted_str, uri.password, uri.description)
# should replace secret data with safe string, UriCleaner.REPLACE_STR
def test_uri_scm_simple_replaced(self):
for uri in TEST_URIS:
redacted_str = UriCleaner.remove_sensitive(str(uri))
self.check_found(redacted_str, UriCleaner.REPLACE_STR, uri.get_secret_count())
# should redact multiple uris in text
def test_uri_scm_multiple(self):
cleartext = ''
for uri in TEST_URIS:
cleartext += str(uri) + ' '
for uri in TEST_URIS:
cleartext += str(uri) + '\n'
redacted_str = UriCleaner.remove_sensitive(str(uri))
if uri.username:
self.check_not_found(redacted_str, uri.username, uri.description)
if uri.password:
self.check_not_found(redacted_str, uri.password, uri.description)
# should replace multiple secret data with safe string
def test_uri_scm_multiple_replaced(self):
cleartext = ''
find_count = 0
for uri in TEST_URIS:
cleartext += str(uri) + ' '
find_count += uri.get_secret_count()
for uri in TEST_URIS:
cleartext += str(uri) + '\n'
find_count += uri.get_secret_count()
redacted_str = UriCleaner.remove_sensitive(cleartext)
self.check_found(redacted_str, UriCleaner.REPLACE_STR, find_count)
# should redact and replace multiple secret data within a complex cleartext blob
def test_uri_scm_cleartext_redact_and_replace(self):
for test_data in TEST_CLEARTEXT:
uri = test_data['uri']
redacted_str = UriCleaner.remove_sensitive(test_data['text'])
self.check_not_found(redacted_str, uri.username, uri.description)
self.check_not_found(redacted_str, uri.password, uri.description)
# Ensure the host didn't get redacted
self.check_found(redacted_str, uri.host, test_data['host_occurrences'], uri.description)

View File

@@ -0,0 +1,216 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import datetime
# Django
from django.core.urlresolvers import reverse
from django.utils.timezone import now
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
__all__ = ['ScheduleTest']
YESTERDAY = (datetime.date.today() - datetime.timedelta(1)).strftime('%Y%m%dT075000Z')
UNTIL_SCHEDULE = "DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=1;UNTIL=30230401T075000Z" % YESTERDAY
EXPIRED_SCHEDULES = ["DTSTART:19340331T055000Z RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5"]
INFINITE_SCHEDULES = ["DTSTART:30340331T055000Z RRULE:FREQ=MINUTELY;INTERVAL=10"]
GOOD_SCHEDULES = ["DTSTART:20500331T055000Z RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20240331T075000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1",
"DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=1;UNTIL=20230401T075000Z" % YESTERDAY,
"DTSTART:20140331T075000Z RRULE:FREQ=WEEKLY;INTERVAL=1;BYDAY=MO,WE,FR",
"DTSTART:20140331T075000Z RRULE:FREQ=WEEKLY;INTERVAL=5;BYDAY=MO",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYMONTHDAY=6",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYSETPOS=4;BYDAY=SU",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYSETPOS=-1;BYDAY=MO,TU,WE,TH,FR",
"DTSTART:20140331T075000Z RRULE:FREQ=MONTHLY;INTERVAL=1;BYSETPOS=-1;BYDAY=MO,TU,WE,TH,FR,SA,SU",
"DTSTART:20140331T075000Z RRULE:FREQ=YEARLY;INTERVAL=1;BYMONTH=4;BYMONTHDAY=1",
"DTSTART:20140331T075000Z RRULE:FREQ=YEARLY;INTERVAL=1;BYSETPOS=-1;BYMONTH=8;BYDAY=SU",
"DTSTART:20140331T075000Z RRULE:FREQ=WEEKLY;INTERVAL=1;UNTIL=20230401T075000Z;BYDAY=MO,WE,FR",
"DTSTART:20140331T075000Z RRULE:FREQ=HOURLY;INTERVAL=1;UNTIL=20230610T075000Z",
"DTSTART:20140411T040000Z RRULE:FREQ=WEEKLY;INTERVAL=1;UNTIL=20140411T040000Z;BYDAY=WE"]
BAD_SCHEDULES = ["", "DTSTART:20140331T055000 RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20240331T075000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=10000000",
"DTSTART;TZID=US-Eastern:19961105T090000 RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20140331T055000Z RRULE:FREQ=SECONDLY;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=SECONDLY",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYDAY=20MO;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=MONTHLY;BYMONTHDAY=10,15;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYMONTH=1,2;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYYEARDAY=120;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYWEEKNO=10;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1 DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1 RRULE:FREQ=HOURLY;INTERVAL=1"]
class ScheduleTest(BaseTest):
def setUp(self):
super(ScheduleTest, self).setUp()
self.start_redis()
self.setup_instances()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.organizations[0].admins.add(self.normal_django_user)
self.organizations[0].users.add(self.other_django_user)
self.organizations[0].users.add(self.normal_django_user)
self.diff_org_user = self.make_user('fred')
self.organizations[1].users.add(self.diff_org_user)
self.cloud_source = Credential.objects.create(kind='awx', user=self.super_django_user,
username='Dummy', password='Dummy')
self.first_inventory = Inventory.objects.create(name='test_inventory', description='for org 0', organization=self.organizations[0])
self.first_inventory.hosts.create(name='host_1')
self.first_inventory_group = self.first_inventory.groups.create(name='group_1')
self.first_inventory_source = self.first_inventory_group.inventory_source
self.first_inventory_source.source = 'ec2'
self.first_inventory_source.save()
Permission.objects.create(
inventory = self.first_inventory,
user = self.other_django_user,
permission_type = 'read'
)
self.second_inventory = Inventory.objects.create(name='test_inventory_2', description='for org 0', organization=self.organizations[0])
self.second_inventory.hosts.create(name='host_2')
self.second_inventory_group = self.second_inventory.groups.create(name='group_2')
self.second_inventory_source = self.second_inventory_group.inventory_source
self.second_inventory_source.source = 'ec2'
self.second_inventory_source.save()
self.first_schedule = Schedule.objects.create(name='test_schedule_1', unified_job_template=self.first_inventory_source,
enabled=True, rrule=GOOD_SCHEDULES[0])
self.second_schedule = Schedule.objects.create(name='test_schedule_2', unified_job_template=self.second_inventory_source,
enabled=True, rrule=GOOD_SCHEDULES[0])
self.without_valid_source_inventory = Inventory.objects.create(name='without valid source', description='for org 0', organization=self.organizations[0])
self.without_valid_source_inventory.hosts.create(name='host_3')
self.without_valid_source_inventory_group = self.without_valid_source_inventory.groups.create(name='not valid source')
self.without_valid_source_inventory_source = self.without_valid_source_inventory_group.inventory_source
def tearDown(self):
super(ScheduleTest, self).tearDown()
self.stop_redis()
def test_schedules_list(self):
url = reverse('api:schedule_list')
enabled_schedules = Schedule.objects.filter(enabled=True).distinct()
empty_schedules = Schedule.objects.none()
org_1_schedules = Schedule.objects.filter(unified_job_template=self.first_inventory_source)
#Super user can see everything
self.check_get_list(url, self.super_django_user, enabled_schedules)
# Unauth user should have no access
self.check_invalid_auth(url)
# regular org user with read permission can see only their schedules
self.check_get_list(url, self.other_django_user, org_1_schedules)
# other org user with no read perm can't see anything
self.check_get_list(url, self.diff_org_user, empty_schedules)
def test_post_new_schedule(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
reverse('api:inventory_source_schedules_list', args=(self.second_inventory_source.pk,))
new_schedule = dict(name='newsched_1', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
# No auth should fail
self.check_invalid_auth(first_url, new_schedule, methods=('post',))
# Super user can post a new schedule
with self.current_user(self.super_django_user):
self.post(first_url, data=new_schedule, expect=201)
# #admin can post
admin_schedule = dict(name='newsched_2', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
self.post(first_url, data=admin_schedule, expect=201, auth=self.get_normal_credentials())
#normal user without write access can't post
unauth_schedule = dict(name='newsched_3', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.other_django_user):
self.post(first_url, data=unauth_schedule, expect=403)
#give normal user write access and then they can post
Permission.objects.create(
user = self.other_django_user,
inventory = self.first_inventory,
permission_type = PERM_INVENTORY_WRITE
)
auth_schedule = unauth_schedule
with self.current_user(self.other_django_user):
self.post(first_url, data=auth_schedule, expect=201)
# another org user shouldn't be able to post a schedule to this org's schedule
diff_user_schedule = dict(name='newsched_4', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.diff_org_user):
self.post(first_url, data=diff_user_schedule, expect=403)
def test_post_schedule_to_non_cloud_source(self):
invalid_inv_url = reverse('api:inventory_source_schedules_list', args=(self.without_valid_source_inventory_source.pk,))
new_schedule = dict(name='newsched_1', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.super_django_user):
self.post(invalid_inv_url, data=new_schedule, expect=400)
def test_update_existing_schedule(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
new_schedule = dict(name='edit_schedule', description='going to change', enabled=True, rrule=EXPIRED_SCHEDULES[0])
with self.current_user(self.normal_django_user):
data = self.post(first_url, new_schedule, expect=201)
self.assertEquals(data['next_run'], None)
new_schedule_url = reverse('api:schedule_detail', args=(data['id'],))
data['rrule'] = GOOD_SCHEDULES[0]
with self.current_user(self.normal_django_user):
data = self.put(new_schedule_url, data=data, expect=200)
self.assertNotEqual(data['next_run'], None)
#TODO: Test path needed for non org-admin users, but rather regular users who have permission to create the JT associated with the Schedule
def test_infinite_schedule(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
new_schedule = dict(name='inf_schedule', description='going forever', enabled=True, rrule=INFINITE_SCHEDULES[0])
with self.current_user(self.normal_django_user):
data = self.post(first_url, new_schedule, expect=201)
self.assertEquals(data['dtend'], None)
long_schedule = dict(name='long_schedule', description='going for a long time', enabled=True, rrule=UNTIL_SCHEDULE)
with self.current_user(self.normal_django_user):
data = self.post(first_url, long_schedule, expect=201)
self.assertNotEquals(data['dtend'], None)
def test_schedule_filtering(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
start_time = now() + datetime.timedelta(minutes=5)
dtstart_str = start_time.strftime("%Y%m%dT%H%M%SZ")
new_schedule = dict(name="filter_schedule_1", enabled=True, rrule="DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5" % dtstart_str)
with self.current_user(self.normal_django_user):
self.post(first_url, new_schedule, expect=201)
self.assertTrue(Schedule.objects.enabled().between(now(), now() + datetime.timedelta(minutes=10)).count(), 1)
start_time = now()
dtstart_str = start_time.strftime("%Y%m%dT%H%M%SZ")
new_schedule_middle = dict(name="runnable_schedule", enabled=True, rrule="DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5" % dtstart_str)
with self.current_user(self.normal_django_user):
self.post(first_url, new_schedule_middle, expect=201)
self.assertTrue(Schedule.objects.enabled().between(now() - datetime.timedelta(minutes=10), now() + datetime.timedelta(minutes=10)).count(), 1)
def test_rrule_validation(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
with self.current_user(self.normal_django_user):
for good_rule in GOOD_SCHEDULES:
sched_dict = dict(name=good_rule, enabled=True, rrule=good_rule)
self.post(first_url, sched_dict, expect=201)
for bad_rule in BAD_SCHEDULES:
sched_dict = dict(name=bad_rule, enabled=True, rrule=bad_rule)
self.post(first_url, sched_dict, expect=400)

View File

@@ -0,0 +1,403 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import json
import os
import subprocess
import sys
import urlparse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
__all__ = ['InventoryScriptTest']
class BaseScriptTest(BaseLiveServerTest):
'''
Base class for tests that run external scripts to access the API.
'''
def setUp(self):
super(BaseScriptTest, self).setUp()
self._sys_path = [x for x in sys.path]
self._environ = dict(os.environ.items())
self._temp_files = []
def tearDown(self):
super(BaseScriptTest, self).tearDown()
sys.path = self._sys_path
for k,v in self._environ.items():
if os.environ.get(k, None) != v:
os.environ[k] = v
for k,v in os.environ.items():
if k not in self._environ.keys():
del os.environ[k]
for tf in self._temp_files:
if os.path.exists(tf):
os.remove(tf)
def run_script(self, name, *args, **options):
'''
Run an external script and capture its stdout/stderr and return code.
'''
#stdin_fileobj = options.pop('stdin_fileobj', None)
pargs = [name]
for k,v in options.items():
pargs.append('%s%s' % ('-' if len(k) == 1 else '--', k))
if v is not True:
pargs.append(str(v))
for arg in args:
pargs.append(str(arg))
proc = subprocess.Popen(pargs, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
return proc.returncode, stdout, stderr
class InventoryScriptTest(BaseScriptTest):
'''
Test helper to run management command as standalone script.
'''
def setUp(self):
super(InventoryScriptTest, self).setUp()
self.start_redis()
self.setup_instances()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.projects = self.make_projects(self.normal_django_user, 2)
self.organizations[0].projects.add(self.projects[1])
self.organizations[1].projects.add(self.projects[0])
self.inventories = []
self.hosts = []
self.groups = []
for n, organization in enumerate(self.organizations):
inventory = Inventory.objects.create(name='inventory-%d' % n,
description='description for inventory %d' % n,
organization=organization,
variables=json.dumps({'n': n}) if n else '')
self.inventories.append(inventory)
hosts = []
for x in xrange(10):
if n > 0:
variables = json.dumps({'ho': 'hum-%d' % x})
else:
variables = ''
host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
inventory=inventory,
variables=variables)
if x in (3, 7):
host.mark_inactive()
hosts.append(host)
# add localhost just to make sure it's thrown into all (Ansible github bug)
local = inventory.hosts.create(name='localhost', inventory=inventory, variables={})
hosts.append(local)
self.hosts.extend(hosts)
groups = []
for x in xrange(5):
if n > 0:
variables = json.dumps({'gee': 'whiz-%d' % x})
else:
variables = ''
group = inventory.groups.create(name='group-%d' % x,
inventory=inventory,
variables=variables)
if x == 2:
group.mark_inactive()
groups.append(group)
group.hosts.add(hosts[x])
group.hosts.add(hosts[x + 5])
if n > 0 and x == 4:
group.parents.add(groups[3])
if x == 4:
group.hosts.add(local)
self.groups.extend(groups)
def tearDown(self):
super(InventoryScriptTest, self).tearDown()
self.stop_redis()
def run_inventory_script(self, *args, **options):
rest_api_url = self.live_server_url
parts = urlparse.urlsplit(rest_api_url)
username, password = self.get_super_credentials()
netloc = '%s:%s@%s' % (username, password, parts.netloc)
rest_api_url = urlparse.urlunsplit([parts.scheme, netloc, parts.path,
parts.query, parts.fragment])
os.environ.setdefault('REST_API_URL', rest_api_url)
#os.environ.setdefault('REST_API_TOKEN',
# self.super_django_user.auth_token.key)
name = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'plugins',
'inventory', 'awxrest.py')
return self.run_script(name, *args, **options)
def test_without_inventory_id(self):
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
rc, stdout, stderr = self.run_inventory_script(host=self.hosts[0].name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_list_with_inventory_id_as_argument(self):
inventory = self.inventories[0]
self.assertTrue(inventory.active)
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups.filter(active=True)
groupnames = [ x for x in groups.values_list('name', flat=True)]
# it's ok for all to be here because due to an Ansible inventory workaround
# 127.0.0.1/localhost must show up in the all group
groupnames.append('all')
self.assertEqual(set(data.keys()), set(groupnames))
# Groups for this inventory should only have hosts, and no group
# variable data or parent/child relationships.
for k,v in data.items():
if k != 'all':
self.assertTrue(isinstance(v, dict))
self.assertTrue(isinstance(v['children'], (list,tuple)))
self.assertTrue(isinstance(v['hosts'], (list,tuple)))
self.assertTrue(isinstance(v['vars'], (dict)))
group = inventory.groups.get(active=True, name=k)
hosts = group.hosts.filter(active=True)
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v['hosts']), set(hostnames))
else:
self.assertTrue(v['hosts'] == ['localhost'])
for group in inventory.groups.filter(active=False):
self.assertFalse(group.name in data.keys(),
'deleted group %s should not be in data' % group)
# Command line argument for inventory ID should take precedence over
# environment variable.
inventory_pks = set(map(lambda x: x.pk, self.inventories))
invalid_id = [x for x in xrange(9999) if x not in inventory_pks][0]
os.environ['INVENTORY_ID'] = str(invalid_id)
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
def test_list_with_inventory_id_in_environment(self):
inventory = self.inventories[1]
self.assertTrue(inventory.active)
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups.filter(active=True)
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
# Groups for this inventory should have hosts, variable data, and one
# parent/child relationship.
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
group = inventory.groups.get(active=True, name=k)
hosts = group.hosts.filter(active=True)
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
children = group.children.filter(active=True)
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
self.assertTrue(len(v['children']) == 0)
def test_list_with_hostvars_inline(self):
inventory = self.inventories[1]
self.assertTrue(inventory.active)
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk,
hostvars=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups.filter(active=True)
groupnames = list(groups.values_list('name', flat=True))
groupnames.extend(['all', '_meta'])
self.assertEqual(set(data.keys()), set(groupnames))
all_hostnames = set()
# Groups for this inventory should have hosts, variable data, and one
# parent/child relationship.
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
if k == '_meta':
continue
group = inventory.groups.get(active=True, name=k)
hosts = group.hosts.filter(active=True)
hostnames = hosts.values_list('name', flat=True)
all_hostnames.update(hostnames)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
children = group.children.filter(active=True)
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
self.assertTrue(len(v['children']) == 0)
# Check hostvars in ['_meta']['hostvars'] dict.
for hostname in all_hostnames:
self.assertTrue(hostname in data['_meta']['hostvars'])
host = inventory.hosts.get(name=hostname)
self.assertEqual(data['_meta']['hostvars'][hostname],
host.variables_dict)
# Hostvars can also be requested via environment variable.
os.environ['INVENTORY_HOSTVARS'] = str(True)
rc, stdout, stderr = self.run_inventory_script(list=True,
inventory=inventory.pk)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
self.assertTrue('_meta' in data)
def test_valid_host(self):
# Host without variable data.
inventory = self.inventories[0]
self.assertTrue(inventory.active)
host = inventory.hosts.filter(active=True)[2]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
self.assertEqual(data, {})
# Host with variable data.
inventory = self.inventories[1]
self.assertTrue(inventory.active)
host = inventory.hosts.filter(active=True)[4]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
self.assertEqual(data, host.variables_dict)
def test_invalid_host(self):
# Valid host, but not part of the specified inventory.
inventory = self.inventories[0]
self.assertTrue(inventory.active)
host = Host.objects.get(id=12)
self.assertTrue(host.active)
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
# Invalid hostname not in database.
rc, stdout, stderr = self.run_inventory_script(host='blah.example.com')
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_invalid_inventory_id(self):
inventory_pks = set(map(lambda x: x.pk, self.inventories))
invalid_id = [x for x in xrange(1, 9999) if x not in inventory_pks][0]
os.environ['INVENTORY_ID'] = str(invalid_id)
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
os.environ['INVENTORY_ID'] = 'not_an_int'
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
os.environ['INVENTORY_ID'] = str(invalid_id)
rc, stdout, stderr = self.run_inventory_script(host=self.hosts[1].name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
os.environ['INVENTORY_ID'] = 'not_an_int'
rc, stdout, stderr = self.run_inventory_script(host=self.hosts[2].name)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_deleted_inventory(self):
inventory = self.inventories[0]
inventory.mark_inactive()
self.assertFalse(inventory.active)
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_without_list_or_host_argument(self):
inventory = self.inventories[0]
self.assertTrue(inventory.active)
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script()
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_both_list_and_host_arguments(self):
inventory = self.inventories[0]
self.assertTrue(inventory.active)
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(list=True, host='blah')
self.assertNotEqual(rc, 0, stderr)
self.assertEqual(json.loads(stdout), {'failed': True})
def test_with_disabled_hosts(self):
inventory = self.inventories[1]
self.assertTrue(inventory.active)
for host in inventory.hosts.filter(active=True, enabled=True):
host.enabled = False
host.save(update_fields=['enabled'])
os.environ['INVENTORY_ID'] = str(inventory.pk)
# Load inventory list as normal (only enabled hosts).
rc, stdout, stderr = self.run_inventory_script(list=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups.filter(active=True)
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
group = inventory.groups.get(active=True, name=k)
hosts = group.hosts.filter(active=True, enabled=True)
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
self.assertFalse(hostnames)
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
children = group.children.filter(active=True)
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
self.assertTrue(len(v['children']) == 0)
# Load inventory list with all hosts.
rc, stdout, stderr = self.run_inventory_script(list=True, all=True)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
groups = inventory.groups.filter(active=True)
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
group = inventory.groups.get(active=True, name=k)
hosts = group.hosts.filter(active=True)
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
self.assertTrue(hostnames)
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
children = group.children.filter(active=True)
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
self.assertTrue(len(v['children']) == 0)

View File

@@ -0,0 +1,105 @@
# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved.
from awx.main.tests.base import BaseTest
from awx.main.models import * # noqa
from django.core.urlresolvers import reverse
from django.test.utils import override_settings
TEST_TOWER_SETTINGS_MANIFEST = {
"TEST_SETTING_INT": {
"name": "An Integer Field",
"description": "An Integer Field",
"default": 1,
"type": "int",
"category": "test"
},
"TEST_SETTING_STRING": {
"name": "A String Field",
"description": "A String Field",
"default": "test",
"type": "string",
"category": "test"
},
"TEST_SETTING_BOOL": {
"name": "A Bool Field",
"description": "A Bool Field",
"default": True,
"type": "bool",
"category": "test"
},
"TEST_SETTING_LIST": {
"name": "A List Field",
"description": "A List Field",
"default": ["A", "Simple", "List"],
"type": "list",
"category": "test"
}
}
@override_settings(TOWER_SETTINGS_MANIFEST=TEST_TOWER_SETTINGS_MANIFEST)
class SettingsTest(BaseTest):
def setUp(self):
super(SettingsTest, self).setUp()
self.setup_instances()
self.setup_users()
def get_settings(self, expected_count=4):
result = self.get(reverse('api:settings_list'), expect=200)
self.assertEqual(result['count'], expected_count)
return result['results']
def get_individual_setting(self, setting):
all_settings = self.get_settings()
setting_actual = None
for setting_item in all_settings:
if setting_item['key'] == setting:
setting_actual = setting_item
break
self.assertIsNotNone(setting_actual)
return setting_actual
def set_setting(self, key, value):
self.post(reverse('api:settings_list'), data={"key": key, "value": value}, expect=201)
def test_get_settings(self):
# Regular user should see nothing (no user settings yet)
with self.current_user(self.normal_django_user):
self.get_settings(expected_count=0)
# anonymous user should get a 401
self.get(reverse('api:settings_list'), expect=401)
# super user can see everything
with self.current_user(self.super_django_user):
self.get_settings(expected_count=len(TEST_TOWER_SETTINGS_MANIFEST))
def test_set_and_reset_settings(self):
settings_reset = reverse('api:settings_reset')
with self.current_user(self.super_django_user):
# Set and reset a single setting
setting_int = self.get_individual_setting('TEST_SETTING_INT')
self.assertEqual(setting_int['value'], TEST_TOWER_SETTINGS_MANIFEST['TEST_SETTING_INT']['default'])
self.set_setting('TEST_SETTING_INT', 2)
setting_int = self.get_individual_setting('TEST_SETTING_INT')
self.assertEqual(setting_int['value'], 2)
self.post(settings_reset, data={"key": 'TEST_SETTING_INT'}, expect=204)
setting_int = self.get_individual_setting('TEST_SETTING_INT')
self.assertEqual(setting_int['value'], TEST_TOWER_SETTINGS_MANIFEST['TEST_SETTING_INT']['default'])
def test_clear_all_settings(self):
settings_list = reverse('api:settings_list')
with self.current_user(self.super_django_user):
self.set_setting('TEST_SETTING_INT', 2)
self.set_setting('TEST_SETTING_STRING', "foo")
self.set_setting('TEST_SETTING_BOOL', False)
self.set_setting('TEST_SETTING_LIST', [1,2,3])
all_settings = self.get_settings()
for setting_entry in all_settings:
self.assertNotEqual(setting_entry['value'],
TEST_TOWER_SETTINGS_MANIFEST[setting_entry['key']]['default'])
self.delete(settings_list, expect=200)
all_settings = self.get_settings()
for setting_entry in all_settings:
self.assertEqual(setting_entry['value'],
TEST_TOWER_SETTINGS_MANIFEST[setting_entry['key']]['default'])

1281
awx/main/tests/old/tasks.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,55 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import mock
from mock import Mock
from StringIO import StringIO
from django.utils.timezone import now
# Django
from django.test import SimpleTestCase
# AWX
from awx.main.models import * # noqa
__all__ = ['UnifiedJobsUnitTest',]
class UnifiedJobsUnitTest(SimpleTestCase):
# stdout file present
@mock.patch('os.path.exists', return_value=True)
@mock.patch('codecs.open', return_value='my_file_handler')
def test_result_stdout_raw_handle_file__found(self, exists, open):
unified_job = UnifiedJob()
unified_job.result_stdout_file = 'dummy'
with mock.patch('os.stat', return_value=Mock(st_size=1)):
result = unified_job.result_stdout_raw_handle()
self.assertEqual(result, 'my_file_handler')
# stdout file missing, job finished
@mock.patch('os.path.exists', return_value=False)
def test_result_stdout_raw_handle__missing(self, exists):
unified_job = UnifiedJob()
unified_job.result_stdout_file = 'dummy'
unified_job.finished = now()
result = unified_job.result_stdout_raw_handle()
self.assertIsInstance(result, StringIO)
self.assertEqual(result.read(), 'stdout capture is missing')
# stdout file missing, job not finished
@mock.patch('os.path.exists', return_value=False)
def test_result_stdout_raw_handle__pending(self, exists):
unified_job = UnifiedJob()
unified_job.result_stdout_file = 'dummy'
unified_job.finished = None
result = unified_job.result_stdout_raw_handle()
self.assertIsInstance(result, StringIO)
self.assertEqual(result.read(), 'Waiting for results...')

1149
awx/main/tests/old/users.py Normal file

File diff suppressed because it is too large Load Diff

113
awx/main/tests/old/views.py Normal file
View File

@@ -0,0 +1,113 @@
# Django
from django.core.urlresolvers import reverse
# Reuse Test code
from awx.main.tests.base import (
BaseLiveServerTest,
QueueStartStopTestMixin,
URI,
)
from awx.main.models.projects import * # noqa
__all__ = ['UnifiedJobStdoutRedactedTests']
TEST_STDOUTS = []
uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs")
TEST_STDOUTS.append({
'description': 'uri in a plain text document',
'uri' : uri,
'text' : 'hello world %s goodbye world' % uri,
'occurrences' : 1
})
uri = URI(scheme="https", username="applepie@@@", password="thatyouknow@@@@", host="github.ginger.com/theirrepo.git/info/refs")
TEST_STDOUTS.append({
'description': 'uri appears twice in a multiline plain text document',
'uri' : uri,
'text' : 'hello world %s \n\nyoyo\n\nhello\n%s' % (uri, uri),
'occurrences' : 2
})
class UnifiedJobStdoutRedactedTests(BaseLiveServerTest, QueueStartStopTestMixin):
def setUp(self):
super(UnifiedJobStdoutRedactedTests, self).setUp()
self.setup_instances()
self.setup_users()
self.test_cases = []
self.negative_test_cases = []
proj = self.make_project()
for e in TEST_STDOUTS:
e['project'] = ProjectUpdate(project=proj)
e['project'].result_stdout_text = e['text']
e['project'].save()
self.test_cases.append(e)
for d in TEST_STDOUTS:
d['job'] = self.make_job()
d['job'].result_stdout_text = d['text']
d['job'].save()
self.negative_test_cases.append(d)
# This is more of a functional test than a unit test.
# should filter out username and password
def check_sensitive_redacted(self, test_data, response):
uri = test_data['uri']
self.assertIsNotNone(response['content'])
self.check_not_found(response['content'], uri.username, test_data['description'])
self.check_not_found(response['content'], uri.password, test_data['description'])
# Ensure the host didn't get redacted
self.check_found(response['content'], uri.host, test_data['occurrences'], test_data['description'])
def check_sensitive_not_redacted(self, test_data, response):
uri = test_data['uri']
self.assertIsNotNone(response['content'])
self.check_found(response['content'], uri.username, description=test_data['description'])
self.check_found(response['content'], uri.password, description=test_data['description'])
def _get_url_job_stdout(self, job, url_base, format='json'):
formats = {
'json': 'application/json',
'ansi': 'text/plain',
'txt': 'text/plain',
'html': 'text/html',
}
content_type = formats[format]
project_update_stdout_url = reverse(url_base, args=(job.pk,)) + "?format=" + format
return self.get(project_update_stdout_url, expect=200, auth=self.get_super_credentials(), accept=content_type)
def _test_redaction_enabled(self, format):
for test_data in self.test_cases:
response = self._get_url_job_stdout(test_data['project'], "api:project_update_stdout", format=format)
self.check_sensitive_redacted(test_data, response)
def _test_redaction_disabled(self, format):
for test_data in self.negative_test_cases:
response = self._get_url_job_stdout(test_data['job'], "api:job_stdout", format=format)
self.check_sensitive_not_redacted(test_data, response)
def test_project_update_redaction_enabled_json(self):
self._test_redaction_enabled('json')
def test_project_update_redaction_enabled_ansi(self):
self._test_redaction_enabled('ansi')
def test_project_update_redaction_enabled_html(self):
self._test_redaction_enabled('html')
def test_project_update_redaction_enabled_txt(self):
self._test_redaction_enabled('txt')
def test_job_redaction_disabled_json(self):
self._test_redaction_disabled('json')
def test_job_redaction_disabled_ansi(self):
self._test_redaction_disabled('ansi')
def test_job_redaction_disabled_html(self):
self._test_redaction_disabled('html')
def test_job_redaction_disabled_txt(self):
self._test_redaction_disabled('txt')