Permissions-related updates and fixes, more tests, better handling of any user input that would previously generate a server error.

This commit is contained in:
Chris Church
2013-07-22 21:20:28 -04:00
parent 5ef8a48600
commit bc49627203
15 changed files with 558 additions and 326 deletions

View File

@@ -124,13 +124,15 @@ class BaseTestMixin(object):
self.normal_password = 'normal'
self.other_username = 'other'
self.other_password = 'other'
self.nobody_username = 'nobody'
self.nobody_password = 'nobody'
self.super_django_user = self.make_user(self.super_username, self.super_password, super_user=True)
if not just_super_user:
self.normal_django_user = self.make_user(self.normal_username, self.normal_password, super_user=False)
self.other_django_user = self.make_user(self.other_username, self.other_password, super_user=False)
self.nobody_django_user = self.make_user(self.nobody_username, self.nobody_password, super_user=False)
def get_super_credentials(self):
return (self.super_username, self.super_password)
@@ -141,6 +143,10 @@ class BaseTestMixin(object):
def get_other_credentials(self):
return (self.other_username, self.other_password)
def get_nobody_credentials(self):
# here is a user without any permissions...
return (self.nobody_username, self.nobody_password)
def get_invalid_credentials(self):
return ('random', 'combination')
@@ -239,6 +245,39 @@ class BaseTestMixin(object):
data = self.get(collection_url, expect=200, auth=auth)
return [item['url'] for item in data['results']]
def check_invalid_auth(self, url, data=None, methods=None):
'''
Check various methods of accessing the given URL with invalid
authentication credentials.
'''
data = data or {}
methods = methods or ('options', 'head', 'get')
for auth in [(None,), ('invalid', 'password')]:
with self.current_user(*auth):
for method in methods:
f = getattr(self, method)
if method in ('post', 'put', 'patch'):
f(url, data, expect=401)
else:
f(url, expect=401)
def check_get_list(self, url, user, qs, fields=None, expect=200):
'''
Check that the given list view URL returns results for the given user
that match the given queryset.
'''
with self.current_user(user):
self.options(url, expect=expect)
self.head(url, expect=expect)
response = self.get(url, expect=expect)
if expect != 200:
return
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
if fields:
for obj in response['results']:
self.assertTrue(set(obj.keys()) <= set(fields))
class BaseTest(BaseTestMixin, django.test.TestCase):
'''
Base class for unit tests.

View File

@@ -33,15 +33,137 @@ class InventoryTest(BaseTest):
permission_type = 'read'
)
# and make one more user that won't be a part of any org, just for negative-access testing
def test_get_inventory_list(self):
url = reverse('main:inventory_list')
qs = Inventory.objects.filter(active=True).distinct()
self.nobody_django_user = User.objects.create(username='nobody')
self.nobody_django_user.set_password('nobody')
self.nobody_django_user.save()
# Check list view with invalid authentication.
self.check_invalid_auth(url)
def get_nobody_credentials(self):
# here is a user without any permissions...
return ('nobody', 'nobody')
# a super user can list all inventories
self.check_get_list(url, self.super_django_user, qs)
# an org admin can list inventories but is filtered to what he adminsters
normal_qs = qs.filter(organization__admins__in=[self.normal_django_user])
self.check_get_list(url, self.normal_django_user, normal_qs)
# a user who is on a team who has a read permissions on an inventory can see filtered inventories
other_qs = qs.filter(permissions__user__in=[self.other_django_user])
self.check_get_list(url, self.other_django_user, other_qs)
# a regular user not part of anything cannot see any inventories
nobody_qs = qs.none()
self.check_get_list(url, self.nobody_django_user, nobody_qs)
def test_post_inventory_list(self):
url = reverse('main:inventory_list')
# Check post to list view with invalid authentication.
new_inv_0 = dict(name='inventory-c', description='baz', organization=self.organizations[0].pk)
self.check_invalid_auth(url, new_inv_0, methods=('post',))
# a super user can create inventory
new_inv_1 = dict(name='inventory-c', description='baz', organization=self.organizations[0].pk)
new_id = max(Inventory.objects.values_list('pk', flat=True)) + 1
with self.current_user(self.super_django_user):
data = self.post(url, data=new_inv_1, expect=201)
self.assertEquals(data['id'], new_id)
# an org admin of any org can create inventory, if it is one of his organizations
# the organization parameter is required!
new_inv_incomplete = dict(name='inventory-d', description='baz')
new_inv_not_my_org = dict(name='inventory-d', description='baz', organization=self.organizations[2].pk)
new_inv_my_org = dict(name='inventory-d', description='baz', organization=self.organizations[0].pk)
with self.current_user(self.normal_django_user):
data = self.post(url, data=new_inv_incomplete, expect=400)
data = self.post(url, data=new_inv_not_my_org, expect=403)
data = self.post(url, data=new_inv_my_org, expect=201)
# a regular user cannot create inventory
new_inv_denied = dict(name='inventory-e', description='glorp', organization=self.organizations[0].pk)
with self.current_user(self.other_django_user):
data = self.post(url, data=new_inv_denied, expect=403)
def test_get_inventory_detail(self):
url_a = reverse('main:inventory_detail', args=(self.inventory_a.pk,))
url_b = reverse('main:inventory_detail', args=(self.inventory_b.pk,))
# Check detail view with invalid authentication.
self.check_invalid_auth(url_a)
self.check_invalid_auth(url_b)
# a super user can get inventory records
with self.current_user(self.super_django_user):
data = self.get(url_a, expect=200)
self.assertEquals(data['name'], 'inventory-a')
# an org admin can get inventory records for his orgs only
with self.current_user(self.normal_django_user):
data = self.get(url_a, expect=200)
self.assertEquals(data['name'], 'inventory-a')
data = self.get(url_b, expect=403)
# a user who is on a team who has read permissions on an inventory can see inventory records
with self.current_user(self.other_django_user):
data = self.get(url_a, expect=403)
data = self.get(url_b, expect=200)
self.assertEquals(data['name'], 'inventory-b')
# a regular user cannot read any inventory records
with self.current_user(self.nobody_django_user):
data = self.get(url_a, expect=403)
data = self.get(url_b, expect=403)
def test_put_inventory_detail(self):
url_a = reverse('main:inventory_detail', args=(self.inventory_a.pk,))
url_b = reverse('main:inventory_detail', args=(self.inventory_b.pk,))
# Check put to detail view with invalid authentication.
self.check_invalid_auth(url_a, methods=('put',))
self.check_invalid_auth(url_b, methods=('put',))
# a super user can update inventory records
with self.current_user(self.super_django_user):
data = self.get(url_a, expect=200)
data['name'] = 'inventory-a-update1'
self.put(url_a, data, expect=200)
data = self.get(url_b, expect=200)
data['name'] = 'inventory-b-update1'
self.put(url_b, data, expect=200)
# an org admin can update inventory records for his orgs only.
with self.current_user(self.normal_django_user):
data = self.get(url_a, expect=200)
data['name'] = 'inventory-a-update2'
self.put(url_a, data, expect=200)
self.put(url_b, data, expect=403)
# a user who is on a team who has read permissions on an inventory can
# see inventory records, but not update.
with self.current_user(self.other_django_user):
data = self.get(url_b, expect=200)
data['name'] = 'inventory-b-update3'
self.put(url_b, data, expect=403)
# a regular user cannot update any inventory records
with self.current_user(self.nobody_django_user):
self.put(url_a, {}, expect=403)
self.put(url_b, {}, expect=403)
# a superuser can reassign an inventory to another organization.
with self.current_user(self.super_django_user):
data = self.get(url_b, expect=200)
self.assertEqual(data['organization'], self.organizations[1].pk)
data['organization'] = self.organizations[0].pk
self.put(url_b, data, expect=200)
# a normal user can't reassign an inventory to an organization where
# he isn't an admin.
with self.current_user(self.normal_django_user):
data = self.get(url_a, expect=200)
self.assertEqual(data['organization'], self.organizations[0].pk)
data['organization'] = self.organizations[1].pk
self.put(url_a, data, expect=403)
def test_main_line(self):
@@ -53,57 +175,57 @@ class InventoryTest(BaseTest):
groups = reverse('main:group_list')
# a super user can list inventories
data = self.get(inventories, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['count'], 2)
#data = self.get(inventories, expect=200, auth=self.get_super_credentials())
#self.assertEquals(data['count'], 2)
# an org admin can list inventories but is filtered to what he adminsters
data = self.get(inventories, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 1)
#data = self.get(inventories, expect=200, auth=self.get_normal_credentials())
#self.assertEquals(data['count'], 1)
# a user who is on a team who has a read permissions on an inventory can see filtered inventories
data = self.get(inventories, expect=200, auth=self.get_other_credentials())
self.assertEquals(data['count'], 1)
#data = self.get(inventories, expect=200, auth=self.get_other_credentials())
#self.assertEquals(data['count'], 1)
# a regular user not part of anything cannot see any inventories
data = self.get(inventories, expect=200, auth=self.get_nobody_credentials())
self.assertEquals(data['count'], 0)
#data = self.get(inventories, expect=200, auth=self.get_nobody_credentials())
#self.assertEquals(data['count'], 0)
# a super user can get inventory records
data = self.get(inventories_1, expect=200, auth=self.get_super_credentials())
self.assertEquals(data['name'], 'inventory-a')
#data = self.get(inventories_1, expect=200, auth=self.get_super_credentials())
#self.assertEquals(data['name'], 'inventory-a')
# an org admin can get inventory records
data = self.get(inventories_1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['name'], 'inventory-a')
#data = self.get(inventories_1, expect=200, auth=self.get_normal_credentials())
#self.assertEquals(data['name'], 'inventory-a')
# a user who is on a team who has read permissions on an inventory can see inventory records
data = self.get(inventories_1, expect=403, auth=self.get_other_credentials())
data = self.get(inventories_2, expect=200, auth=self.get_other_credentials())
self.assertEquals(data['name'], 'inventory-b')
#data = self.get(inventories_1, expect=403, auth=self.get_other_credentials())
#data = self.get(inventories_2, expect=200, auth=self.get_other_credentials())
#self.assertEquals(data['name'], 'inventory-b')
# a regular user cannot read any inventory records
data = self.get(inventories_1, expect=403, auth=self.get_nobody_credentials())
data = self.get(inventories_2, expect=403, auth=self.get_nobody_credentials())
#data = self.get(inventories_1, expect=403, auth=self.get_nobody_credentials())
#data = self.get(inventories_2, expect=403, auth=self.get_nobody_credentials())
# a super user can create inventory
new_inv_1 = dict(name='inventory-c', description='baz', organization=self.organizations[0].pk)
new_id = max(Inventory.objects.values_list('pk', flat=True)) + 1
data = self.post(inventories, data=new_inv_1, expect=201, auth=self.get_super_credentials())
self.assertEquals(data['id'], new_id)
#new_inv_1 = dict(name='inventory-c', description='baz', organization=self.organizations[0].pk)
#new_id = max(Inventory.objects.values_list('pk', flat=True)) + 1
#data = self.post(inventories, data=new_inv_1, expect=201, auth=self.get_super_credentials())
#self.assertEquals(data['id'], new_id)
# an org admin of any org can create inventory, if it is one of his organizations
# the organization parameter is required!
new_inv_incomplete = dict(name='inventory-d', description='baz')
data = self.post(inventories, data=new_inv_incomplete, expect=400, auth=self.get_normal_credentials())
new_inv_not_my_org = dict(name='inventory-d', description='baz', organization=self.organizations[2].pk)
#new_inv_incomplete = dict(name='inventory-d', description='baz')
#data = self.post(inventories, data=new_inv_incomplete, expect=400, auth=self.get_normal_credentials())
#new_inv_not_my_org = dict(name='inventory-d', description='baz', organization=self.organizations[2].pk)
data = self.post(inventories, data=new_inv_not_my_org, expect=403, auth=self.get_normal_credentials())
new_inv_my_org = dict(name='inventory-d', description='baz', organization=self.organizations[0].pk)
data = self.post(inventories, data=new_inv_my_org, expect=201, auth=self.get_normal_credentials())
#data = self.post(inventories, data=new_inv_not_my_org, expect=403, auth=self.get_normal_credentials())
#new_inv_my_org = dict(name='inventory-d', description='baz', organization=self.organizations[0].pk)
#data = self.post(inventories, data=new_inv_my_org, expect=201, auth=self.get_normal_credentials())
# a regular user cannot create inventory
new_inv_denied = dict(name='inventory-e', description='glorp', organization=self.organizations[0].pk)
data = self.post(inventories, data=new_inv_denied, expect=403, auth=self.get_other_credentials())
#new_inv_denied = dict(name='inventory-e', description='glorp', organization=self.organizations[0].pk)
#data = self.post(inventories, data=new_inv_denied, expect=403, auth=self.get_other_credentials())
# a super user can add hosts (but inventory ID is required)
inv = Inventory.objects.create(

View File

@@ -12,7 +12,7 @@ import uuid
from django.contrib.auth.models import User as DjangoUser
from django.conf import settings
from django.core.urlresolvers import reverse
from django.db import transaction
from django.db.models import Q
import django.test
from django.test.client import Client
from django.test.utils import override_settings
@@ -427,57 +427,49 @@ class BaseJobTestMixin(BaseTestMixin):
super(BaseJobTestMixin, self).setUp()
self.populate()
def _test_invalid_creds(self, url, data=None, methods=None):
data = data or {}
methods = methods or ('options', 'head', 'get')
for auth in [(None,), ('invalid', 'password')]:
with self.current_user(*auth):
for method in methods:
f = getattr(self, method)
if method in ('post', 'put', 'patch'):
f(url, data, expect=401)
else:
f(url, expect=401)
class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
JOB_TEMPLATE_FIELDS = ('id', 'url', 'related', 'summary_fields', 'created',
'name', 'description', 'job_type', 'inventory',
'project', 'playbook', 'credential', 'forks',
'limit', 'verbosity', 'extra_vars', 'job_tags',
'host_config_key',)
def test_get_job_template_list(self):
url = reverse('main:job_template_list')
qs = JobTemplate.objects.distinct()
fields = self.JOB_TEMPLATE_FIELDS
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self.check_invalid_auth(url)
# sue's credentials (superuser) == 200, full list
with self.current_user(self.user_sue):
self.options(url)
self.head(url)
response = self.get(url)
qs = JobTemplate.objects.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# Sue's credentials (superuser) == 200, full list
self.check_get_list(url, self.user_sue, qs, fields)
# Alex's credentials (admin of all orgs) == 200, full list
self.check_get_list(url, self.user_alex, qs, fields)
# FIXME: Check individual job template result fields.
# alex's credentials (admin of all orgs) == 200, full list
with self.current_user(self.user_alex):
self.options(url)
self.head(url)
response = self.get(url)
qs = JobTemplate.objects.all()
self.check_pagination_and_size(response, qs.count())
self.check_list_ids(response, qs)
# bob's credentials (admin of eng, user of ops) == 200, all from
# Bob's credentials (admin of eng, user of ops) == 200, all from
# engineering and operations.
with self.current_user(self.user_bob):
self.options(url)
self.head(url)
response = self.get(url)
qs = JobTemplate.objects.filter(
inventory__organization__in=[self.org_eng, self.org_ops],
bob_qs = qs.filter(
Q(project__organizations__admins__in=[self.user_bob]) |
Q(project__teams__users__in=[self.user_bob]),
)
#self.check_pagination_and_size(response, qs.count())
#self.check_list_ids(response, qs)
self.check_get_list(url, self.user_bob, bob_qs, fields)
# Chuck's credentials (admin of eng) == 200, all from engineering.
chuck_qs = qs.filter(
Q(project__organizations__admins__in=[self.user_chuck]) |
Q(project__teams__users__in=[self.user_chuck]),
)
self.check_get_list(url, self.user_chuck, chuck_qs, fields)
# Doug's credentials (user of eng) == 200, none?.
doug_qs = qs.filter(
Q(project__organizations__admins__in=[self.user_doug]) |
Q(project__teams__users__in=[self.user_doug]),
)
self.check_get_list(url, self.user_doug, doug_qs, fields)
# FIXME: Check with other credentials.
@@ -492,7 +484,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
)
# Test with no auth and with invalid login.
self._test_invalid_creds(url, data, methods=('post',))
self.check_invalid_auth(url, data, methods=('post',))
# sue can always add job templates.
with self.current_user(self.user_sue):
@@ -541,7 +533,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
url = reverse('main:job_template_detail', args=(jt.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self.check_invalid_auth(url)
# sue can read the job template detail.
with self.current_user(self.user_sue):
@@ -562,7 +554,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
url = reverse('main:job_template_detail', args=(jt.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url, methods=('put',))# 'patch'))
self.check_invalid_auth(url, methods=('put',))# 'patch'))
# sue can update the job template detail.
with self.current_user(self.user_sue):
@@ -579,7 +571,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
url = reverse('main:job_template_jobs_list', args=(jt.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self.check_invalid_auth(url)
# sue can read the job template job list.
with self.current_user(self.user_sue):
@@ -601,7 +593,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
)
# Test with no auth and with invalid login.
self._test_invalid_creds(url, data, methods=('post',))
self.check_invalid_auth(url, data, methods=('post',))
# sue can create a new job from the template.
with self.current_user(self.user_sue):
@@ -615,7 +607,7 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
url = reverse('main:job_list')
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self.check_invalid_auth(url)
# sue's credentials (superuser) == 200, full list
with self.current_user(self.user_sue):
@@ -641,7 +633,7 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
)
# Test with no auth and with invalid login.
self._test_invalid_creds(url, data, methods=('post',))
self.check_invalid_auth(url, data, methods=('post',))
# sue can create a new job without a template.
with self.current_user(self.user_sue):
@@ -663,7 +655,7 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
url = reverse('main:job_detail', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self.check_invalid_auth(url)
# sue can read the job detail.
with self.current_user(self.user_sue):
@@ -679,7 +671,7 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
url = reverse('main:job_detail', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url, methods=('put',))# 'patch'))
self.check_invalid_auth(url, methods=('put',))# 'patch'))
# sue can update the job detail only if the job is new.
self.assertEqual(job.status, 'new')
@@ -780,8 +772,8 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
url = reverse('main:job_start', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self._test_invalid_creds(url, methods=('post',))
self.check_invalid_auth(url)
self.check_invalid_auth(url, methods=('post',))
# Sue can start a job (when passwords are already saved) as long as the
# status is new. Reverse list so "new" will be last.
@@ -867,8 +859,8 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
url = reverse('main:job_cancel', args=(job.pk,))
# Test with no auth and with invalid login.
self._test_invalid_creds(url)
self._test_invalid_creds(url, methods=('post',))
self.check_invalid_auth(url)
self.check_invalid_auth(url, methods=('post',))
# sue can cancel the job, but only when it is pending or running.
for status in [x[0] for x in Job.STATUS_CHOICES]:

View File

@@ -84,14 +84,6 @@ class ProjectsTest(BaseTest):
self.team1.users.add(self.normal_django_user)
self.team2.users.add(self.other_django_user)
self.nobody_django_user = User.objects.create(username='nobody')
self.nobody_django_user.set_password('nobody')
self.nobody_django_user.save()
def get_nobody_credentials(self):
# here is a user without any permissions...
return ('nobody', 'nobody')
def test_playbooks(self):
def write_test_file(project, name, content):
full_path = os.path.join(project.get_project_path(), name)
@@ -395,11 +387,13 @@ class ProjectsTest(BaseTest):
self.get(url, expect=401, auth=self.get_invalid_credentials())
self.get(url, expect=403, auth=self.get_nobody_credentials())
other.organizations.add(Organization.objects.get(pk=self.organizations[1].pk))
other.save()
# Normal user can only see some teams that other user is a part of,
# since normal user is not an admin of that organization.
my_teams1 = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(my_teams1['count'], 1)
# Other user should be able to see all his own teams.
my_teams2 = self.get(url, expect=200, auth=self.get_other_credentials())
self.assertEqual(my_teams1['count'], 2)
self.assertEqual(my_teams1, my_teams2)
self.assertEqual(my_teams2['count'], 2)
# =====================================================================
# USER PROJECTS
@@ -511,14 +505,14 @@ class ProjectsTest(BaseTest):
team_url = reverse('main:team_credentials_list', args=(cred_put_t['team'],))
self.post(team_url, data=cred_put_t, expect=204, auth=self.get_normal_credentials())
# can remove credentials from a user (via disassociate)
# can remove credentials from a user (via disassociate) - this will delete the credential.
cred_put_u['disassociate'] = 1
url = cred_put_u['url']
user_url = reverse('main:user_credentials_list', args=(cred_put_u['user'],))
self.post(user_url, data=cred_put_u, expect=204, auth=self.get_normal_credentials())
# can delete a credential directly -- probably won't be used too often
data = self.delete(url, expect=204, auth=self.get_other_credentials())
#data = self.delete(url, expect=204, auth=self.get_other_credentials())
data = self.delete(url, expect=404, auth=self.get_other_credentials())
# =====================================================================

View File

@@ -142,7 +142,7 @@ class UsersTest(BaseTest):
def test_user_list_filtered(self):
url = reverse('main:user_list')
data3 = self.get(url, expect=200, auth=self.get_super_credentials())
self.assertEquals(data3['count'], 3)
self.assertEquals(data3['count'], 4)
data2 = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data2['count'], 2)
data1 = self.get(url, expect=200, auth=self.get_other_credentials())