Major rename of package from lib to ansibleworks.

This commit is contained in:
Chris Church
2013-05-21 18:20:26 -04:00
parent 5133b9a30e
commit aeac739735
264 changed files with 230 additions and 316 deletions

View File

@@ -0,0 +1,11 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
from ansibleworks.main.tests.organizations import OrganizationsTest
from ansibleworks.main.tests.users import UsersTest
from ansibleworks.main.tests.inventory import InventoryTest
from ansibleworks.main.tests.projects import ProjectsTest
from ansibleworks.main.tests.commands import *
from ansibleworks.main.tests.tasks import RunJobTest
from ansibleworks.main.tests.jobs import *

View File

@@ -0,0 +1,212 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import contextlib
import datetime
import json
import os
import shutil
import tempfile
from django.conf import settings
from django.contrib.auth.models import User
import django.test
from django.test.client import Client
from ansibleworks.main.models import *
class BaseTestMixin(object):
'''
Mixin with shared code for use by all test cases.
'''
def setUp(self):
super(BaseTestMixin, self).setUp()
self.object_ctr = 0
self._temp_project_dirs = []
self._current_auth = None
self._user_passwords = {}
def tearDown(self):
super(BaseTestMixin, self).tearDown()
for project_dir in self._temp_project_dirs:
if os.path.exists(project_dir):
shutil.rmtree(project_dir, True)
@contextlib.contextmanager
def current_user(self, user_or_username, password=None):
try:
if isinstance(user_or_username, User):
username = user_or_username.username
else:
username = user_or_username
password = password or self._user_passwords.get(username)
previous_auth = self._current_auth
if username is None:
self._current_auth = None
else:
self._current_auth = (username, password)
yield
finally:
self._current_auth = previous_auth
def make_user(self, username, password=None, super_user=False):
user = None
password = password or username
if super_user:
user = User.objects.create_superuser(username, "%s@example.com", password)
else:
user = User.objects.create_user(username, "%s@example.com", password)
self.assertTrue(user.auth_token)
self._user_passwords[user.username] = password
return user
def make_organizations(self, created_by, count=1):
results = []
for x in range(0, count):
self.object_ctr = self.object_ctr + 1
results.append(Organization.objects.create(
name="org%s-%s" % (x, self.object_ctr), description="org%s" % x, created_by=created_by
))
return results
def make_project(self, name, description='', created_by=None,
playbook_content=''):
if not os.path.exists(settings.PROJECTS_ROOT):
os.makedirs(settings.PROJECTS_ROOT)
# Create temp project directory.
project_dir = tempfile.mkdtemp(dir=settings.PROJECTS_ROOT)
self._temp_project_dirs.append(project_dir)
# Create temp playbook in project (if playbook content is given).
if playbook_content:
handle, playbook_path = tempfile.mkstemp(suffix='.yml',
dir=project_dir)
test_playbook_file = os.fdopen(handle, 'w')
test_playbook_file.write(playbook_content)
test_playbook_file.close()
return Project.objects.create(
name=name, description=description,
local_path=os.path.basename(project_dir), created_by=created_by,
#scm_type='git', default_playbook='foo.yml',
)
def make_projects(self, created_by, count=1, playbook_content=''):
results = []
for x in range(0, count):
self.object_ctr = self.object_ctr + 1
results.append(self.make_project(
name="proj%s-%s" % (x, self.object_ctr),
description="proj%s" % x,
created_by=created_by,
playbook_content=playbook_content,
))
return results
def check_pagination_and_size(self, data, desired_count, previous=None, next=None):
self.assertTrue('results' in data)
self.assertEqual(data['count'], desired_count)
self.assertEqual(data['previous'], previous)
self.assertEqual(data['next'], next)
def check_list_ids(self, data, queryset, check_order=False):
data_ids = [x['id'] for x in data['results']]
qs_ids = queryset.values_list('pk', flat=True)
if check_order:
self.assertEqual(data_ids, qs_ids)
else:
self.assertEqual(set(data_ids), set(qs_ids))
def setup_users(self, just_super_user=False):
# Create a user.
self.super_username = 'admin'
self.super_password = 'admin'
self.normal_username = 'normal'
self.normal_password = 'normal'
self.other_username = 'other'
self.other_password = 'other'
self.super_django_user = self.make_user(self.super_username, self.super_password, super_user=True)
if not just_super_user:
self.normal_django_user = self.make_user(self.normal_username, self.normal_password, super_user=False)
self.other_django_user = self.make_user(self.other_username, self.other_password, super_user=False)
def get_super_credentials(self):
return (self.super_username, self.super_password)
def get_normal_credentials(self):
return (self.normal_username, self.normal_password)
def get_other_credentials(self):
return (self.other_username, self.other_password)
def get_invalid_credentials(self):
return ('random', 'combination')
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None):
assert method is not None
method_name = method.lower()
if method_name not in ('options', 'head', 'get', 'delete'):
assert data is not None
client = Client()
auth = auth or self._current_auth
if auth:
if isinstance(auth, (list, tuple)):
client.login(username=auth[0], password=auth[1])
elif isinstance(auth, basestring):
client = Client(HTTP_AUTHORIZATION='Token %s' % auth)
method = getattr(client, method_name)
response = None
if data is not None:
response = method(url, json.dumps(data), 'application/json')
else:
response = method(url)
if response.status_code == 500 and expect != 500:
assert False, "Failed: %s" % response.content
if expect is not None:
assert response.status_code == expect, "expected status %s, got %s for url=%s as auth=%s: %s" % (expect, response.status_code, url, auth, response.content)
if method_name == 'head':
self.assertFalse(response.content)
if response.status_code not in [ 202, 204, 405 ] and method_name != 'head' and response.content:
# no JSON responses in these at least for now, 409 should probably return some (FIXME)
return json.loads(response.content)
else:
return None
def options(self, url, expect=200, auth=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='options')
def head(self, url, expect=200, auth=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='head')
def get(self, url, expect=200, auth=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='get')
def post(self, url, data, expect=204, auth=None):
return self._generic_rest(url, data=data, expect=expect, auth=auth, method='post')
def put(self, url, data, expect=200, auth=None):
return self._generic_rest(url, data=data, expect=expect, auth=auth, method='put')
def patch(self, url, data, expect=200, auth=None):
return self._generic_rest(url, data=data, expect=expect, auth=auth, method='patch')
def delete(self, url, expect=201, auth=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='delete')
def get_urls(self, collection_url, auth=None):
# TODO: this test helper function doesn't support pagination
data = self.get(collection_url, expect=200, auth=auth)
return [item['url'] for item in data['results']]
class BaseTest(BaseTestMixin, django.test.TestCase):
'''
Base class for unit tests.
'''
class BaseTransactionTest(BaseTestMixin, django.test.TransactionTestCase):
'''
Base class for tests requiring transactions (or where the test database
needs to be accessed by subprocesses).
'''

View File

@@ -0,0 +1,408 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import json
import os
import StringIO
import sys
import tempfile
from django.conf import settings
from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils.timezone import now
from ansibleworks.main.models import *
from ansibleworks.main.tests.base import BaseTest
__all__ = ['RunCommandAsScriptTest', 'AcomInventoryTest',
'AcomCallbackEventTest']
class BaseCommandTest(BaseTest):
'''
Base class for tests that run management commands.
'''
def setUp(self):
super(BaseCommandTest, self).setUp()
self._sys_path = [x for x in sys.path]
self._environ = dict(os.environ.items())
self._temp_files = []
def tearDown(self):
super(BaseCommandTest, self).tearDown()
sys.path = self._sys_path
for k,v in self._environ.items():
if os.environ.get(k, None) != v:
os.environ[k] = v
for k,v in os.environ.items():
if k not in self._environ.keys():
del os.environ[k]
for tf in self._temp_files:
if os.path.exists(tf):
os.remove(tf)
def run_command(self, name, *args, **options):
'''
Run a management command and capture its stdout/stderr along with any
exceptions.
'''
command_runner = options.pop('command_runner', call_command)
stdin_fileobj = options.pop('stdin_fileobj', None)
options.setdefault('verbosity', 1)
options.setdefault('interactive', False)
original_stdin = sys.stdin
original_stdout = sys.stdout
original_stderr = sys.stderr
if stdin_fileobj:
sys.stdin = stdin_fileobj
sys.stdout = StringIO.StringIO()
sys.stderr = StringIO.StringIO()
result = None
try:
result = command_runner(name, *args, **options)
except Exception, e:
result = e
except SystemExit, e:
result = e
finally:
captured_stdout = sys.stdout.getvalue()
captured_stderr = sys.stderr.getvalue()
sys.stdin = original_stdin
sys.stdout = original_stdout
sys.stderr = original_stderr
return result, captured_stdout, captured_stderr
class RunCommandAsScriptTest(BaseCommandTest):
'''
Test helper to run management command as standalone script.
'''
def test_run_command_as_script(self):
from ansibleworks.main.management.commands import run_command_as_script
os.environ['ACOM_TEST_DATABASE_NAME'] = settings.DATABASES['default']['NAME']
# FIXME: Not sure how to test ImportError for settings module.
def run_cmd(name, *args, **kwargs):
return run_command_as_script(name)
result, stdout, stderr = self.run_command('version',
command_runner=run_cmd)
self.assertEqual(result, None)
self.assertTrue(stdout)
self.assertFalse(stderr)
class AcomInventoryTest(BaseCommandTest):
'''
Test cases for acom_inventory management command.
'''
def setUp(self):
super(AcomInventoryTest, self).setUp()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 2)
self.projects = self.make_projects(self.normal_django_user, 2)
self.organizations[0].projects.add(self.projects[1])
self.organizations[1].projects.add(self.projects[0])
self.inventories = []
self.hosts = []
self.groups = []
for n, organization in enumerate(self.organizations):
inventory = Inventory.objects.create(name='inventory-%d' % n,
description='description for inventory %d' % n,
organization=organization)
self.inventories.append(inventory)
hosts = []
for x in xrange(10):
if n > 0:
variable_data = VariableData.objects.create(data=json.dumps({'ho': 'hum-%d' % x}))
else:
variable_data = None
host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
inventory=inventory,
variable_data=variable_data)
hosts.append(host)
self.hosts.extend(hosts)
groups = []
for x in xrange(5):
if n > 0:
variable_data = VariableData.objects.create(data=json.dumps({'gee': 'whiz-%d' % x}))
else:
variable_data = None
group = inventory.groups.create(name='group-%d' % x,
inventory=inventory,
variable_data=variable_data)
groups.append(group)
group.hosts.add(hosts[x])
group.hosts.add(hosts[x + 5])
if n > 0 and x == 4:
group.parents.add(groups[3])
self.groups.extend(groups)
def test_without_inventory_id(self):
result, stdout, stderr = self.run_command('acom_inventory', list=True)
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
result, stdout, stderr = self.run_command('acom_inventory',
host=self.hosts[0])
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
def test_list_with_inventory_id_as_argument(self):
inventory = self.inventories[0]
result, stdout, stderr = self.run_command('acom_inventory', list=True,
inventory_id=inventory.pk)
self.assertEqual(result, None)
data = json.loads(stdout)
self.assertEqual(set(data.keys()),
set(inventory.groups.values_list('name', flat=True)))
# Groups for this inventory should only have hosts, and no group
# variable data or parent/child relationships.
for k,v in data.items():
self.assertTrue(isinstance(v, (list, tuple)))
group = inventory.groups.get(name=k)
self.assertEqual(set(v),
set(group.hosts.values_list('name', flat=True)))
# Command line argument for inventory ID should take precedence over
# environment variable.
inventory_pks = set(map(lambda x: x.pk, self.inventories))
invalid_id = [x for x in xrange(9999) if x not in inventory_pks][0]
os.environ['ACOM_INVENTORY_ID'] = str(invalid_id)
result, stdout, stderr = self.run_command('acom_inventory', list=True,
inventory_id=inventory.pk)
self.assertEqual(result, None)
data = json.loads(stdout)
def test_list_with_inventory_id_in_environment(self):
inventory = self.inventories[1]
os.environ['ACOM_INVENTORY_ID'] = str(inventory.pk)
result, stdout, stderr = self.run_command('acom_inventory', list=True)
self.assertEqual(result, None)
data = json.loads(stdout)
self.assertEqual(set(data.keys()),
set(inventory.groups.values_list('name', flat=True)))
# Groups for this inventory should have hosts, variable data, and one
# parent/child relationship.
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
group = inventory.groups.get(name=k)
self.assertEqual(set(v.get('hosts', [])),
set(group.hosts.values_list('name', flat=True)))
if group.variable_data:
self.assertEqual(v.get('vars', {}),
json.loads(group.variable_data.data))
if k == 'group-3':
self.assertEqual(set(v.get('children', [])),
set(group.children.values_list('name', flat=True)))
else:
self.assertFalse('children' in v)
def test_valid_host(self):
# Host without variable data.
inventory = self.inventories[0]
host = inventory.hosts.all()[2]
os.environ['ACOM_INVENTORY_ID'] = str(inventory.pk)
result, stdout, stderr = self.run_command('acom_inventory',
host=host.name)
self.assertEqual(result, None)
data = json.loads(stdout)
self.assertEqual(data, {})
# Host with variable data.
inventory = self.inventories[1]
host = inventory.hosts.all()[3]
os.environ['ACOM_INVENTORY_ID'] = str(inventory.pk)
result, stdout, stderr = self.run_command('acom_inventory',
host=host.name)
self.assertEqual(result, None)
data = json.loads(stdout)
self.assertEqual(data, json.loads(host.variable_data.data))
def test_invalid_host(self):
# Valid host, but not part of the specified inventory.
inventory = self.inventories[0]
host = Host.objects.exclude(inventory=inventory)[0]
os.environ['ACOM_INVENTORY_ID'] = str(inventory.pk)
result, stdout, stderr = self.run_command('acom_inventory',
host=host.name)
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
# Invalid hostname not in database.
result, stdout, stderr = self.run_command('acom_inventory',
host='invalid.example.com')
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
def test_with_invalid_inventory_id(self):
inventory_pks = set(map(lambda x: x.pk, self.inventories))
invalid_id = [x for x in xrange(1, 9999) if x not in inventory_pks][0]
os.environ['ACOM_INVENTORY_ID'] = str(invalid_id)
result, stdout, stderr = self.run_command('acom_inventory', list=True)
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
os.environ['ACOM_INVENTORY_ID'] = 'not_an_int'
result, stdout, stderr = self.run_command('acom_inventory', list=True)
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
os.environ['ACOM_INVENTORY_ID'] = str(invalid_id)
result, stdout, stderr = self.run_command('acom_inventory',
host=self.hosts[1])
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
os.environ['ACOM_INVENTORY_ID'] = 'not_an_int'
result, stdout, stderr = self.run_command('acom_inventory',
host=self.hosts[2])
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
def test_without_list_or_host_argument(self):
inventory = self.inventories[0]
os.environ['ACOM_INVENTORY_ID'] = str(inventory.pk)
result, stdout, stderr = self.run_command('acom_inventory')
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
def test_with_both_list_and_host_arguments(self):
inventory = self.inventories[0]
os.environ['ACOM_INVENTORY_ID'] = str(inventory.pk)
result, stdout, stderr = self.run_command('acom_inventory', list=True,
host='blah')
self.assertTrue(isinstance(result, CommandError))
self.assertEqual(json.loads(stdout), {})
class AcomCallbackEventTest(BaseCommandTest):
'''
Test cases for acom_callback_event management command.
'''
def setUp(self):
super(AcomCallbackEventTest, self).setUp()
self.setup_users()
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.project = self.make_projects(self.normal_django_user, 1)[0]
self.organization.projects.add(self.project)
self.inventory = Inventory.objects.create(name='test-inventory',
organization=self.organization)
self.host = self.inventory.hosts.create(name='host.example.com',
inventory=self.inventory)
self.group = self.inventory.groups.create(name='test-group',
inventory=self.inventory)
self.group.hosts.add(self.host)
self.job = Job.objects.create(name='job-%s' % now().isoformat(),
inventory=self.inventory,
project=self.project)
self.valid_kwargs = {
'job_id': self.job.id,
'event_type': 'playbook_on_start',
'event_data_json': json.dumps({'test_event_data': [2,4,6]}),
}
def test_with_job_status_not_running(self):
# Events can only be added when the job is running.
self.assertEqual(self.job.status, 'new')
self.job.status = 'pending'
self.job.save()
result, stdout, stderr = self.run_command('acom_callback_event',
**self.valid_kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('unable to add event ' in str(result).lower())
self.job.status = 'successful'
self.job.save()
result, stdout, stderr = self.run_command('acom_callback_event',
**self.valid_kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('unable to add event ' in str(result).lower())
self.job.status = 'failed'
self.job.save()
result, stdout, stderr = self.run_command('acom_callback_event',
**self.valid_kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('unable to add event ' in str(result).lower())
def test_with_invalid_args(self):
self.job.status = 'running'
self.job.save()
# Event type not given.
kwargs = dict(self.valid_kwargs.items())
kwargs.pop('event_type')
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('no event specified' in str(result).lower())
# Invalid event type.
kwargs = dict(self.valid_kwargs.items())
kwargs['event_type'] = 'invalid_event_type'
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('unsupported event' in str(result).lower())
# Neither file or data specified.
kwargs = dict(self.valid_kwargs.items())
kwargs.pop('event_data_json')
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('either --file or --data' in str(result).lower())
# Non-integer job ID.
kwargs = dict(self.valid_kwargs.items())
kwargs['job_id'] = 'foo'
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('id must be an integer' in str(result).lower())
# No job ID.
kwargs = dict(self.valid_kwargs.items())
kwargs.pop('job_id')
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('no job id' in str(result).lower())
# Invalid job ID.
kwargs = dict(self.valid_kwargs.items())
kwargs['job_id'] = 9999
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('not found' in str(result).lower())
# Invalid inline JSON data.
kwargs = dict(self.valid_kwargs.items())
kwargs['event_data_json'] = 'invalid json'
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('error parsing json' in str(result).lower())
# Invalid file specified.
kwargs = dict(self.valid_kwargs.items())
kwargs.pop('event_data_json')
h, tf = tempfile.mkstemp()
os.close(h)
os.remove(tf)
kwargs['event_data_file'] = '%s.json' % tf
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertTrue(isinstance(result, CommandError))
self.assertTrue('reading from' in str(result).lower())
def test_with_valid_args(self):
self.job.status = 'running'
self.job.save()
# Default valid args.
kwargs = dict(self.valid_kwargs.items())
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertEqual(result, None)
self.assertEqual(self.job.job_events.count(), 1)
# Pass job ID in environment instead.
kwargs = dict(self.valid_kwargs.items())
kwargs.pop('job_id')
os.environ['ACOM_JOB_ID'] = str(self.job.id)
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertEqual(result, None)
self.assertEqual(self.job.job_events.count(), 2)
os.environ.pop('ACOM_JOB_ID', None)
# Test with JSON data in a file instead.
kwargs = dict(self.valid_kwargs.items())
kwargs.pop('event_data_json')
h, tf = tempfile.mkstemp(suffix='.json')
self._temp_files.append(tf)
f = os.fdopen(h, 'w')
json.dump({'some_event_data': [1, 2, 3]}, f)
f.close()
kwargs['event_data_file'] = tf
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertEqual(result, None)
self.assertEqual(self.job.job_events.count(), 3)
# Test with JSON data from stdin.
kwargs = dict(self.valid_kwargs.items())
kwargs.pop('event_data_json')
kwargs['event_data_file'] = '-'
kwargs['stdin_fileobj'] = StringIO.StringIO(json.dumps({'blah': 'bleep'}))
result, stdout, stderr = self.run_command('acom_callback_event', **kwargs)
self.assertEqual(result, None)
self.assertEqual(self.job.job_events.count(), 4)

View File

@@ -0,0 +1,449 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import datetime
import json
from django.contrib.auth.models import User as DjangoUser
import django.test
from django.test.client import Client
from ansibleworks.main.models import *
from ansibleworks.main.tests.base import BaseTest
class InventoryTest(BaseTest):
def setUp(self):
super(InventoryTest, self).setUp()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 3)
self.organizations[0].admins.add(self.normal_django_user)
self.organizations[0].users.add(self.other_django_user)
self.organizations[0].users.add(self.normal_django_user)
self.inventory_a = Inventory.objects.create(name='inventory-a', description='foo', organization=self.organizations[0])
self.inventory_b = Inventory.objects.create(name='inventory-b', description='bar', organization=self.organizations[1])
# the normal user is an org admin of org 0
# create a permission here on the 'other' user so they have edit access on the org
# we may add another permission type later.
self.perm_read = Permission.objects.create(
inventory = self.inventory_b,
user = self.other_django_user,
permission_type = 'read'
)
# and make one more user that won't be a part of any org, just for negative-access testing
self.nobody_django_user = User.objects.create(username='nobody')
self.nobody_django_user.set_password('nobody')
self.nobody_django_user.save()
def get_nobody_credentials(self):
# here is a user without any permissions...
return ('nobody', 'nobody')
def test_main_line(self):
# some basic URLs...
inventories = '/api/v1/inventories/'
inventories_1 = '/api/v1/inventories/1/'
inventories_2 = '/api/v1/inventories/2/'
hosts = '/api/v1/hosts/'
groups = '/api/v1/groups/'
variables = '/api/v1/variables/'
# a super user can list inventories
data = self.get(inventories, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['count'], 2)
# an org admin can list inventories but is filtered to what he adminsters
data = self.get(inventories, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 1)
# a user who is on a team who has a read permissions on an inventory can see filtered inventories
data = self.get(inventories, expect=200, auth=self.get_other_credentials())
self.assertEquals(data['count'], 1)
# a regular user not part of anything cannot see any inventories
data = self.get(inventories, expect=200, auth=self.get_nobody_credentials())
self.assertEquals(data['count'], 0)
# a super user can get inventory records
data = self.get(inventories_1, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['name'], 'inventory-a')
# an org admin can get inventory records
data = self.get(inventories_1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['name'], 'inventory-a')
# a user who is on a team who has read permissions on an inventory can see inventory records
data = self.get(inventories_1, expect=403, auth=self.get_other_credentials())
data = self.get(inventories_2, expect=200, auth=self.get_other_credentials())
self.assertEquals(data['name'], 'inventory-b')
# a regular user cannot read any inventory records
data = self.get(inventories_1, expect=403, auth=self.get_nobody_credentials())
data = self.get(inventories_2, expect=403, auth=self.get_nobody_credentials())
# a super user can create inventory
new_inv_1 = dict(name='inventory-c', description='baz', organization=1)
data = self.post(inventories, data=new_inv_1, expect=201, auth=self.get_super_credentials())
self.assertEquals(data['id'], 3)
# an org admin of any org can create inventory, if it is one of his organizations
# the organization parameter is required!
new_inv_incomplete = dict(name='inventory-d', description='baz')
data = self.post(inventories, data=new_inv_incomplete, expect=400, auth=self.get_normal_credentials())
new_inv_not_my_org = dict(name='inventory-d', description='baz', organization=3)
data = self.post(inventories, data=new_inv_not_my_org, expect=403, auth=self.get_normal_credentials())
new_inv_my_org = dict(name='inventory-d', description='baz', organization=1)
data = self.post(inventories, data=new_inv_my_org, expect=201, auth=self.get_normal_credentials())
# a regular user cannot create inventory
new_inv_denied = dict(name='inventory-e', description='glorp', organization=1)
data = self.post(inventories, data=new_inv_denied, expect=403, auth=self.get_other_credentials())
# a super user can add hosts (but inventory ID is required)
inv = Inventory.objects.create(
name = 'test inventory',
organization = self.organizations[0]
)
invalid = dict(name='asdf0.example.com')
new_host_a = dict(name='asdf0.example.com', inventory=inv.pk)
new_host_b = dict(name='asdf1.example.com', inventory=inv.pk)
new_host_c = dict(name='asdf2.example.com', inventory=inv.pk)
new_host_d = dict(name='asdf3.example.com', inventory=inv.pk)
new_host_e = dict(name='asdf4.example.com', inventory=inv.pk)
host_data0 = self.post(hosts, data=invalid, expect=400, auth=self.get_super_credentials())
host_data0 = self.post(hosts, data=new_host_a, expect=201, auth=self.get_super_credentials())
# an org admin can add hosts
host_data1 = self.post(hosts, data=new_host_e, expect=201, auth=self.get_normal_credentials())
# a normal user cannot add hosts
host_data2 = self.post(hosts, data=new_host_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions (on any inventory) can create hosts
edit_perm = Permission.objects.create(
user = self.other_django_user,
inventory = Inventory.objects.get(pk=inv.pk),
permission_type = PERM_INVENTORY_WRITE
)
host_data3 = self.post(hosts, data=new_host_c, expect=201, auth=self.get_other_credentials())
# hostnames must be unique inside an organization
host_data4 = self.post(hosts, data=new_host_c, expect=400, auth=self.get_other_credentials())
###########################################
# GROUPS
invalid = dict(name='web1')
new_group_a = dict(name='web2', inventory=inv.pk)
new_group_b = dict(name='web3', inventory=inv.pk)
new_group_c = dict(name='web4', inventory=inv.pk)
new_group_d = dict(name='web5', inventory=inv.pk)
new_group_e = dict(name='web6', inventory=inv.pk)
groups = '/api/v1/groups/'
data0 = self.post(groups, data=invalid, expect=400, auth=self.get_super_credentials())
data0 = self.post(groups, data=new_group_a, expect=201, auth=self.get_super_credentials())
# an org admin can add hosts
group_data1 = self.post(groups, data=new_group_e, expect=201, auth=self.get_normal_credentials())
# a normal user cannot add hosts
group_data2 = self.post(groups, data=new_group_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions (on any inventory) can create hosts
# already done!
#edit_perm = Permission.objects.create(
# user = self.other_django_user,
# inventory = Inventory.objects.get(pk=inv.pk),
# permission_type = PERM_INVENTORY_WRITE
#)
group_data3 = self.post(groups, data=new_group_c, expect=201, auth=self.get_other_credentials())
# hostnames must be unique inside an organization
group_data4 = self.post(groups, data=new_group_c, expect=400, auth=self.get_other_credentials())
#################################################
# HOSTS->inventories POST via subcollection
url = '/api/v1/inventories/1/hosts/'
new_host_a = dict(name='web100.example.com')
new_host_b = dict(name='web101.example.com')
new_host_c = dict(name='web102.example.com')
new_host_d = dict(name='web103.example.com')
new_host_e = dict(name='web104.example.com')
# a super user can associate hosts with inventories
added_by_collection_a = self.post(url, data=new_host_a, expect=201, auth=self.get_super_credentials())
# an org admin can associate hosts with inventories
added_by_collection_b = self.post(url, data=new_host_b, expect=201, auth=self.get_normal_credentials())
# a normal user cannot associate hosts with inventories
added_by_collection_c = self.post(url, data=new_host_c, expect=403, auth=self.get_nobody_credentials())
# a normal user with edit permission on the inventory can associate hosts with inventories
url5 = '/api/v1/inventories/5/hosts/'
added_by_collection_d = self.post(url5, data=new_host_d, expect=201, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 4)
# now remove the host from inventory (still keeps the record)
added_by_collection_d['disassociate'] = 1
self.post(url5, data=added_by_collection_d, expect=204, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 3)
##################################################
# GROUPS->inventories POST via subcollection
root_groups = '/api/v1/inventories/1/root_groups/'
url = '/api/v1/inventories/1/groups/'
new_group_a = dict(name='web100')
new_group_b = dict(name='web101')
new_group_c = dict(name='web102')
new_group_d = dict(name='web103')
new_group_e = dict(name='web104')
# a super user can associate groups with inventories
added_by_collection = self.post(url, data=new_group_a, expect=201, auth=self.get_super_credentials())
# an org admin can associate groups with inventories
added_by_collection = self.post(url, data=new_group_b, expect=201, auth=self.get_normal_credentials())
# a normal user cannot associate groups with inventories
added_by_collection = self.post(url, data=new_group_c, expect=403, auth=self.get_nobody_credentials())
# a normal user with edit permissions on the inventory can associate groups with inventories
url5 = '/api/v1/inventories/5/groups/'
added_by_collection = self.post(url5, data=new_group_d, expect=201, auth=self.get_other_credentials())
# make sure duplicates give 400s
self.post(url5, data=new_group_d, expect=400, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 4)
# side check: see if root groups URL is operational. These are groups without parents.
root_groups = self.get(root_groups, expect=200, auth=self.get_super_credentials())
self.assertEquals(root_groups['count'], 2)
remove_me = added_by_collection
remove_me['disassociate'] = 1
self.post(url5, data=remove_me, expect=204, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 3)
###################################################
# VARIABLES
vars_a = dict(asdf=1234, dog='fido', cat='fluffy', unstructured=dict(a=[1,2,3],b=dict(x=2,y=3)))
vars_b = dict(asdf=4321, dog='barky', cat='snarf', unstructured=dict(a=[1,2,3],b=dict(x=2,y=3)))
vars_c = dict(asdf=5555, dog='mouse', cat='mogwai', unstructured=dict(a=[3,0,3],b=dict(z=2600)))
# attempting to get a variable object creates it, even though it does not already exist
vdata_url = "/api/v1/hosts/%s/variable_data/" % (added_by_collection_a['id'])
got = self.get(vdata_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, dict())
# super user can create variable objects
# an org admin can create variable objects (defers to inventory permissions)
got = self.put(vdata_url, data=vars_a, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, vars_a)
# verify that we can update things and get them back
got = self.put(vdata_url, data=vars_c, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, vars_c)
got = self.get(vdata_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, vars_c)
# a normal user cannot edit variable objects
self.put(vdata_url, data=vars_a, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory write permissions can edit variable objects
vdata_url = "/api/v1/hosts/1/variable_data/"
got = self.put(vdata_url, data=vars_b, expect=200, auth=self.get_normal_credentials())
self.assertEquals(got, vars_b)
# this URL is not one end users will use, but is what you get back from a put
# as a result, it also needs to be access controlled and working. You will not
# be able to put to it.
backend_url = '/api/v1/variable_data/1/'
got = self.get(backend_url, expect=200, auth=self.get_normal_credentials())
got = self.put(backend_url, data=dict(), expect=403, auth=self.get_super_credentials())
###################################################
# VARIABLES -> GROUPS
vars_a = dict(asdf=7777, dog='droopy', cat='battlecat', unstructured=dict(a=[1,1,1],b=dict(x=1,y=2)))
vars_b = dict(asdf=8888, dog='snoopy', cat='cheshire', unstructured=dict(a=[2,2,2],b=dict(x=3,y=4)))
vars_c = dict(asdf=9999, dog='pluto', cat='five', unstructured=dict(a=[3,3,3],b=dict(z=5)))
groups = Group.objects.all()
vdata1_url = "/api/v1/groups/%s/variable_data/" % (groups[0].pk)
vdata2_url = "/api/v1/groups/%s/variable_data/" % (groups[1].pk)
# a super user can associate variable objects with groups
got = self.get(vdata1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(got, {})
put = self.put(vdata1_url, data=vars_a, expect=200, auth=self.get_super_credentials())
self.assertEquals(put, vars_a)
# an org admin can associate variable objects with groups
put = self.put(vdata1_url, data=vars_b, expect=200, auth=self.get_normal_credentials())
# a normal user cannot associate variable objects with groups
put = self.put(vdata1_url, data=vars_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions can associate variable objects with groups
put = self.put(vdata1_url, data=vars_c, expect=200, auth=self.get_normal_credentials())
self.assertEquals(put, vars_c)
####################################################
# ADDING HOSTS TO GROUPS
groups = Group.objects.all()
hosts = Host.objects.all()
groups[0].hosts.add(Host.objects.get(pk=1))
groups[0].hosts.add(Host.objects.get(pk=3))
groups[0].save()
# access
url1 = '/api/v1/groups/1/hosts/'
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 2)
self.assertEquals(data['results'][0]['id'], 1)
self.assertEquals(data['results'][1]['id'], 3)
# addition
got = self.get('/api/v1/hosts/2/', expect=200, auth=self.get_normal_credentials())
self.assertEquals(got['id'], 2)
posted = self.post('/api/v1/groups/1/hosts/', data=got, expect=204, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 3)
self.assertEquals(data['results'][1]['id'], 2)
# now add one new completely new host, to test creation+association in one go
new_host = dict(inventory=got['inventory'], name='completelynewhost.example.com', description='...')
posted = self.post('/api/v1/groups/1/hosts/', data=new_host, expect=201, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 4)
# removal
got['disassociate'] = 1
posted = self.post('/api/v1/groups/1/hosts/', data=got, expect=204, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 3)
self.assertEquals(data['results'][1]['id'], 3)
####################################################
# SUBGROUPS
groups = Group.objects.all()
# just some more groups for kicks
inv = Inventory.objects.get(pk=1)
Group.objects.create(name='group-X1', inventory=inv)
Group.objects.create(name='group-X2', inventory=inv)
Group.objects.create(name='group-X3', inventory=inv)
Group.objects.create(name='group-X4', inventory=inv)
Group.objects.create(name='group-X5', inventory=inv)
Permission.objects.create(
inventory = inv,
user = self.other_django_user,
permission_type = PERM_INVENTORY_WRITE
)
# data used for testing listing all hosts that are transitive members of a group
g2 = Group.objects.get(pk=2)
nh = Host.objects.create(name='newhost.example.com', inventory=inv, created_by=User.objects.get(pk=1))
g2.hosts.add(nh)
g2.save()
# a super user can set subgroups
subgroups_url = '/api/v1/groups/1/children/'
child_url = '/api/v1/groups/2/'
subgroups_url2 = '/api/v1/groups/3/children/'
subgroups_url3 = '/api/v1/groups/4/children/'
subgroups_url4 = '/api/v1/groups/5/children/'
got = self.get(child_url, expect=200, auth=self.get_super_credentials())
self.post(subgroups_url, data=got, expect=204, auth=self.get_super_credentials())
kids = Group.objects.get(pk=1).children.all()
self.assertEqual(len(kids), 1)
checked = self.get(subgroups_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(checked['count'], 1)
# an org admin can set subgroups
posted = self.post(subgroups_url2, data=got, expect=204, auth=self.get_normal_credentials())
# see if we can post a completely new subgroup
new_data = dict(inventory=5, name='completely new', description='blarg?')
kids = self.get(subgroups_url2, expect=200, auth=self.get_normal_credentials())
self.assertEqual(kids['count'], 1)
posted2 = self.post(subgroups_url2, data=new_data, expect=201, auth=self.get_normal_credentials())
with_one_more_kid = self.get(subgroups_url2, expect=200, auth=self.get_normal_credentials())
self.assertEqual(with_one_more_kid['count'], 2)
# double post causes conflict error (actually, should it? -- just got a 204, already associated)
# self.post(subgroups_url2, data=got, expect=409, auth=self.get_normal_credentials())
checked = self.get(subgroups_url2, expect=200, auth=self.get_normal_credentials())
# a normal user cannot set subgroups
self.post(subgroups_url3, data=got, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions can associate subgroups
self.post(subgroups_url3, data=got, expect=204, auth=self.get_other_credentials())
checked = self.get(subgroups_url3, expect=200, auth=self.get_normal_credentials())
self.assertEqual(checked['count'], 1)
# slight detour
# can see all hosts under a group, even if it has subgroups
# this URL is NOT postable
all_hosts = '/api/v1/groups/1/all_hosts/'
self.assertEqual(Group.objects.get(pk=1).hosts.count(), 3)
data = self.get(all_hosts, expect=200, auth=self.get_normal_credentials())
self.post(all_hosts, data=dict(id=123456, msg='spam'), expect=405, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 4)
# now post it back to remove it, by adding the disassociate bit
result = checked['results'][0]
result['disassociate'] = 1
self.post(subgroups_url3, data=result, expect=204, auth=self.get_other_credentials())
checked = self.get(subgroups_url3, expect=200, auth=self.get_normal_credentials())
self.assertEqual(checked['count'], 0)
# try to double disassociate to see what happens (should no-op)
self.post(subgroups_url3, data=result, expect=204, auth=self.get_other_credentials())
#########################################################
# FIXME: TAGS
# the following objects can be tagged and the tags can be read
# inventory
# host records
# group records
# variable records
# this may just be in a seperate test file called 'tags'
#########################################################
# FIXME: RELATED FIELDS
# on an inventory resource, I can see related resources for hosts and groups and permissions
# and these work
# on a host resource, I can see related resources variables and inventories
# and these work
# on a group resource, I can see related resources for variables, inventories, and children
# and these work

View File

@@ -0,0 +1,968 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import datetime
import json
from django.contrib.auth.models import User as DjangoUser
from django.core.urlresolvers import reverse
from django.db import transaction
import django.test
from django.test.client import Client
from django.test.utils import override_settings
from ansibleworks.main.models import *
from ansibleworks.main.tests.base import BaseTestMixin
__all__ = ['JobTemplateTest', 'JobTest', 'JobStartCancelTest']
TEST_PLAYBOOK = '''- hosts: all
gather_facts: false
tasks:
- name: woohoo
command: test 1 = 1
'''
class BaseJobTestMixin(BaseTestMixin):
''''''
def _create_inventory(self, name, organization, created_by,
groups_hosts_dict):
'''Helper method for creating inventory with groups and hosts.'''
inventory = organization.inventories.create(
name=name,
created_by=created_by,
)
for group_name, host_names in groups_hosts_dict.items():
group = inventory.groups.create(
name=group_name,
created_by=created_by,
)
for host_name in host_names:
host = inventory.hosts.create(
name=host_name,
created_by=created_by,
)
group.hosts.add(host)
return inventory
def populate(self):
# Here's a little story about the Ansible Bread Company, or ABC. They
# make machines that make bread - bakers, slicers, and packagers - and
# these machines are each controlled by a Linux boxes, which is in turn
# managed by Ansible Commander.
# Sue is the super user. You don't mess with Sue or you're toast. Ha.
self.user_sue = self.make_user('sue', super_user=True)
# There are three organizations in ABC using Ansible, since it's the
# best thing for dev ops automation since, well, sliced bread.
# Engineering - They design and build the machines.
self.org_eng = Organization.objects.create(
name='engineering',
created_by=self.user_sue,
)
# Support - They fix it when it's not working.
self.org_sup = Organization.objects.create(
name='support',
created_by=self.user_sue,
)
# Operations - They implement the production lines using the machines.
self.org_ops = Organization.objects.create(
name='operations',
created_by=self.user_sue,
)
# Alex is Sue's IT assistant who can also administer all of the
# organizations.
self.user_alex = self.make_user('alex')
self.org_eng.admins.add(self.user_alex)
self.org_sup.admins.add(self.user_alex)
self.org_ops.admins.add(self.user_alex)
# Bob is the head of engineering. He's an admin for engineering, but
# also a user within the operations organization (so he can see the
# results if things go wrong in production).
self.user_bob = self.make_user('bob')
self.org_eng.admins.add(self.user_bob)
self.org_ops.users.add(self.user_bob)
# Chuck is the lead engineer. He has full reign over engineering, but
# no other organizations.
self.user_chuck = self.make_user('chuck')
self.org_eng.admins.add(self.user_chuck)
# Doug is the other engineer working under Chuck. He can write
# playbooks and check them, but Chuck doesn't quite think he's ready to
# run them yet. Poor Doug.
self.user_doug = self.make_user('doug')
self.org_eng.users.add(self.user_doug)
# Eve is the head of support. She can also see what goes on in
# operations to help them troubleshoot problems.
self.user_eve = self.make_user('eve')
self.org_sup.admins.add(self.user_eve)
self.org_ops.users.add(self.user_eve)
# Frank is the other support guy.
self.user_frank = self.make_user('frank')
self.org_sup.users.add(self.user_frank)
# Greg is the head of operations.
self.user_greg = self.make_user('greg')
self.org_ops.admins.add(self.user_greg)
# Holly is an operations engineer.
self.user_holly = self.make_user('holly')
self.org_ops.users.add(self.user_holly)
# Iris is another operations engineer.
self.user_iris = self.make_user('iris')
self.org_ops.users.add(self.user_iris)
# Jim is the intern. He can login, but can't do anything quite yet
# except make everyone else fresh coffee.
self.user_jim = self.make_user('jim')
# There are three main projects, one each for the development, test and
# production branches of the playbook repository. All three orgs can
# use the production branch, support can use the production and testing
# branches, and operations can only use the production branch.
self.proj_dev = self.make_project('dev', 'development branch',
self.user_sue, TEST_PLAYBOOK)
self.org_eng.projects.add(self.proj_dev)
self.proj_test = self.make_project('test', 'testing branch',
self.user_sue, TEST_PLAYBOOK)
self.org_eng.projects.add(self.proj_test)
self.org_sup.projects.add(self.proj_test)
self.proj_prod = self.make_project('prod', 'production branch',
self.user_sue, TEST_PLAYBOOK)
self.org_eng.projects.add(self.proj_prod)
self.org_sup.projects.add(self.proj_prod)
self.org_ops.projects.add(self.proj_prod)
# Operations also has 2 additional projects specific to the east/west
# production environments.
self.proj_prod_east = self.make_project('prod-east',
'east production branch',
self.user_sue, TEST_PLAYBOOK)
self.org_ops.projects.add(self.proj_prod_east)
self.proj_prod_west = self.make_project('prod-west',
'west production branch',
self.user_sue, TEST_PLAYBOOK)
self.org_ops.projects.add(self.proj_prod_west)
# The engineering organization has a set of servers to use for
# development and testing (2 bakers, 1 slicer, 1 packager).
self.inv_eng = self._create_inventory(
name='engineering environment',
organization=self.org_eng,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['eng-baker1', 'eng-baker2'],
'slicers': ['eng-slicer1'],
'packagers': ['eng-packager1'],
},
)
# The support organization has a set of servers to use for
# testing and reproducing problems from operations (1 baker, 1 slicer,
# 1 packager).
self.inv_sup = self._create_inventory(
name='support environment',
organization=self.org_sup,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['sup-baker1'],
'slicers': ['sup-slicer1'],
'packagers': ['sup-packager1'],
},
)
# The operations organization manages multiple sets of servers for the
# east and west production facilities.
self.inv_ops_east = self._create_inventory(
name='east production environment',
organization=self.org_ops,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['east-baker%d' % n for n in range(1, 4)],
'slicers': ['east-slicer%d' % n for n in range(1, 3)],
'packagers': ['east-packager%d' % n for n in range(1, 3)],
},
)
self.inv_ops_west = self._create_inventory(
name='west production environment',
organization=self.org_ops,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['west-baker%d' % n for n in range(1, 6)],
'slicers': ['west-slicer%d' % n for n in range(1, 4)],
'packagers': ['west-packager%d' % n for n in range(1, 3)],
},
)
# Operations is divided into teams to work on the east/west servers.
# Greg and Holly work on east, Greg and iris work on west.
self.team_ops_east = self.org_ops.teams.create(
name='easterners',
created_by=self.user_sue,
)
self.team_ops_east.projects.add(self.proj_prod)
self.team_ops_east.projects.add(self.proj_prod_east)
self.team_ops_east.users.add(self.user_greg)
self.team_ops_east.users.add(self.user_holly)
self.team_ops_west = self.org_ops.teams.create(
name='westerners',
created_by=self.user_sue,
)
self.team_ops_west.projects.add(self.proj_prod)
self.team_ops_west.projects.add(self.proj_prod_west)
self.team_ops_west.users.add(self.user_greg)
self.team_ops_west.users.add(self.user_iris)
# Each user has his/her own set of credentials.
from ansibleworks.main.tests.tasks import (TEST_SSH_KEY_DATA,
TEST_SSH_KEY_DATA_LOCKED,
TEST_SSH_KEY_DATA_UNLOCK)
self.cred_bob = self.user_bob.credentials.create(
ssh_username='bob',
ssh_password='ASK',
created_by=self.user_sue,
)
self.cred_chuck = self.user_chuck.credentials.create(
ssh_username='chuck',
ssh_key_data=TEST_SSH_KEY_DATA,
created_by=self.user_sue,
)
self.cred_doug = self.user_doug.credentials.create(
ssh_username='doug',
ssh_password='doug doesn\'t mind his password being saved. this '
'is why we dont\'t let doug actually run jobs.',
created_by=self.user_sue,
)
self.cred_eve = self.user_eve.credentials.create(
ssh_username='eve',
ssh_password='ASK',
sudo_username='root',
sudo_password='ASK',
created_by=self.user_sue,
)
self.cred_frank = self.user_frank.credentials.create(
ssh_username='frank',
ssh_password='fr@nk the t@nk',
created_by=self.user_sue,
)
self.cred_greg = self.user_greg.credentials.create(
ssh_username='greg',
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock='ASK',
created_by=self.user_sue,
)
self.cred_holly = self.user_holly.credentials.create(
ssh_username='holly',
ssh_password='holly rocks',
created_by=self.user_sue,
)
self.cred_iris = self.user_iris.credentials.create(
ssh_username='iris',
ssh_password='ASK',
created_by=self.user_sue,
)
# Each operations team also has shared credentials they can use.
self.cred_ops_east = self.team_ops_east.credentials.create(
ssh_username='east',
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK,
created_by = self.user_sue,
)
self.cred_ops_west = self.team_ops_west.credentials.create(
ssh_username='west',
ssh_password='Heading270',
created_by = self.user_sue,
)
# FIXME: Define explicit permissions for tests.
# other django user is on the project team and can deploy
#self.permission1 = Permission.objects.create(
# inventory = self.inventory,
# project = self.project,
# team = self.team,
# permission_type = PERM_INVENTORY_DEPLOY,
# created_by = self.normal_django_user
#)
# individual permission granted to other2 user, can run check mode
#self.permission2 = Permission.objects.create(
# inventory = self.inventory,
# project = self.project,
# user = self.other2_django_user,
# permission_type = PERM_INVENTORY_CHECK,
# created_by = self.normal_django_user
#)
# Engineering has job templates to check/run the dev project onto
# their own inventory.
self.jt_eng_check = JobTemplate.objects.create(
name='eng-dev-check',
job_type='check',
inventory= self.inv_eng,
project=self.proj_dev,
playbook=self.proj_dev.playbooks[0],
created_by=self.user_sue,
)
self.job_eng_check = self.jt_eng_check.create_job(
created_by=self.user_sue,
credential=self.cred_doug,
)
self.jt_eng_run = JobTemplate.objects.create(
name='eng-dev-run',
job_type='run',
inventory= self.inv_eng,
project=self.proj_dev,
playbook=self.proj_dev.playbooks[0],
created_by=self.user_sue,
)
self.job_eng_run = self.jt_eng_run.create_job(
created_by=self.user_sue,
credential=self.cred_chuck,
)
# Support has job templates to check/run the test project onto
# their own inventory.
self.jt_sup_check = JobTemplate.objects.create(
name='sup-test-check',
job_type='check',
inventory= self.inv_sup,
project=self.proj_test,
playbook=self.proj_test.playbooks[0],
created_by=self.user_sue,
)
self.job_sup_check = self.jt_sup_check.create_job(
created_by=self.user_sue,
credential=self.cred_frank,
)
self.jt_sup_run = JobTemplate.objects.create(
name='sup-test-run',
job_type='run',
inventory= self.inv_sup,
project=self.proj_test,
playbook=self.proj_test.playbooks[0],
created_by=self.user_sue,
)
self.job_sup_run = self.jt_sup_run.create_job(
created_by=self.user_sue,
credential=self.cred_eve,
)
# Operations has job templates to check/run the prod project onto
# both east and west inventories, by default using the team credential.
self.jt_ops_east_check = JobTemplate.objects.create(
name='ops-east-prod-check',
job_type='check',
inventory= self.inv_ops_east,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_east,
created_by=self.user_sue,
)
self.job_ops_east_check = self.jt_ops_east_check.create_job(
created_by=self.user_sue,
)
self.jt_ops_east_run = JobTemplate.objects.create(
name='ops-east-prod-run',
job_type='run',
inventory= self.inv_ops_east,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_east,
created_by=self.user_sue,
)
self.job_ops_east_run = self.jt_ops_east_run.create_job(
created_by=self.user_sue,
)
self.jt_ops_west_check = JobTemplate.objects.create(
name='ops-west-prod-check',
job_type='check',
inventory= self.inv_ops_west,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_west,
created_by=self.user_sue,
)
self.job_ops_west_check = self.jt_ops_west_check.create_job(
created_by=self.user_sue,
)
self.jt_ops_west_run = JobTemplate.objects.create(
name='ops-west-prod-run',
job_type='run',
inventory= self.inv_ops_west,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_west,
created_by=self.user_sue,
)
self.job_ops_west_run = self.jt_ops_west_run.create_job(
created_by=self.user_sue,
)
def setUp(self):
super(BaseJobTestMixin, self).setUp()
self.populate()
def _test_invalid_creds(self, url, data=None, methods=None):
data = data or {}
methods = methods or ('options', 'head', 'get')
for auth in [(None,), ('invalid', 'password')]:
with self.current_user(*auth):
for method in methods:
f = getattr(self, method)
if method in ('post', 'put', 'patch'):
f(url, data, expect=401)
else:
f(url, expect=401)
class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
def test_get_job_template_list(self):
url = reverse('main:job_template_list')
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
# sue's credentials (superuser) == 200, full list
with self.current_user(self.user_sue):
self.options(url)
self.head(url)
response = self.get(url)
qs = JobTemplate.objects.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# FIXME: Check individual job template result fields.
# alex's credentials (admin of all orgs) == 200, full list
with self.current_user(self.user_alex):
self.options(url)
self.head(url)
response = self.get(url)
qs = JobTemplate.objects.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# bob's credentials (admin of eng, user of ops) == 200, all from
# engineering and operations.
with self.current_user(self.user_bob):
self.options(url)
self.head(url)
response = self.get(url)
qs = JobTemplate.objects.filter(
inventory__organization__in=[self.org_eng, self.org_ops],
)
#self.check_pagination_and_size(response, qs.count())
#self.check_list_ids(response, qs)
# FIXME: Check with other credentials.
def test_post_job_template_list(self):
url = reverse('main:job_template_list')
data = dict(
name = 'new job template',
job_type = PERM_INVENTORY_DEPLOY,
inventory = self.inv_eng.pk,
project = self.proj_dev.pk,
playbook = self.proj_dev.playbooks[0],
)
# Test with no auth and with invalid login.
self._test_invalid_creds(url, data, methods=('post',))
# sue can always add job templates.
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
detail_url = reverse('main:job_template_detail',
args=(response['id'],))
self.assertEquals(response['url'], detail_url)
# Check that all fields provided were set.
jt = JobTemplate.objects.get(pk=response['id'])
self.assertEqual(jt.name, data['name'])
self.assertEqual(jt.job_type, data['job_type'])
self.assertEqual(jt.inventory.pk, data['inventory'])
self.assertEqual(jt.credential, None)
self.assertEqual(jt.project.pk, data['project'])
self.assertEqual(jt.playbook, data['playbook'])
# Test that all required fields are really required.
data['name'] = 'another new job template'
for field in ('name', 'job_type', 'inventory', 'project', 'playbook'):
with self.current_user(self.user_sue):
d = dict(data.items())
d.pop(field)
response = self.post(url, d, expect=400)
self.assertTrue(field in response,
'no error for field "%s" in response' % field)
# Test invalid value for job_type.
with self.current_user(self.user_sue):
d = dict(data.items())
d['job_type'] = 'world domination'
response = self.post(url, d, expect=400)
self.assertTrue('job_type' in response)
# Test playbook not in list of project playbooks.
with self.current_user(self.user_sue):
d = dict(data.items())
d['playbook'] = 'no_playbook_here.yml'
response = self.post(url, d, expect=400)
self.assertTrue('playbook' in response)
# FIXME: Check other credentials and optional fields.
def test_get_job_template_detail(self):
jt = self.jt_eng_run
url = reverse('main:job_template_detail', args=(jt.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
# sue can read the job template detail.
with self.current_user(self.user_sue):
self.options(url)
self.head(url)
response = self.get(url)
self.assertEqual(response['url'], url)
# FIXME: Check other credentials and optional fields.
# TODO: add more tests that show
# the method used to START a JobTemplate follow the exact same permissions as those to create it ...
# and that jobs come back nicely serialized with related resources and so on ...
# that we can drill all the way down and can get at host failure lists, etc ...
def test_put_job_template_detail(self):
jt = self.jt_eng_run
url = reverse('main:job_template_detail', args=(jt.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url, methods=('put',))# 'patch'))
# sue can update the job template detail.
with self.current_user(self.user_sue):
data = self.get(url)
data['name'] = '%s-updated' % data['name']
response = self.put(url, data)
#patch_data = dict(name='%s-changed' % data['name'])
#response = self.patch(url, patch_data)
# FIXME: Check other credentials and optional fields.
def test_get_job_template_job_list(self):
jt = self.jt_eng_run
url = reverse('main:job_template_job_list', args=(jt.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
# sue can read the job template job list.
with self.current_user(self.user_sue):
self.options(url)
self.head(url)
response = self.get(url)
qs = jt.jobs.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# FIXME: Check other credentials and optional fields.
def test_post_job_template_job_list(self):
jt = self.jt_eng_run
url = reverse('main:job_template_job_list', args=(jt.pk,))
data = dict(
name='new job from template',
credential=self.cred_bob.pk,
)
# Test with no auth and with invalid login.
self._test_invalid_creds(url, data, methods=('post',))
# sue can create a new job from the template.
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
# FIXME: Check other credentials and optional fields.
class JobTest(BaseJobTestMixin, django.test.TestCase):
def test_get_job_list(self):
url = reverse('main:job_list')
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
# sue's credentials (superuser) == 200, full list
with self.current_user(self.user_sue):
self.options(url)
self.head(url)
response = self.get(url)
qs = Job.objects.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# FIXME: Check individual job result fields.
# FIXME: Check with other credentials.
def test_post_job_list(self):
url = reverse('main:job_list')
data = dict(
name='new job without template',
job_type=PERM_INVENTORY_DEPLOY,
inventory=self.inv_ops_east.pk,
project=self.proj_prod.pk,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_east.pk,
)
# Test with no auth and with invalid login.
self._test_invalid_creds(url, data, methods=('post',))
# sue can create a new job without a template.
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
# sue can also create a job here from a template.
jt = self.jt_ops_east_run
data = dict(
name='new job from template',
job_template=jt.pk,
)
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
# FIXME: Check with other credentials and optional fields.
def test_get_job_detail(self):
job = self.job_ops_east_run
url = reverse('main:job_detail', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
# sue can read the job detail.
with self.current_user(self.user_sue):
self.options(url)
self.head(url)
response = self.get(url)
self.assertEqual(response['url'], url)
# FIXME: Check with other credentials and optional fields.
def test_put_job_detail(self):
job = self.job_ops_west_run
url = reverse('main:job_detail', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url, methods=('put',))# 'patch'))
# sue can update the job detail only if the job is new.
self.assertEqual(job.status, 'new')
with self.current_user(self.user_sue):
data = self.get(url)
data['name'] = '%s-updated' % data['name']
response = self.put(url, data)
#patch_data = dict(name='%s-changed' % data['name'])
#response = self.patch(url, patch_data)
# sue cannot update the job detail if it is in any other state.
for status in ('pending', 'running', 'successful', 'failed', 'error',
'canceled'):
job.status = status
job.save()
with self.current_user(self.user_sue):
data = self.get(url)
data['name'] = '%s-updated' % data['name']
self.put(url, data, expect=405)
#patch_data = dict(name='%s-changed' % data['name'])
#self.patch(url, patch_data, expect=405)
# FIXME: Check with other credentials and readonly fields.
def _test_mainline(self):
url = reverse('main:job_list')
# job templates
data = self.get('/api/v1/job_templates/', expect=401)
data = self.get('/api/v1/job_templates/', expect=200, auth=self.get_normal_credentials())
self.assertTrue(data['count'], 2)
rec = dict(
name = 'job-foo',
credential = self.credential.pk,
inventory = self.inventory.pk,
project = self.project.pk,
job_type = PERM_INVENTORY_DEPLOY
)
# org admin can add job type
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_normal_credentials())
self.assertEquals(posted['url'], '/api/v1/job_templates/3/')
# other_django_user is on a team that can deploy, so can create both deploy and check type jobs
rec['name'] = 'job-foo2'
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other_credentials())
rec['name'] = 'job-foo3'
rec['job_type'] = PERM_INVENTORY_CHECK
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other_credentials())
# other2_django_user has individual permissions to run check mode, but not deploy
# nobody user can't even run check mode
rec['name'] = 'job-foo4'
self.post('/api/v1/job_templates/', rec, expect=403, auth=self.get_nobody_credentials())
rec['credential'] = self.credential2.pk
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other2_credentials())
rec['name'] = 'job-foo5'
rec['job_type'] = PERM_INVENTORY_DEPLOY
self.post('/api/v1/job_templates/', rec, expect=403, auth=self.get_nobody_credentials())
self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other2_credentials())
url = posted['url']
# verify we can also get the job template record
got = self.get(url, expect=200, auth=self.get_other2_credentials())
self.failUnlessEqual(got['url'], '/api/v1/job_templates/6/')
# TODO: add more tests that show
# the method used to START a JobTemplate follow the exact same permissions as those to create it ...
# and that jobs come back nicely serialized with related resources and so on ...
# that we can drill all the way down and can get at host failure lists, etc ...
# Need to disable transaction middleware for testing so that the callback
# management command will be able to read the database changes made to start
# the job. It won't be an issue normally, because the task will be running
# asynchronously; the start API call will update the database, queue the task,
# then return immediately (committing the transaction) before celery has even
# woken up to run the new task.
MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'),
settings.MIDDLEWARE_CLASSES)
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local',
MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
class JobStartCancelTest(BaseJobTestMixin, django.test.TransactionTestCase):
'''Job API tests that need to use the celery task backend.'''
def setUp(self):
super(JobStartCancelTest, self).setUp()
# Pass test database name in environment for use by the inventory script.
os.environ['ACOM_TEST_DATABASE_NAME'] = settings.DATABASES['default']['NAME']
def tearDown(self):
super(JobStartCancelTest, self).tearDown()
os.environ.pop('ACOM_TEST_DATABASE_NAME', None)
def test_job_start(self):
job = self.job_ops_east_run
url = reverse('main:job_start', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self._test_invalid_creds(url, methods=('post',))
# Sue can start a job (when passwords are already saved) as long as the
# status is new. Reverse list so "new" will be last.
for status in reversed([x[0] for x in Job.STATUS_CHOICES]):
job.status = status
job.save()
with self.current_user(self.user_sue):
response = self.get(url)
if status == 'new':
self.assertTrue(response['can_start'])
self.assertFalse(response['passwords_needed_to_start'])
response = self.post(url, {}, expect=202)
job = Job.objects.get(pk=job.pk)
self.assertEqual(job.status, 'successful')
else:
self.assertFalse(response['can_start'])
response = self.post(url, {}, expect=405)
# Test with a job that prompts for SSH and sudo passwords.
job = self.job_sup_run
url = reverse('main:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_password', 'sudo_password']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_password'] = 'sshpass'
response = self.post(url, data, expect=400)
data2 = dict(sudo_password='sudopass')
response = self.post(url, data2, expect=400)
data.update(data2)
response = self.post(url, data, expect=202)
job = Job.objects.get(pk=job.pk)
# FIXME: Test run gets the following error in this case:
# fatal: [hostname] => sudo output closed while waiting for password prompt:
#self.assertEqual(job.status, 'successful')
# Test with a job that prompts for SSH unlock key, given the wrong key.
job = self.jt_ops_west_run.create_job(
credential=self.cred_greg,
created_by=self.user_sue,
)
url = reverse('main:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_key_unlock']))
data = dict()
response = self.post(url, data, expect=400)
# The job should start but fail.
data['ssh_key_unlock'] = 'sshunlock'
response = self.post(url, data, expect=202)
job = Job.objects.get(pk=job.pk)
self.assertEqual(job.status, 'failed')
# Test with a job that prompts for SSH unlock key, given the right key.
from ansibleworks.main.tests.tasks import TEST_SSH_KEY_DATA_UNLOCK
job = self.jt_ops_west_run.create_job(
credential=self.cred_greg,
created_by=self.user_sue,
)
url = reverse('main:job_start', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertTrue(response['can_start'])
self.assertEqual(set(response['passwords_needed_to_start']),
set(['ssh_key_unlock']))
data = dict()
response = self.post(url, data, expect=400)
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
response = self.post(url, data, expect=202)
job = Job.objects.get(pk=job.pk)
self.assertEqual(job.status, 'successful')
# FIXME: Test with other users, test when passwords are required.
def test_job_cancel(self):
job = self.job_ops_east_run
url = reverse('main:job_cancel', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self._test_invalid_creds(url, methods=('post',))
# sue can cancel the job, but only when it is pending or running.
for status in [x[0] for x in Job.STATUS_CHOICES]:
job.status = status
job.save()
with self.current_user(self.user_sue):
response = self.get(url)
if status in ('pending', 'running'):
self.assertTrue(response['can_cancel'])
response = self.post(url, {}, expect=202)
else:
self.assertFalse(response['can_cancel'])
response = self.post(url, {}, expect=405)
# FIXME: Test with other users.
def test_get_job_results(self):
# Start/run a job and then access its results via the API.
job = self.job_ops_east_run
job.start()
# Check that the job detail has been updated.
url = reverse('main:job_detail', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
self.assertEqual(response['status'], 'successful')
self.assertTrue(response['result_stdout'])
# Test job events for completed job.
url = reverse('main:job_job_event_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test individual job event detail records.
host_ids = set()
for job_event in job.job_events.all():
if job_event.host:
host_ids.add(job_event.host.pk)
url = reverse('main:job_event_detail', args=(job_event.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# Also test job event list for each host.
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('main:host_job_event_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job event list for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('main:group_job_event_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_events.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test global job event list.
url = reverse('main:job_event_list')
with self.current_user(self.user_sue):
response = self.get(url)
qs = JobEvent.objects.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job host summaries for completed job.
url = reverse('main:job_job_host_summary_list', args=(job.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = job.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Every host referenced by a job_event should be present as a job
# host summary record.
self.assertEqual(host_ids,
set(qs.values_list('host__pk', flat=True)))
# Test individual job host summary records.
for job_host_summary in job.job_host_summaries.all():
url = reverse('main:job_host_summary_detail',
args=(job_host_summary.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
# Test job host summaries for each host.
for host in Host.objects.filter(pk__in=host_ids):
url = reverse('main:host_job_host_summary_list', args=(host.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = host.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Test job host summaries for groups.
for group in self.inv_ops_east.groups.all():
url = reverse('main:group_job_host_summary_list', args=(group.pk,))
with self.current_user(self.user_sue):
response = self.get(url)
qs = group.job_host_summaries.all()
self.assertTrue(qs.count())
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)

View File

@@ -0,0 +1,375 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import datetime
import json
from django.contrib.auth.models import User as DjangoUser
from django.core.urlresolvers import reverse
import django.test
from django.test.client import Client
from ansibleworks.main.models import *
from ansibleworks.main.tests.base import BaseTest
class OrganizationsTest(BaseTest):
def collection(self):
return '/api/v1/organizations/'
def setUp(self):
super(OrganizationsTest, self).setUp()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 10)
self.projects = self.make_projects(self.normal_django_user, 10)
# add projects to organizations in a more or less arbitrary way
for project in self.projects[0:2]:
self.organizations[0].projects.add(project)
for project in self.projects[3:8]:
self.organizations[1].projects.add(project)
for project in self.projects[9:10]:
self.organizations[2].projects.add(project)
self.organizations[0].projects.add(self.projects[-1])
self.organizations[9].projects.add(self.projects[-2])
# get the URL for various organization records
self.a_detail_url = "%s%s" % (self.collection(), self.organizations[0].pk)
self.b_detail_url = "%s%s" % (self.collection(), self.organizations[1].pk)
self.c_detail_url = "%s%s" % (self.collection(), self.organizations[2].pk)
# configuration:
# admin_user is an admin and regular user in all organizations
# other_user is all organizations
# normal_user is a user in organization 0, and an admin of organization 1
for x in self.organizations:
# NOTE: superuser does not have to be explicitly added to admin group
# x.admins.add(self.super_django_user)
x.users.add(self.super_django_user)
self.organizations[0].users.add(self.normal_django_user)
self.organizations[1].admins.add(self.normal_django_user)
def test_get_list(self):
url = reverse('main:organizations_list')
# no credentials == 401
self.options(url, expect=401)
self.head(url, expect=401)
self.get(url, expect=401)
# wrong credentials == 401
with self.current_user(self.get_invalid_credentials()):
self.options(url, expect=401)
self.head(url, expect=401)
self.get(url, expect=401)
# superuser credentials == 200, full list
with self.current_user(self.super_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 10, previous=None, next=None)
self.assertEqual(len(response['results']),
Organization.objects.count())
for field in ['id', 'url', 'name', 'description', 'created']:
self.assertTrue(field in response['results'][0],
'field %s not in result' % field)
# check that the related URL functionality works
related = response['results'][0]['related']
for x in [ 'audit_trail', 'projects', 'users', 'admins', 'tags' ]:
self.assertTrue(x in related and related[x].endswith("/%s/" % x), "looking for %s in related" % x)
# normal credentials == 200, get only organizations of which user is a member
with self.current_user(self.normal_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 2, previous=None, next=None)
# no admin rights? get empty list
with self.current_user(self.other_django_user):
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 0, previous=None, next=None)
def test_get_item(self):
# first get all the URLs
data = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
urls = [item['url'] for item in data['results']]
# make sure super user can fetch records
data = self.get(urls[0], expect=200, auth=self.get_super_credentials())
[self.assertTrue(key in data) for key in ['name', 'description', 'url' ]]
# make sure invalid user cannot
data = self.get(urls[0], expect=401, auth=self.get_invalid_credentials())
# normal user should be able to get org 0 and org 1 but not org 9 (as he's not a user or admin of it)
data = self.get(urls[0], expect=200, auth=self.get_normal_credentials())
data = self.get(urls[1], expect=200, auth=self.get_normal_credentials())
data = self.get(urls[9], expect=403, auth=self.get_normal_credentials())
# other user isn't a user or admin of anything, and similarly can't get in
data = self.get(urls[0], expect=403, auth=self.get_other_credentials())
def test_get_item_subobjects_projects(self):
# first get all the orgs
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
# find projects attached to the first org
projects0_url = orgs['results'][0]['related']['projects']
projects1_url = orgs['results'][1]['related']['projects']
projects9_url = orgs['results'][9]['related']['projects']
self.get(projects0_url, expect=401, auth=None)
self.get(projects0_url, expect=401, auth=self.get_invalid_credentials())
# normal user is just a member of the first org, but can't see any projects under the org
projects0a = self.get(projects0_url, expect=403, auth=self.get_normal_credentials())
# however in the second org, he's an admin and should see all of them
projects1a = self.get(projects1_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(projects1a['count'], 5)
# but the non-admin cannot access the list of projects in the org. He should use /projects/ instead!
projects1b = self.get(projects1_url, expect=403, auth=self.get_other_credentials())
# superuser should be able to read anything
projects9a = self.get(projects9_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects9a['count'], 1)
def test_get_item_subobjects_users(self):
# see if we can list the users added to the organization
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
org1_users_url = orgs['results'][1]['related']['users']
org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(org1_users['count'], 1)
org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(org1_users['count'], 1)
def test_get_item_subobjects_admins(self):
# see if we can list the users added to the organization
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
org1_users_url = orgs['results'][1]['related']['admins']
org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(org1_users['count'], 1)
org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(org1_users['count'], 1)
def test_get_item_subobjects_tags(self):
# put some tags on the org
org1 = Organization.objects.get(pk=2)
tag1 = Tag.objects.create(name='atag')
tag2 = Tag.objects.create(name='btag')
org1.tags.add(tag1)
org1.tags.add(tag2)
# see if we can list the users added to the organization
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
org1_tags_url = orgs['results'][1]['related']['tags']
org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(org1_tags['count'], 2)
org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(org1_tags['count'], 2)
org1_tags = self.get(org1_tags_url, expect=403, auth=self.get_other_credentials())
def test_get_item_subobjects_audit_trail(self):
url = '/api/v1/organizations/2/audit_trail/'
self.get(url, expect=200, auth=self.get_normal_credentials())
# FIXME: verify that some audit trail records are auto-created on save AND post
def test_post_item(self):
new_org = dict(name='magic test org', description='8675309')
# need to be a valid user
self.post(self.collection(), new_org, expect=401, auth=None)
self.post(self.collection(), new_org, expect=401, auth=self.get_invalid_credentials())
# only super users can create organizations
self.post(self.collection(), new_org, expect=403, auth=self.get_normal_credentials())
self.post(self.collection(), new_org, expect=403, auth=self.get_other_credentials())
data1 = self.post(self.collection(), new_org, expect=201, auth=self.get_super_credentials())
# duplicate post results in 400
data2 = self.post(self.collection(), new_org, expect=400, auth=self.get_super_credentials())
# look at what we got back from the post, make sure we added an org
self.assertTrue(data1['url'].endswith("/11/"))
def test_post_item_subobjects_projects(self):
# first get all the orgs
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
# find projects attached to the first org
projects0_url = orgs['results'][0]['related']['projects']
projects1_url = orgs['results'][1]['related']['projects']
projects2_url = orgs['results'][2]['related']['projects']
# get all the projects on the first org
projects0 = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
a_project = projects0['results'][-1]
# attempt to add the project to the 7th org and see what happens
self.post(projects1_url, a_project, expect=204, auth=self.get_super_credentials())
projects1 = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 3)
# make sure adding a project that does not exist, or a missing pk field, results in a 400
self.post(projects1_url, dict(id=99999), expect=400, auth=self.get_super_credentials())
self.post(projects1_url, dict(asdf=1234), expect=400, auth=self.get_super_credentials())
# test that by posting a pk + disassociate: True we can remove a relationship
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 6)
a_project['disassociate'] = True
self.post(projects1_url, a_project, expect=204, auth=self.get_super_credentials())
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 5)
a_project = projects1['results'][-1]
a_project['disassociate'] = 1
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.post(projects1_url, a_project, expect=204, auth=self.get_normal_credentials())
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 4)
new_project_a = self.make_projects(self.normal_django_user, 1)[0]
new_project_b = self.make_projects(self.other_django_user, 1)[0]
# admin of org can add projects that he can read
self.post(projects1_url, dict(id=new_project_a.pk), expect=204, auth=self.get_normal_credentials())
# but not those he cannot
self.post(projects1_url, dict(id=new_project_b.pk), expect=403, auth=self.get_normal_credentials())
# and can't post a project he can read to an org he cannot
# self.post(projects2_url, dict(id=new_project_a.pk), expect=403, auth=self.get_normal_credentials())
# and can't do post a project he can read to an organization he cannot
self.post(projects2_url, dict(id=new_project_a.pk), expect=403, auth=self.get_normal_credentials())
def test_post_item_subobjects_users(self):
url = '/api/v1/organizations/2/users/'
users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(users['count'], 1)
self.post(url, dict(id=2), expect=204, auth=self.get_normal_credentials())
users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(users['count'], 2)
self.post(url, dict(id=2, disassociate=True), expect=204, auth=self.get_normal_credentials())
users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(users['count'], 1)
# post a completely new user to verify we can add users to the subcollection directly
new_user = dict(username='NewUser9000')
which_org = self.normal_django_user.admin_of_organizations.all()[0]
url = '/api/v1/organizations/%s/users/' % (which_org.pk)
posted = self.post(url, new_user, expect=201, auth=self.get_normal_credentials())
all_users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(all_users['count'], 2)
def test_post_item_subobjects_admins(self):
url = '/api/v1/organizations/2/admins/'
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(admins['count'], 1)
self.post(url, dict(id=1), expect=204, auth=self.get_normal_credentials())
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(admins['count'], 2)
self.post(url, dict(id=1, disassociate=1), expect=204, auth=self.get_normal_credentials())
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(admins['count'], 1)
def test_post_item_subobjects_tags(self):
tag = Tag.objects.create(name='blippy')
url = '/api/v1/organizations/2/tags/'
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(tags['count'], 0)
self.post(url, dict(id=tag.pk), expect=204, auth=self.get_normal_credentials())
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(tags['count'], 1)
self.assertEqual(tags['results'][0]['id'], tag.pk)
self.post(url, dict(id=tag.pk, disassociate=1), expect=204, auth=self.get_normal_credentials())
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(tags['count'], 0)
def test_post_item_subobjects_audit_trail(self):
# audit trails are system things, and no user can post to them.
url = '/api/v1/organizations/2/audit_trail/'
self.post(url, dict(id=1), expect=405, auth=self.get_super_credentials())
def test_put_item(self):
# first get some urls and data to put back to them
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
data0 = self.get(urls[0], expect=200, auth=self.get_super_credentials())
data1 = self.get(urls[1], expect=200, auth=self.get_super_credentials())
# test that an unauthenticated user cannot do a put
new_data1 = data1.copy()
new_data1['description'] = 'updated description'
self.put(urls[0], new_data1, expect=401, auth=None)
self.put(urls[0], new_data1, expect=401, auth=self.get_invalid_credentials())
# user normal is an admin of org 0 and a member of org 1 so should be able to put only org 1
self.put(urls[0], new_data1, expect=403, auth=self.get_normal_credentials())
put_result = self.put(urls[1], new_data1, expect=200, auth=self.get_normal_credentials())
# get back org 1 and see if it changed
get_result = self.get(urls[1], expect=200, auth=self.get_normal_credentials())
self.assertEquals(get_result['description'], 'updated description')
# super user can also put even though they aren't added to the org users or admins list
self.put(urls[1], new_data1, expect=200, auth=self.get_super_credentials())
# make sure posting to this URL is not supported
self.post(urls[1], new_data1, expect=405, auth=self.get_super_credentials())
def test_put_item_subobjects_projects(self):
# any attempt to put a subobject should be a 405, edit the actual resource or POST with 'disassociate' to delete
# this is against a collection URL anyway, so we really need not repeat this test for other object types
# as a PUT against a collection doesn't make much sense.
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
projects0_url = orgs['results'][0]['related']['projects']
sub_projects = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(sub_projects['count'], 3)
first_sub_project = sub_projects['results'][0]
self.put(projects0_url, first_sub_project, expect=405, auth=self.get_super_credentials())
def test_delete_item(self):
# first get some urls
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
urldata1 = self.get(urls[1], auth=self.get_super_credentials())
# check authentication -- admins of the org and superusers can delete objects only
self.delete(urls[0], expect=401, auth=None)
self.delete(urls[0], expect=401, auth=self.get_invalid_credentials())
self.delete(urls[8], expect=403, auth=self.get_normal_credentials())
self.delete(urls[1], expect=204, auth=self.get_normal_credentials())
self.delete(urls[0], expect=204, auth=self.get_super_credentials())
# check that when we have deleted an object it comes back 404 via GET
# but that it's still in the database as inactive
self.get(urls[1], expect=404, auth=self.get_normal_credentials())
org1 = Organization.objects.get(pk=urldata1['id'])
self.assertEquals(org1.active, False)
# also check that DELETE on the collection doesn't work
self.delete(self.collection(), expect=405, auth=self.get_super_credentials())
# TODO: tests for tag disassociation

View File

@@ -0,0 +1,527 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import datetime
import json
from django.contrib.auth.models import User as DjangoUser
import django.test
from django.test.client import Client
from django.core.urlresolvers import reverse
from ansibleworks.main.models import *
from ansibleworks.main.tests.base import BaseTest
TEST_PLAYBOOK = '''- hosts: mygroup
gather_facts: false
tasks:
- name: woohoo
command: test 1 = 1
'''
class ProjectsTest(BaseTest):
# tests for users, projects, and teams
def collection(self):
return '/api/v1/projects/'
def setUp(self):
super(ProjectsTest, self).setUp()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 10)
self.projects = self.make_projects(self.normal_django_user, 10, TEST_PLAYBOOK)
# add projects to organizations in a more or less arbitrary way
for project in self.projects[0:2]:
self.organizations[0].projects.add(project)
for project in self.projects[3:8]:
self.organizations[1].projects.add(project)
for project in self.projects[9:10]:
self.organizations[2].projects.add(project)
self.organizations[0].projects.add(self.projects[-1])
self.organizations[9].projects.add(self.projects[-2])
# get the URL for various organization records
self.a_detail_url = "%s%s" % (self.collection(), self.organizations[0].pk)
self.b_detail_url = "%s%s" % (self.collection(), self.organizations[1].pk)
self.c_detail_url = "%s%s" % (self.collection(), self.organizations[2].pk)
# configuration:
# admin_user is an admin and regular user in all organizations
# other_user is all organizations
# normal_user is a user in organization 0, and an admin of organization 1
for x in self.organizations:
# NOTE: superuser does not have to be explicitly added to admin group
# x.admins.add(self.super_django_user)
x.users.add(self.super_django_user)
self.organizations[0].users.add(self.normal_django_user)
self.organizations[1].admins.add(self.normal_django_user)
self.team1 = Team.objects.create(
name = 'team1', organization = self.organizations[0]
)
self.team2 = Team.objects.create(
name = 'team2', organization = self.organizations[0]
)
# create some teams in the first org
self.team1.projects.add(self.projects[0])
self.team2.projects.add(self.projects[1])
self.team2.projects.add(self.projects[2])
self.team2.projects.add(self.projects[3])
self.team2.projects.add(self.projects[4])
self.team2.projects.add(self.projects[5])
self.team1.save()
self.team2.save()
self.team1.users.add(self.normal_django_user)
self.team2.users.add(self.other_django_user)
self.nobody_django_user = User.objects.create(username='nobody')
self.nobody_django_user.set_password('nobody')
self.nobody_django_user.save()
def get_nobody_credentials(self):
# here is a user without any permissions...
return ('nobody', 'nobody')
def test_playbooks(self):
def write_test_file(project, name, content):
full_path = os.path.join(project.get_project_path(), name)
if not os.path.exists(os.path.dirname(full_path)):
os.makedirs(os.path.dirname(full_path))
f = file(full_path, 'wb')
f.write(content)
f.close()
# Invalid local_path
project = self.projects[0]
project.local_path = 'path_does_not_exist'
project.save()
self.assertFalse(project.get_project_path())
self.assertEqual(len(project.playbooks), 0)
# Simple playbook
project = self.projects[1]
self.assertEqual(len(project.playbooks), 1)
write_test_file(project, 'foo.yml', TEST_PLAYBOOK)
self.assertEqual(len(project.playbooks), 2)
# Other files
project = self.projects[2]
self.assertEqual(len(project.playbooks), 1)
write_test_file(project, 'foo.txt', 'not a playbook')
self.assertEqual(len(project.playbooks), 1)
# Empty playbook
project = self.projects[3]
self.assertEqual(len(project.playbooks), 1)
write_test_file(project, 'blah.yml', '')
self.assertEqual(len(project.playbooks), 1)
# Invalid YAML
project = self.projects[4]
self.assertEqual(len(project.playbooks), 1)
write_test_file(project, 'blah.yml', TEST_PLAYBOOK + '----')
self.assertEqual(len(project.playbooks), 1)
# No hosts or includes
project = self.projects[5]
self.assertEqual(len(project.playbooks), 1)
playbook_content = TEST_PLAYBOOK.replace('hosts', 'hoists')
write_test_file(project, 'blah.yml', playbook_content)
self.assertEqual(len(project.playbooks), 1)
# Playbook in roles folder
project = self.projects[6]
self.assertEqual(len(project.playbooks), 1)
write_test_file(project, 'roles/blah.yml', TEST_PLAYBOOK)
self.assertEqual(len(project.playbooks), 1)
# Playbook in tasks folder
project = self.projects[7]
self.assertEqual(len(project.playbooks), 1)
write_test_file(project, 'tasks/blah.yml', TEST_PLAYBOOK)
self.assertEqual(len(project.playbooks), 1)
def test_mainline(self):
# =====================================================================
# PROJECTS - LISTING
# can get projects list
projects = '/api/v1/projects/'
# invalid auth
self.get(projects, expect=401)
self.get(projects, expect=401, auth=self.get_invalid_credentials())
# super user
results = self.get(projects, expect=200, auth=self.get_super_credentials())
self.assertEquals(results['count'], 10)
# org admin
results = self.get(projects, expect=200, auth=self.get_normal_credentials())
self.assertEquals(results['count'], 6)
# user on a team
results = self.get(projects, expect=200, auth=self.get_other_credentials())
self.assertEquals(results['count'], 5)
# user not on any teams
results = self.get(projects, expect=200, auth=self.get_nobody_credentials())
self.assertEquals(results['count'], 0)
# =====================================================================
# PROJECTS - ACCESS
project = '/api/v1/projects/%s/' % self.projects[3].pk
self.get(project, expect=200, auth=self.get_super_credentials())
self.get(project, expect=200, auth=self.get_normal_credentials())
self.get(project, expect=403, auth=self.get_other_credentials())
self.get(project, expect=403, auth=self.get_nobody_credentials())
# can delete projects
self.delete(project, expect=204, auth=self.get_normal_credentials())
self.get(project, expect=404, auth=self.get_normal_credentials())
# can list playbooks for projects
proj_playbooks = '/api/v1/projects/%d/playbooks/' % self.projects[2].pk
got = self.get(proj_playbooks, expect=200, auth=self.get_super_credentials())
self.assertEqual(got, self.projects[2].playbooks)
# can list member organizations for projects
proj_orgs = '/api/v1/projects/1/organizations/'
# only usable as superuser
got = self.get(proj_orgs, expect=403, auth=self.get_normal_credentials())
got = self.get(proj_orgs, expect=200, auth=self.get_super_credentials())
self.assertEquals(got['count'], 1)
self.assertEquals(got['results'][0]['url'], '/api/v1/organizations/1/')
# you can't add organizations to projects here, verify that this is true (405)
self.post(proj_orgs, data={}, expect=405, auth=self.get_super_credentials())
# =====================================================================
# TEAMS
all_teams = '/api/v1/teams/'
team1 = '/api/v1/teams/1/'
# can list teams
got = self.get(all_teams, expect=200, auth=self.get_super_credentials())
self.assertEquals(got['count'], 2)
# FIXME: for other accounts, also check filtering
# can get teams
got = self.get(team1, expect=200, auth=self.get_super_credentials())
self.assertEquals(got['url'], '/api/v1/teams/1/')
got = self.get(team1, expect=200, auth=self.get_normal_credentials())
got = self.get(team1, expect=403, auth=self.get_other_credentials())
self.team1.users.add(User.objects.get(username='other'))
self.team1.save()
got = self.get(team1, expect=200, auth=self.get_other_credentials())
got = self.get(team1, expect=403, auth=self.get_nobody_credentials())
new_team = dict(name='newTeam', description='blarg', organization=1)
new_team2 = dict(name='newTeam2', description='blarg', organization=1)
new_team3 = dict(name='newTeam3', description='bad wolf', organization=1)
# can add teams
posted1 = self.post(all_teams, data=new_team, expect=201, auth=self.get_super_credentials())
posted2 = self.post(all_teams, data=new_team, expect=400, auth=self.get_super_credentials())
posted3 = self.post(all_teams, data=new_team2, expect=201, auth=self.get_normal_credentials())
posted4 = self.post(all_teams, data=new_team2, expect=400, auth=self.get_normal_credentials())
posted5 = self.post(all_teams, data=new_team3, expect=403, auth=self.get_other_credentials())
url1 = posted1['url']
url3 = posted3['url']
url5 = posted1['url']
new_team = Team.objects.create(name='newTeam4', organization=Organization.objects.get(pk=2))
url = '/api/v1/teams/%s/' % new_team.pk
# can delete teams
self.delete(url, expect=401)
self.delete(url, expect=403, auth=self.get_nobody_credentials())
self.delete(url, expect=403, auth=self.get_other_credentials())
self.delete(url, expect=204, auth=self.get_normal_credentials())
self.delete(url3, expect=204, auth=self.get_super_credentials())
# =====================================================================
# ORGANIZATION TEAMS
# can list organization teams (filtered by user) -- this is an org admin function
org_teams = '/api/v1/organizations/2/teams/'
data1 = self.get(org_teams, expect=401)
data2 = self.get(org_teams, expect=403, auth=self.get_nobody_credentials())
data3 = self.get(org_teams, expect=403, auth=self.get_other_credentials())
data4 = self.get(org_teams, expect=200, auth=self.get_normal_credentials())
data5 = self.get(org_teams, expect=200, auth=self.get_super_credentials())
# can add teams to organizations
new_team1 = dict(name='super new team A')
# also tests that sub posts overwrite the related field:
new_team2 = dict(name='super new team B', organization=34567)
new_team3 = dict(name='super new team C')
data1 = self.post(org_teams, new_team1, expect=401)
data1 = self.post(org_teams, new_team1, expect=403, auth=self.get_nobody_credentials())
data1 = self.post(org_teams, new_team1, expect=403, auth=self.get_other_credentials())
data2 = self.post(org_teams, new_team2, expect=201, auth=self.get_normal_credentials())
data3 = self.post(org_teams, new_team3, expect=201, auth=self.get_super_credentials())
# can remove teams from organizations
data2['disassociate'] = 1
url = data2['url']
deleted = self.post(org_teams, data2, expect=204, auth=self.get_normal_credentials())
got = self.get(url, expect=404, auth=self.get_normal_credentials())
# =====================================================================
# TEAM PROJECTS
team = Team.objects.filter(organization__pk = 2)[0]
team_projects = '/api/v1/teams/%s/projects/' % (team.pk)
p1 = self.projects[0]
team.projects.add(p1)
team.save()
got = self.get(team_projects, expect=200, auth=self.get_super_credentials())
# FIXME: project postablility tests somewhat incomplete.
# add tests to show we can create new projects on the subresource and so on.
self.assertEquals(got['count'], 1)
# =====================================================================
# TEAMS USER MEMBERSHIP
team = Team.objects.filter(organization__pk = 2)[0]
team_users = '/api/v1/teams/%s/users/' % (team.pk)
for x in team.users.all():
team.users.remove(x)
team.save()
# can list uses on teams
self.get(team_users, expect=401)
self.get(team_users, expect=401, auth=self.get_invalid_credentials())
self.get(team_users, expect=403, auth=self.get_nobody_credentials())
self.get(team_users, expect=403, auth=self.get_other_credentials())
self.get(team_users, expect=200, auth=self.get_normal_credentials())
self.get(team_users, expect=200, auth=self.get_super_credentials())
# can add users to teams
all_users = self.get('/api/v1/users/', expect=200, auth=self.get_super_credentials())
for x in all_users['results']:
self.post(team_users, data=x, expect=403, auth=self.get_nobody_credentials())
self.post(team_users, data=x, expect=204, auth=self.get_normal_credentials())
self.assertEqual(Team.objects.get(pk=team.pk).users.count(), 4)
# can remove users from teams
for x in all_users['results']:
y = dict(id=x['id'], disassociate=1)
self.post(team_users, data=y, expect=403, auth=self.get_nobody_credentials())
self.post(team_users, data=y, expect=204, auth=self.get_normal_credentials())
self.assertEquals(Team.objects.get(pk=team.pk).users.count(), 0)
# =====================================================================
# USER TEAMS
# from a user, can see what teams they are on (related resource)
other = User.objects.get(username = 'other')
url = '/api/v1/users/%s/teams/' % other.pk
self.get(url, expect=401)
self.get(url, expect=401, auth=self.get_invalid_credentials())
self.get(url, expect=403, auth=self.get_nobody_credentials())
other.organizations.add(Organization.objects.get(pk=2))
other.save()
my_teams1 = self.get(url, expect=200, auth=self.get_normal_credentials())
my_teams2 = self.get(url, expect=200, auth=self.get_other_credentials())
self.assertEqual(my_teams1['count'], 2)
self.assertEqual(my_teams1, my_teams2)
# =====================================================================
# USER PROJECTS
url = '/api/v1/users/%s/projects/' % other.pk
# from a user, can see what projects they can see based on team association
# though this resource doesn't do anything else
got = self.get(url, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 5)
got = self.get(url, expect=403, auth=self.get_nobody_credentials())
got = self.get(url, expect=401, auth=self.get_invalid_credentials())
got = self.get(url, expect=401)
got = self.get(url, expect=200, auth=self.get_super_credentials())
# =====================================================================
# CREDENTIALS
other_creds = '/api/v1/users/%s/credentials/' % other.pk
team_creds = '/api/v1/teams/%s/credentials/' % team.pk
new_credentials = dict(
name = 'credential',
project = Project.objects.all()[0].pk,
default_username = 'foo',
ssh_key_data = 'bar',
ssh_key_unlock = 'baz',
ssh_password = 'narf',
sudo_password = 'troz'
)
# can add credentials to a user (if user or org admin or super user)
self.post(other_creds, data=new_credentials, expect=401)
self.post(other_creds, data=new_credentials, expect=401, auth=self.get_invalid_credentials())
self.post(other_creds, data=new_credentials, expect=201, auth=self.get_super_credentials())
self.post(other_creds, data=new_credentials, expect=201, auth=self.get_normal_credentials())
result = self.post(other_creds, data=new_credentials, expect=201, auth=self.get_other_credentials())
self.post(other_creds, data=new_credentials, expect=403, auth=self.get_nobody_credentials())
cred_user = result['id']
# can add credentials to a team
self.post(team_creds, data=new_credentials, expect=401)
self.post(team_creds, data=new_credentials, expect=401, auth=self.get_invalid_credentials())
self.post(team_creds, data=new_credentials, expect=201, auth=self.get_super_credentials())
result = self.post(team_creds, data=new_credentials, expect=201, auth=self.get_normal_credentials())
self.post(team_creds, data=new_credentials, expect=403, auth=self.get_other_credentials())
self.post(team_creds, data=new_credentials, expect=403, auth=self.get_nobody_credentials())
cred_team = result['id']
# can list credentials on a user
self.get(other_creds, expect=401)
self.get(other_creds, expect=401, auth=self.get_invalid_credentials())
self.get(other_creds, expect=200, auth=self.get_super_credentials())
self.get(other_creds, expect=200, auth=self.get_normal_credentials())
self.get(other_creds, expect=200, auth=self.get_other_credentials())
self.get(other_creds, expect=403, auth=self.get_nobody_credentials())
# can list credentials on a team
self.get(team_creds, expect=401)
self.get(team_creds, expect=401, auth=self.get_invalid_credentials())
self.get(team_creds, expect=200, auth=self.get_super_credentials())
self.get(team_creds, expect=200, auth=self.get_normal_credentials())
self.get(team_creds, expect=403, auth=self.get_other_credentials())
self.get(team_creds, expect=403, auth=self.get_nobody_credentials())
# Check /api/v1/credentials (GET)
url = reverse('main:credentials_list')
with self.current_user(self.super_django_user):
self.options(url)
self.head(url)
response = self.get(url)
qs = Credential.objects.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# POST should fail for all users.
with self.current_user(self.super_django_user):
data = dict(name='xyz', user=self.super_django_user.pk)
self.post(url, data, expect=405)
# FIXME: Check list as other users.
# can edit a credential
cred_user = Credential.objects.get(pk=cred_user)
cred_team = Credential.objects.get(pk=cred_team)
d_cred_user = dict(id=cred_user.pk, name='x', sudo_password='blippy', user=cred_user.pk)
d_cred_user2 = dict(id=cred_user.pk, name='x', sudo_password='blippy', user=User.objects.get(pk=1).pk)
d_cred_team = dict(id=cred_team.pk, name='x', sudo_password='blippy', team=cred_team.pk)
edit_creds1 = '/api/v1/credentials/%s/' % cred_user.pk
edit_creds2 = '/api/v1/credentials/%s/' % cred_team.pk
self.put(edit_creds1, data=d_cred_user, expect=401)
self.put(edit_creds1, data=d_cred_user, expect=401, auth=self.get_invalid_credentials())
self.put(edit_creds1, data=d_cred_user, expect=200, auth=self.get_super_credentials())
self.put(edit_creds1, data=d_cred_user, expect=200, auth=self.get_normal_credentials())
# editing a credential to edit the user record is not legal, this is a test of the .validate
# method on the serializer to allow 'write once' fields
self.put(edit_creds1, data=d_cred_user2, expect=400, auth=self.get_normal_credentials())
cred_put_u = self.put(edit_creds1, data=d_cred_user, expect=200, auth=self.get_other_credentials())
self.put(edit_creds2, data=d_cred_team, expect=401)
self.put(edit_creds2, data=d_cred_team, expect=401, auth=self.get_invalid_credentials())
cred_team = Credential.objects.get(pk=cred_team.pk)
self.put(edit_creds2, data=d_cred_team, expect=200, auth=self.get_super_credentials())
cred_team = Credential.objects.get(pk=cred_team.pk)
cred_put_t = self.put(edit_creds2, data=d_cred_team, expect=200, auth=self.get_normal_credentials())
self.put(edit_creds2, data=d_cred_team, expect=403, auth=self.get_other_credentials())
cred_put_t['disassociate'] = 1
team_url = "/api/v1/teams/%s/credentials/" % cred_put_t['team']
self.post(team_url, data=cred_put_t, expect=204, auth=self.get_normal_credentials())
# can remove credentials from a user (via disassociate)
cred_put_u['disassociate'] = 1
url = cred_put_u['url']
user_url = "/api/v1/users/%s/credentials/" % cred_put_u['user']
self.post(user_url, data=cred_put_u, expect=204, auth=self.get_normal_credentials())
# can delete a credential directly -- probably won't be used too often
data = self.delete(url, expect=204, auth=self.get_other_credentials())
data = self.delete(url, expect=404, auth=self.get_other_credentials())
# =====================================================================
# PERMISSIONS
user = self.other_django_user
team = Team.objects.get(pk=1)
organization = Organization.objects.get(pk=1)
inventory = Inventory.objects.create(
name = 'test inventory',
organization = organization,
created_by = self.super_django_user
)
project = Project.objects.get(pk=1)
# can add permissions to a user
user_permission = dict(
name='user can deploy a certain project to a certain inventory',
# user=user.pk, # no need to specify, this will be automatically filled in
inventory=inventory.pk,
project=project.pk,
permission_type=PERM_INVENTORY_DEPLOY
)
team_permission = dict(
name='team can deploy a certain project to a certain inventory',
# team=team.pk, # no need to specify, this will be automatically filled in
inventory=inventory.pk,
project=project.pk,
permission_type=PERM_INVENTORY_DEPLOY
)
url = '/api/v1/users/%s/permissions/' % user.pk
posted = self.post(url, user_permission, expect=201, auth=self.get_super_credentials())
url2 = posted['url']
got = self.get(url2, expect=200, auth=self.get_other_credentials())
# can add permissions on a team
url = '/api/v1/teams/%s/permissions/' % team.pk
posted = self.post(url, team_permission, expect=201, auth=self.get_super_credentials())
url2 = posted['url']
# check we can get that permission back
got = self.get(url2, expect=200, auth=self.get_other_credentials())
# can list permissions on a user
url = '/api/v1/users/%s/permissions/' % user.pk
got = self.get(url, expect=200, auth=self.get_super_credentials())
got = self.get(url, expect=200, auth=self.get_other_credentials())
got = self.get(url, expect=403, auth=self.get_nobody_credentials())
# can list permissions on a team
url = '/api/v1/teams/%s/permissions/' % team.pk
got = self.get(url, expect=200, auth=self.get_super_credentials())
got = self.get(url, expect=200, auth=self.get_other_credentials())
got = self.get(url, expect=403, auth=self.get_nobody_credentials())
# can edit a permission -- reducing the permission level
team_permission['permission_type'] = PERM_INVENTORY_CHECK
self.put(url2, team_permission, expect=200, auth=self.get_super_credentials())
self.put(url2, team_permission, expect=403, auth=self.get_other_credentials())
# can remove permissions
# do need to disassociate, just delete it
self.delete(url2, expect=403, auth=self.get_other_credentials())
self.delete(url2, expect=204, auth=self.get_super_credentials())
self.delete(url2, expect=404, auth=self.get_other_credentials())

View File

@@ -0,0 +1,517 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import os
import shutil
import tempfile
from django.conf import settings
from django.test.utils import override_settings
from ansibleworks.main.models import *
from ansibleworks.main.tests.base import BaseTransactionTest
from ansibleworks.main.tasks import RunJob
TEST_PLAYBOOK = '''- hosts: test-group
gather_facts: False
tasks:
- name: should pass
command: test 1 = 1
- name: should also pass
command: test 2 = 2
'''
TEST_PLAYBOOK2 = '''- hosts: test-group
gather_facts: False
tasks:
- name: should fail
command: test 1 = 0
'''
TEST_SSH_KEY_DATA = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAyQ8F5bbgjHvk4SZJsKI9OmJKMFxZqRhvx4LaqjLTKbBwRBsY
1/C00NPiZn70dKbeyV7RNVZxuzM6yd3D3lwTdbDu/eJ0x72t3ch+TdLt/aenyy10
IvZyhSlxCLDkDaVVPFYJOQzVS8TkdOi6ZHc+R0c0A+4ZE8OQ8C0zIKtUTHqRk4/v
gYK5guhNS0DdgWkBj6K+r/9D4bqdPTJPt4S7H75vb1tBgseiqftEkLYOhTK2gsCi
5uJgpG4zPQY4Kk/97dbW7pwcvPkr1rKkAwEJ27Bfo+DBv3oEx3SinpXQtOrH1aEO
RHSXldBaymdBtVLUhjxDlnnQ7Ps+fNX04R7N4QIDAQABAoIBAQClEDxbNyRqsVxa
q8BbzxZNVFxsD6Vceb9rIDa8/DT4SO4iO8zNm8QWnZ2FYDz5d/X3hGxlSa7dbVWa
XQJtD1K6kKPks4IEaejP58Ypxj20vWu4Fnz+Jy4lvLwb0n2n5lBv1IKF389NATw9
7sL3sB3lDsPZZiQYYbogNDuBWqc+kP0zD84bONsM/B2HMRm9BRv2UsZf+zKU4pTA
UqHffyjmw7LqHmbtVjwVcUsC+xcE4kCuWLvabFnTWOSnWECyIw2+trxKdwCXbfzG
s5rn4Dj+aEKimzFaRpTSVx6w4yw9xw/EjsSaZ88jKSpTP8ocCut6zv+P/JwlukEX
4A4FxqyxAoGBAOp3G9EIAAWijcIgO5OdiZNEqVyqd3yyPzT6d/q7bf4dpVCZiLNA
bRmge83aMc4g2Dpkn/++It3bDmnXXGg+BZSX5KT9JLklXchaw9phv9J0diZEUvYS
mSQafbUGIqYnYzns3TU0cbgITs1iVIEstHYjGr3J88nDG+HFCHboxa93AoGBANuG
cDFgyvm79+haK2fHhUCZgaFFYBpkpuz+zjDjzIytOzymWa2gD9jIa7mvdvoH2ge3
AVG0vy+n9cJaqJMuLkhdI01wVlqY9wvDHFyZCXyIvKVPMljKeTvCNGCupsG4R171
gSKT5ryOx58MGbE7knAZC+QWpwxFpdpbfej6g7NnAoGBAMz6ipAJbXN/tG0FnvAj
pxXfzizcPw/+CTI40tGaMMQbiN5ZC+CiL39bBUFnQ2mQ31jVheegg3zvuL8hb4EW
z+wjitoPEZ7nowC5EUaHdJr6BBzaWKkWg1nD6yhqj7ow7xfCE3YjPlQEt1fpYjV4
LuClOgi4WPCIKYUMq6TBRaprAoGAVrEjs0xPPApQH5EkXQp9BALbH23/Qs0G4sbJ
dKMxT0jGAPCMr7VrLKgRarXxXVImdy99NOAVNGO2+PbGZcEyA9/MJjO71nFb9mgp
1iOVjHmPThUVg90JvWC3QIsYTZ5RiR2Yzqfr0gDsslGb/9LPxLcPbBbKB12l3rKM
6amswvcCgYEAvgcSlTfAkI3ac8rB70HuDmSdqKblIiQjtPtT/ixXaFkZOmHRr4AE
KepMRDnaO/ldPDPEWCGqPzEM0t/0jS8/hCu3zLHHpZ+0LnHq+EXkOI0/GB4P+z5l
Vz3kouC0BTav0rCEnDop/cWMTiAp/XhKXfrTTTOra/F8l2xD8n/mnzY=
-----END RSA PRIVATE KEY-----'''
TEST_SSH_KEY_DATA_LOCKED = '''-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: AES-128-CBC,6B4E92AF4C29DE26FD8535D81825BDE6
pg8YplxPpfzgEGUiko34DGaYklyGyYKXjOrGFGyLoquNAVNFyewT34dDrZi0IAaE
79wMVcdlHbrJfZz8ML8I/ft6zM6BdlwZExH4y9DRAaktY3yIXxSvowBQ6ljh3wUy
M6m0afOfVjT22V8hLFgX0yTQ6P9zTG1cmj6+JQWTsMJ5EP3rnFK5CyrJXP48B3GI
GgE66rkXDvcKlVeIrbrpcTyfmEpafPgVRJYCDFXxeO/BfKgUFVxFq1PgFbvGQMmD
wA6EsyRrN+aoub1sqzj8tM8e4nwEi0EifdRShkFeqH4GUOKypanTXfCqwFBgYi5a
i3YwSnniZZPwCniGR5cl8oetrc5dubq/IR0txsGi2lO6zJEWdSer/EadS0QAll4S
yXrSc/lFaez1VmVe/8aoBKDOHhe7jV3YXAuqCeB4o/SThB/9Gad44MTbqFH3d7cD
k+F0Cjup7LZqZpXeB7ZHRG/Yt9MtBzwDVmEWaxA1WIN5a8xyZEVzRswSi4lZX69z
Va7eTKcrCbHOQmIbLZGRiZbAbfgriwwxQCJWELv80h+A754Bhi23n3WzcT094fRi
cqK//HcHHXxYGmrfUbHYcj+GCQ07Uk2ZR3qglmPISUCgfZwM9k0LpXudWE8vmF2S
pAnbgxgrfUMtpu5EAO+d8Sn5wQLVD7YzPBUhM4PYfYUbJnRoZQryuR4lqCzcg0te
BM8x1LzSXyBEbQaonuMzSz1hCQ9hZpUwUEqDWAT3cPNmgyWkXQ1P8ehJhTmryGJw
/GHxNzMZDGj+bBKo7ic3r1g3ZmmlSU1EVxMLvRBKhdc1XicBVqepDma6/LEpj+5X
oplR+3Q0QSQ8CchcSxYtOpI3UBCatpyu09GtfzS+7bI5I7FVYUccR83+oQlKpPHC
5O2irB8JeXqAY679fx2N4i0E6l5Xr5AjUtOBCNil0Y70eOf9ER6i7kGakR7bUtk5
fQn8Em9pLsYYalnekn4sxyHpGq59KgNPjQiJRByYidSJ/oyNbmtPlxfXLwpuicd2
8HLm1e0UeGidfF/bSlySwDzy1ZlSr/Apdcn9ou5hfhaGuQvjr9SvJwxQFNRMPdHj
ukBSDGuxyyU+qBrWJhFsymiZAWDofY/4GzgMu4hh0PwN5arzoTxnLHmc/VFttyMx
nP7bTaa9Sr54TlMr7NuKTzz5biXKjqJ9AZKIUF2+ERebjV0hMpJ5NPsLwPUnA9kx
R3tl1JL2Ia82ovS81Ghff/cBZsx/+LQYa+ac4eDTyXxyg4ei5tPwOlzz7pDKJAr9
XEh2X6rywCNghEMZPaOQLiEDLJ2is6P4OarSa/yoU4OMetpFfwZ0oJSCmGlEa+CF
zeJ80yXhU1Ru2eqiUjCAUg25BFPwoiMJDc6jWWow7OrXCQsw7Ddo2ncy1p9QeWjM
2R4ojPHWuXKYxvwVSc8NZHASlycBCaxHLDAEyH4avOSDPWOB1H5t+RrNmo0qgush
0aRo6F7BjzB2rA4E+xu2u11TBfF8iB3PC919/vxnkXF97NqezsaCz6VbRlsU0A+B
wwoi+P4JlJF6ZuhuDv6mhmBCSdXdc1bvimvdpOljhThr+cG5mM08iqWGKdA665cw
-----END RSA PRIVATE KEY-----
'''
TEST_SSH_KEY_DATA_UNLOCK = 'unlockme'
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
class BaseCeleryTest(BaseTransactionTest):
'''
Base class for celery task tests.
'''
@override_settings(ANSIBLE_TRANSPORT='local')
class RunJobTest(BaseCeleryTest):
'''
Test cases for RunJob celery task.
'''
def setUp(self):
super(RunJobTest, self).setUp()
self.test_project_path = None
self.setup_users()
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.inventory = Inventory.objects.create(name='test-inventory',
description='description for test-inventory',
organization=self.organization)
self.host = self.inventory.hosts.create(name='host.example.com',
inventory=self.inventory)
self.group = self.inventory.groups.create(name='test-group',
inventory=self.inventory)
self.group.hosts.add(self.host)
self.project = None
self.credential = None
# Pass test database name in environment for use by the inventory script.
os.environ['ACOM_TEST_DATABASE_NAME'] = settings.DATABASES['default']['NAME']
# Monkeypatch RunJob to capture list of command line arguments.
self.original_build_args = RunJob.build_args
self.run_job_args = None
self.build_args_callback = lambda: None
def new_build_args(_self, job, **kw):
args = self.original_build_args(_self, job, **kw)
self.run_job_args = args
self.build_args_callback()
return args
RunJob.build_args = new_build_args
def tearDown(self):
super(RunJobTest, self).tearDown()
os.environ.pop('ACOM_TEST_DATABASE_NAME', None)
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)
RunJob.build_args = self.original_build_args
def create_test_credential(self, **kwargs):
opts = {
'name': 'test-creds',
'user': self.super_django_user,
'ssh_username': '',
'ssh_key_data': '',
'ssh_key_unlock': '',
'ssh_password': '',
'sudo_username': '',
'sudo_password': '',
}
opts.update(kwargs)
self.credential = Credential.objects.create(**opts)
return self.credential
def create_test_project(self, playbook_content):
self.project = self.make_projects(self.normal_django_user, 1, playbook_content)[0]
self.organization.projects.add(self.project)
def create_test_job_template(self, **kwargs):
opts = {
'name': 'test-job-template',
'inventory': self.inventory,
'project': self.project,
'credential': self.credential,
}
try:
opts['playbook'] = self.project.playbooks[0]
except (AttributeError, IndexError):
pass
opts.update(kwargs)
self.job_template = JobTemplate.objects.create(**opts)
return self.job_template
def create_test_job(self, **kwargs):
job_template = kwargs.pop('job_template', None)
if job_template:
self.job = job_template.create_job(**kwargs)
else:
opts = {
'name': 'test-job',
'inventory': self.inventory,
'project': self.project,
'credential': self.credential,
}
try:
opts['playbook'] = self.project.playbooks[0]
except (AttributeError, IndexError):
pass
opts.update(kwargs)
self.job = Job.objects.create(**opts)
return self.job
def check_job_result(self, job, expected='successful', expect_stdout=True,
expect_traceback=False):
msg = 'job status is %s, expected %s' % (job.status, expected)
if job.result_traceback:
msg = '%s\ngot traceback:\n%s' % (msg, job.result_traceback)
if job.result_stdout:
msg = '%s\ngot stdout:\n%s' % (msg, job.result_stdout)
if isinstance(expected, (list, tuple)):
self.assertTrue(job.status in expected)
else:
self.assertEqual(job.status, expected, msg)
if expect_stdout:
self.assertTrue(job.result_stdout)
else:
self.assertFalse(job.result_stdout,
'expected no stdout, got:\n%s' %
job.result_stdout)
if expect_traceback:
self.assertTrue(job.result_traceback)
else:
self.assertFalse(job.result_traceback,
'expected no traceback, got:\n%s' %
job.result_traceback)
def test_run_job(self):
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
job_events = job.job_events.all()
for job_event in job_events:
unicode(job_event) # For test coverage.
job_event.save()
self.assertEqual(job_events.filter(event='playbook_on_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_play_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_task_start').count(), 2)
self.assertEqual(job_events.filter(event='runner_on_ok').count(), 2)
for evt in job_events.filter(event='runner_on_ok'):
self.assertEqual(evt.host, self.host)
self.assertEqual(job_events.filter(event='playbook_on_stats').count(), 1)
for job_host_summary in job.job_host_summaries.all():
unicode(job_host_summary) # For test coverage.
self.assertEqual(job_host_summary.host.last_job_host_summary, job_host_summary)
self.host = Host.objects.get(pk=self.host.pk)
self.assertEqual(self.host.last_job, job)
self.assertEqual(job.successful_hosts.count(), 1)
self.assertEqual(job.failed_hosts.count(), 0)
self.assertEqual(job.changed_hosts.count(), 1)
self.assertEqual(job.unreachable_hosts.count(), 0)
self.assertEqual(job.skipped_hosts.count(), 0)
self.assertEqual(job.processed_hosts.count(), 1)
def test_check_job(self):
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template, job_type='check')
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
job_events = job.job_events.all()
self.assertEqual(job_events.filter(event='playbook_on_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_play_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_task_start').count(), 2)
self.assertEqual(job_events.filter(event='runner_on_skipped').count(), 2)
for evt in job_events.filter(event='runner_on_skipped'):
self.assertEqual(evt.host, self.host)
self.assertEqual(job_events.filter(event='playbook_on_stats').count(), 1)
self.host = Host.objects.get(pk=self.host.pk)
self.assertEqual(self.host.last_job, job)
self.assertEqual(job.successful_hosts.count(), 0)
self.assertEqual(job.failed_hosts.count(), 0)
self.assertEqual(job.changed_hosts.count(), 0)
self.assertEqual(job.unreachable_hosts.count(), 0)
self.assertEqual(job.skipped_hosts.count(), 1)
self.assertEqual(job.processed_hosts.count(), 1)
def test_run_job_that_fails(self):
self.create_test_project(TEST_PLAYBOOK2)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
job_events = job.job_events.all()
self.assertEqual(job_events.filter(event='playbook_on_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_play_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_task_start').count(), 1)
self.assertEqual(job_events.filter(event='runner_on_failed').count(), 1)
self.assertEqual(job_events.get(event='runner_on_failed').host, self.host)
self.assertEqual(job_events.filter(event='playbook_on_stats').count(), 1)
self.host = Host.objects.get(pk=self.host.pk)
self.assertEqual(self.host.last_job, job)
self.assertEqual(job.successful_hosts.count(), 0)
self.assertEqual(job.failed_hosts.count(), 1)
self.assertEqual(job.changed_hosts.count(), 0)
self.assertEqual(job.unreachable_hosts.count(), 0)
self.assertEqual(job.skipped_hosts.count(), 0)
self.assertEqual(job.processed_hosts.count(), 1)
def test_check_job_where_task_would_fail(self):
self.create_test_project(TEST_PLAYBOOK2)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template, job_type='check')
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
# Since we don't actually run the task, the --check should indicate
# everything is successful.
self.check_job_result(job, 'successful')
job_events = job.job_events.all()
self.assertEqual(job_events.filter(event='playbook_on_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_play_start').count(), 1)
self.assertEqual(job_events.filter(event='playbook_on_task_start').count(), 1)
self.assertEqual(job_events.filter(event='runner_on_skipped').count(), 1)
self.assertEqual(job_events.get(event='runner_on_skipped').host, self.host)
self.assertEqual(job_events.filter(event='playbook_on_stats').count(), 1)
self.host = Host.objects.get(pk=self.host.pk)
self.assertEqual(self.host.last_job, job)
self.assertEqual(job.successful_hosts.count(), 0)
self.assertEqual(job.failed_hosts.count(), 0)
self.assertEqual(job.changed_hosts.count(), 0)
self.assertEqual(job.unreachable_hosts.count(), 0)
self.assertEqual(job.skipped_hosts.count(), 1)
self.assertEqual(job.processed_hosts.count(), 1)
def _cancel_job_callback(self):
job = Job.objects.get(pk=self.job.pk)
self.assertTrue(job.cancel())
self.assertTrue(job.cancel()) # No change from calling again.
def test_cancel_job(self):
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
# Pass save=False just for the sake of test coverage.
job = self.create_test_job(job_template=job_template, save=False)
job.save()
self.assertEqual(job.status, 'new')
self.assertEqual(job.cancel_flag, False)
# Calling cancel before start has no effect.
self.assertFalse(job.cancel())
self.assertEqual(job.cancel_flag, False)
self.assertFalse(job.get_passwords_needed_to_start())
self.build_args_callback = self._cancel_job_callback
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'canceled')
self.assertEqual(job.cancel_flag, True)
# Calling cancel afterwards just returns the cancel flag.
self.assertTrue(job.cancel())
# Read attribute for test coverage.
job.celery_task
job.celery_task_id = ''
job.save()
self.assertEqual(job.celery_task, None)
# Unable to start job again.
self.assertFalse(job.start())
def test_extra_job_options(self):
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template(forks=3, verbosity=2,
extra_vars='foo=1')
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
# Job may fail if current user doesn't have password-less sudo
# privileges, but we're mainly checking the command line arguments.
self.check_job_result(job, ('successful', 'failed'))
self.assertTrue('--forks=3' in self.run_job_args)
self.assertTrue('-vv' in self.run_job_args)
self.assertTrue('-e' in self.run_job_args)
def test_limit_option(self):
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template(limit='bad.example.com')
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
self.assertTrue('-l' in self.run_job_args)
def test_ssh_username_and_password(self):
self.create_test_credential(ssh_username='sshuser',
ssh_password='sshpass')
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('-u' in self.run_job_args)
self.assertTrue('--ask-pass' in self.run_job_args)
def test_ssh_ask_password(self):
self.create_test_credential(ssh_password='ASK')
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertTrue(job.get_passwords_needed_to_start())
self.assertTrue('ssh_password' in job.get_passwords_needed_to_start())
self.assertFalse(job.start())
self.assertEqual(job.status, 'new')
self.assertTrue(job.start(ssh_password='sshpass'))
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('--ask-pass' in self.run_job_args)
def test_sudo_username_and_password(self):
self.create_test_credential(sudo_username='sudouser',
sudo_password='sudopass')
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
# Job may fail if current user doesn't have password-less sudo
# privileges, but we're mainly checking the command line arguments.
self.check_job_result(job, ('successful', 'failed'))
self.assertTrue('-U' in self.run_job_args)
self.assertTrue('--ask-sudo-pass' in self.run_job_args)
def test_sudo_ask_password(self):
self.create_test_credential(sudo_password='ASK')
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertTrue(job.get_passwords_needed_to_start())
self.assertTrue('sudo_password' in job.get_passwords_needed_to_start())
self.assertFalse(job.start())
self.assertEqual(job.status, 'new')
self.assertTrue(job.start(sudo_password='sudopass'))
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
# Job may fail if current user doesn't have password-less sudo
# privileges, but we're mainly checking the command line arguments.
self.assertTrue(job.status in ('successful', 'failed'))
self.assertTrue('--ask-sudo-pass' in self.run_job_args)
def test_unlocked_ssh_key(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA)
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in self.run_job_args)
def test_locked_ssh_key_with_password(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK)
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in self.run_job_args)
self.assertTrue('Bad passphrase' not in job.result_stdout)
def test_locked_ssh_key_with_bad_password(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock='not the passphrase')
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.get_passwords_needed_to_start())
self.assertTrue(job.start())
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'failed')
self.assertTrue('ssh-agent' in self.run_job_args)
self.assertTrue('Bad passphrase' in job.result_stdout)
def test_locked_ssh_key_ask_password(self):
self.create_test_credential(ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock='ASK')
self.create_test_project(TEST_PLAYBOOK)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertTrue(job.get_passwords_needed_to_start())
self.assertTrue('ssh_key_unlock' in job.get_passwords_needed_to_start())
self.assertFalse(job.start())
self.assertEqual(job.status, 'new')
self.assertTrue(job.start(ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK))
self.assertEqual(job.status, 'pending')
job = Job.objects.get(pk=job.pk)
self.check_job_result(job, 'successful')
self.assertTrue('ssh-agent' in self.run_job_args)
self.assertTrue('Bad passphrase' not in job.result_stdout)

View File

@@ -0,0 +1,226 @@
# Copyright (c) 2013 AnsibleWorks, Inc.
# All Rights Reserved.
import json
from django.contrib.auth.models import User as DjangoUser
import django.test
from django.test.client import Client
from ansibleworks.main.models import *
from ansibleworks.main.tests.base import BaseTest
class UsersTest(BaseTest):
def collection(self):
return '/api/v1/users/'
def setUp(self):
super(UsersTest, self).setUp()
self.setup_users()
self.organizations = self.make_organizations(self.super_django_user, 1)
self.organizations[0].admins.add(self.normal_django_user)
self.organizations[0].users.add(self.other_django_user)
self.organizations[0].users.add(self.normal_django_user)
def test_only_super_user_or_org_admin_can_add_users(self):
url = '/api/v1/users/'
new_user = dict(username='blippy')
new_user2 = dict(username='blippy2')
self.post(url, expect=401, data=new_user, auth=None)
self.post(url, expect=401, data=new_user, auth=self.get_invalid_credentials())
self.post(url, expect=403, data=new_user, auth=self.get_other_credentials())
self.post(url, expect=201, data=new_user, auth=self.get_super_credentials())
self.post(url, expect=400, data=new_user, auth=self.get_super_credentials())
self.post(url, expect=201, data=new_user2, auth=self.get_normal_credentials())
self.post(url, expect=400, data=new_user2, auth=self.get_normal_credentials())
def test_auth_token_login(self):
auth_token_url = '/api/v1/authtoken/'
# Always returns a 405 for any GET request, regardless of credentials.
self.get(auth_token_url, expect=405, auth=None)
self.get(auth_token_url, expect=405, auth=self.get_invalid_credentials())
self.get(auth_token_url, expect=405, auth=self.get_normal_credentials())
# Posting without username/password fields or invalid username/password
# returns a 400 error.
data = {}
self.post(auth_token_url, data, expect=400)
data = dict(zip(('username', 'password'), self.get_invalid_credentials()))
self.post(auth_token_url, data, expect=400)
# A valid username/password should give us an auth token.
data = dict(zip(('username', 'password'), self.get_normal_credentials()))
result = self.post(auth_token_url, data, expect=200, auth=None)
self.assertTrue('token' in result)
self.assertEqual(result['token'], self.normal_django_user.auth_token.key)
auth_token = result['token']
# Verify we can access our own user information with the auth token.
data = self.get('/api/v1/me/', expect=200, auth=auth_token)
self.assertEquals(data['results'][0]['username'], 'normal')
self.assertEquals(data['count'], 1)
def test_ordinary_user_can_modify_some_fields_about_himself_but_not_all_and_passwords_work(self):
detail_url = '/api/v1/users/%s/' % self.other_django_user.pk
data = self.get(detail_url, expect=200, auth=self.get_other_credentials())
# can't change first_name, last_name, etc
data['last_name'] = "NewLastName"
self.put(detail_url, data, expect=403, auth=self.get_other_credentials())
# can't change username
data['username'] = 'newUsername'
self.put(detail_url, data, expect=403, auth=self.get_other_credentials())
# if superuser, CAN change lastname and username and such
self.put(detail_url, data, expect=200, auth=self.get_super_credentials())
# and user can still login
creds = self.get_other_credentials()
creds = ('newUsername', creds[1])
data = self.get(detail_url, expect=200, auth=creds)
# user can change their password (submit as text) and can still login
# and password is not stored as plaintext
data['password'] = 'newPassWord1234Changed'
changed = self.put(detail_url, data, expect=200, auth=creds)
creds = (creds[0], data['password'])
self.get(detail_url, expect=200, auth=creds)
# make another nobody user, and make sure they can't send any edits
obj = User.objects.create(username='new_user')
obj.set_password('new_user')
obj.save()
hacked = dict(password='asdf')
changed = self.put(detail_url, hacked, expect=403, auth=('new_user', 'new_user'))
hacked = dict(username='asdf')
changed = self.put(detail_url, hacked, expect=403, auth=('new_user', 'new_user'))
# password is not stored in plaintext
self.assertTrue(User.objects.get(pk=self.normal_django_user.pk).password != data['password'])
def test_user_created_with_password_can_login(self):
# this is something an org admin can do...
url = '/api/v1/users/'
data = dict(username='username', password='password')
data2 = dict(username='username2', password='password2')
data = self.post(url, expect=201, data=data, auth=self.get_normal_credentials())
# verify that the login works...
self.get(url, expect=200, auth=('username', 'password'))
# but a regular user cannot
data = self.post(url, expect=403, data=data2, auth=self.get_other_credentials())
# a super user can also create new users
data = self.post(url, expect=201, data=data2, auth=self.get_super_credentials())
# verify that the login works
self.get(url, expect=200, auth=('username2', 'password2'))
# verify that if you post a user with a pk, you do not alter that user's password info
mod = dict(id=1, username='change', password='change')
data = self.post(url, expect=201, data=mod, auth=self.get_super_credentials())
orig = User.objects.get(pk=1)
self.assertTrue(orig.username != 'change')
def test_password_not_shown_in_get_operations_for_list_or_detail(self):
url = '/api/v1/users/1/'
data = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertTrue('password' not in data)
url = '/api/v1/users/'
data = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertTrue('password' not in data['results'][0])
def test_user_list_filtered(self):
url = '/api/v1/users/'
data3 = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertEquals(data3['count'], 3)
data2 = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data2['count'], 2)
data1 = self.get(url, expect=200, auth=self.get_other_credentials())
self.assertEquals(data1['count'], 1)
def test_super_user_can_delete_a_user_but_only_marked_inactive(self):
url = '/api/v1/users/2/'
data = self.delete(url, expect=204, auth=self.get_super_credentials())
data = self.get(url, expect=404, auth=self.get_super_credentials())
url = '/api/v1/users/2/'
obj = User.objects.get(pk=2)
self.assertEquals(obj.is_active, False)
def test_non_org_admin_user_cannot_delete_any_user_including_himself(self):
url1 = '/api/v1/users/1/'
url2 = '/api/v1/users/2/'
url3 = '/api/v1/users/3/'
data = self.delete(url1, expect=403, auth=self.get_other_credentials())
data = self.delete(url2, expect=403, auth=self.get_other_credentials())
data = self.delete(url3, expect=403, auth=self.get_other_credentials())
def test_there_exists_an_obvious_url_where_a_user_may_find_his_user_record(self):
url = '/api/v1/me/'
data = self.get(url, expect=401, auth=None)
data = self.get(url, expect=401, auth=self.get_invalid_credentials())
data = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['results'][0]['username'], 'normal')
self.assertEquals(data['count'], 1)
data = self.get(url, expect=200, auth=self.get_other_credentials())
self.assertEquals(data['results'][0]['username'], 'other')
self.assertEquals(data['count'], 1)
data = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['results'][0]['username'], 'admin')
self.assertEquals(data['count'], 1)
def test_user_related_resources(self):
# organizations the user is a member of, should be 1
url = '/api/v1/users/2/organizations/'
data = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 1)
# also accessible via superuser
data = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['count'], 1)
# but not by other user
data = self.get(url, expect=403, auth=self.get_other_credentials())
# organizations the user is an admin of, should be 1
url = '/api/v1/users/2/admin_of_organizations/'
data = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 1)
# also accessible via superuser
data = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['count'], 1)
# but not by other user
data = self.get(url, expect=403, auth=self.get_other_credentials())
# teams the user is on, should be 0
url = '/api/v1/users/2/teams/'
data = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 0)
# also accessible via superuser
data = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['count'], 0)
# but not by other user
data = self.get(url, expect=403, auth=self.get_other_credentials())
# verify org admin can still read other user data too
url = '/api/v1/users/3/organizations/'
data = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 1)
url = '/api/v1/users/3/admin_of_organizations/'
data = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 0)
url = '/api/v1/users/3/teams/'
data = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 0)
# FIXME: add test that shows posting a user w/o id to /organizations/2/users/ can create a new one & associate
# FIXME: add test that shows posting a user w/o id to /organizations/2/admins/ can create a new one & associate
# FIXME: add test that shows posting a projects w/o id to /organizations/2/projects/ can create a new one & associate