Updates to permissions checks (and tests), add logging around permission checks, permission-related fixes to support browsable API, work in progress on job templates API, added default logging settings.

This commit is contained in:
Chris Church
2013-05-01 14:10:42 -04:00
parent b6e7d964c2
commit ef92fe3960
11 changed files with 320 additions and 153 deletions

View File

@@ -20,5 +20,5 @@ from lib.main.tests.inventory import InventoryTest
from lib.main.tests.projects import ProjectsTest
from lib.main.tests.commands import *
from lib.main.tests.tasks import RunJobTest
from lib.main.tests.jobs import JobsTest # similar to above, but mostly focused on REST API
from lib.main.tests.jobs import *

View File

@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with Ansible Commander. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import datetime
import json
import os
@@ -21,7 +22,7 @@ import shutil
import tempfile
from django.conf import settings
from django.contrib.auth.models import User as DjangoUser
from django.contrib.auth.models import User
import django.test
from django.test.client import Client
from lib.main.models import *
@@ -36,6 +37,8 @@ class BaseTestMixin(object):
super(BaseTestMixin, self).setUp()
self.object_ctr = 0
self._temp_project_dirs = []
self._current_auth = None
self._user_passwords = {}
def tearDown(self):
super(BaseTestMixin, self).tearDown()
@@ -43,14 +46,30 @@ class BaseTestMixin(object):
if os.path.exists(project_dir):
shutil.rmtree(project_dir, True)
def make_user(self, username, password, super_user=False):
django_user = None
def make_user(self, username, password=None, super_user=False):
user = None
password = password or username
if super_user:
django_user = DjangoUser.objects.create_superuser(username, "%s@example.com", password)
user = User.objects.create_superuser(username, "%s@example.com", password)
else:
django_user = DjangoUser.objects.create_user(username, "%s@example.com", password)
self.assertTrue(django_user.auth_token)
return django_user
user = User.objects.create_user(username, "%s@example.com", password)
self.assertTrue(user.auth_token)
self._user_passwords[user.username] = password
return user
@contextlib.contextmanager
def current_user(self, user_or_username, password=None):
try:
if isinstance(user_or_username, User):
username = user_or_username.username
else:
username = user_or_username
password = password or self._user_passwords.get(username)
previous_auth = self._current_auth
self._current_auth = (username, password)
yield
finally:
self._current_auth = previous_auth
def make_organizations(self, created_by, count=1):
results = []
@@ -121,16 +140,17 @@ class BaseTestMixin(object):
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None):
assert method is not None
method = method.lower()
if method not in [ 'get', 'delete' ]:
method_name = method.lower()
if method_name not in ('options', 'head', 'get', 'delete'):
assert data is not None
client = Client()
auth = auth or self._current_auth
if auth:
if isinstance(auth, (list, tuple)):
client.login(username=auth[0], password=auth[1])
elif isinstance(auth, basestring):
client = Client(HTTP_AUTHORIZATION='Token %s' % auth)
method = getattr(client,method)
method = getattr(client, method_name)
response = None
if data is not None:
response = method(url, json.dumps(data), 'application/json')
@@ -141,11 +161,19 @@ class BaseTestMixin(object):
assert False, "Failed: %s" % response.content
if expect is not None:
assert response.status_code == expect, "expected status %s, got %s for url=%s as auth=%s: %s" % (expect, response.status_code, url, auth, response.content)
if response.status_code not in [ 202, 204, 400, 405, 409 ]:
if method_name == 'head':
self.assertFalse(response.content)
if response.status_code not in [ 202, 204, 400, 405, 409 ] and method_name != 'head':
# no JSON responses in these at least for now, 400/409 should probably return some (FIXME)
return json.loads(response.content)
else:
return None
def options(self, url, expect=200, auth=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='options')
def head(self, url, expect=200, auth=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='head')
def get(self, url, expect=200, auth=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='get')

View File

@@ -16,18 +16,23 @@
import datetime
import json
from django.contrib.auth.models import User as DjangoUser
import django.test
from django.core.urlresolvers import reverse
from django.test.client import Client
from lib.main.models import *
from lib.main.tests.base import BaseTest
class JobsTest(BaseTest):
__all__ = ['JobTemplateTest', 'JobTest']
def collection(self):
# not really used
return '/api/v1/job_templates/'
TEST_PLAYBOOK = '''- hosts: mygroup
gather_facts: false
tasks:
- name: woohoo
command: test 1 = 1
'''
class BaseJobTest(BaseTest):
''''''
def get_other2_credentials(self):
return ('other2', 'other2')
@@ -36,45 +41,66 @@ class JobsTest(BaseTest):
return ('nobody', 'nobody')
def setUp(self):
super(JobsTest, self).setUp()
super(BaseJobTest, self).setUp()
# Users
self.setup_users()
self.other2_django_user = self.make_user('other2', 'other2')
self.nobody_django_user = self.make_user('nobody', 'nobody')
self.other2_django_user = User.objects.create(username='other2')
self.other2_django_user.set_password('other2')
self.other2_django_user.save()
self.nobody_django_user = User.objects.create(username='nobody')
self.nobody_django_user.set_password('nobody')
self.nobody_django_user.save()
# Organization
self.organization = Organization.objects.create(
name = 'engineering',
created_by = self.normal_django_user
)
self.inventory = Inventory.objects.create(
name = 'prod',
organization = self.organization,
created_by = self.normal_django_user
name='engineering',
created_by=self.normal_django_user,
)
self.organization.admins.add(self.normal_django_user)
self.organization.users.add(self.normal_django_user)
self.organization.users.add(self.other_django_user)
self.organization.users.add(self.other2_django_user)
self.group_a = Group.objects.create(
name = 'group1',
inventory = self.inventory,
created_by = self.normal_django_user
# Team
self.team = self.organization.teams.create(
name='Tigger',
created_by=self.normal_django_user,
)
self.team = Team.objects.create(
name = 'Tigger',
created_by = self.normal_django_user
)
self.team.users.add(self.other_django_user)
self.team.users.add(self.other2_django_user)
# Project
self.project = self.make_projects(self.normal_django_user, 1,
playbook_content='')[0]
playbook_content=TEST_PLAYBOOK)[0]
self.organization.projects.add(self.project)
# Inventory
self.inventory = self.organization.inventories.create(
name = 'prod',
created_by = self.normal_django_user,
)
self.group_a = self.inventory.groups.create(
name = 'group1',
created_by = self.normal_django_user
)
self.host_a = self.inventory.hosts.create(
name = '127.0.0.1',
created_by = self.normal_django_user
)
self.host_b = self.inventory.hosts.create(
name = '127.0.0.2',
created_by = self.normal_django_user
)
self.group_a.hosts.add(self.host_a)
self.group_a.hosts.add(self.host_b)
# Credentials
self.user_credential = self.other_django_user.credentials.create(
ssh_key_data = 'xxx',
created_by = self.normal_django_user,
)
self.team_credential = self.team.credentials.create(
ssh_key_data = 'xxx',
created_by = self.normal_django_user,
)
# other django user is on the project team and can deploy
self.permission1 = Permission.objects.create(
inventory = self.inventory,
@@ -82,8 +108,7 @@ class JobsTest(BaseTest):
team = self.team,
permission_type = PERM_INVENTORY_DEPLOY,
created_by = self.normal_django_user
)
)
# individual permission granted to other2 user, can run check mode
self.permission2 = Permission.objects.create(
inventory = self.inventory,
@@ -93,58 +118,105 @@ class JobsTest(BaseTest):
created_by = self.normal_django_user
)
self.host_a = Host.objects.create(
name = '127.0.0.1',
inventory = self.inventory,
created_by = self.normal_django_user
)
self.host_b = Host.objects.create(
name = '127.0.0.2',
inventory = self.inventory,
created_by = self.normal_django_user
)
self.group_a.hosts.add(self.host_a)
self.group_a.hosts.add(self.host_b)
self.group_a.save()
self.credential = Credential.objects.create(
ssh_key_data = 'xxx',
created_by = self.normal_django_user,
user = self.other_django_user
)
self.credential2 = Credential.objects.create(
ssh_key_data = 'xxx',
created_by = self.normal_django_user,
team = self.team,
)
self.organization.projects.add(self.project)
self.organization.admins.add(self.normal_django_user)
self.organization.users.add(self.normal_django_user)
self.organization.save()
self.template1 = JobTemplate.objects.create(
self.job_template1 = JobTemplate.objects.create(
name = 'job-run',
job_type = 'run',
inventory = self.inventory,
credential = self.credential,
credential = self.user_credential,
project = self.project,
created_by = self.normal_django_user,
)
self.template2 = JobTemplate.objects.create(
self.job_template2 = JobTemplate.objects.create(
name = 'job-check',
job_type = 'check',
inventory = self.inventory,
credential = self.credential,
credential = self.team_credential,
project = self.project,
created_by = self.normal_django_user,
)
class JobTemplateTest(BaseJobTest):
def setUp(self):
super(JobTemplateTest, self).setUp()
def test_job_template_list(self):
url = reverse('main:job_templates_list')
response = self.get(url, expect=401)
with self.current_user(self.normal_django_user):
response = self.get(url, expect=200)
self.assertTrue(response['count'], JobTemplate.objects.count())
# FIXME: Test that user can only see job templates from own organization.
# org admin can add job template
data = dict(
name = 'job-foo',
credential = self.user_credential.pk,
inventory = self.inventory.pk,
project = self.project.pk,
job_type = PERM_INVENTORY_DEPLOY,
)
with self.current_user(self.normal_django_user):
response = self.post(url, data, expect=201)
detail_url = reverse('main:job_templates_detail',
args=(response['id'],))
self.assertEquals(response['url'], detail_url)
# other_django_user is on a team that can deploy, so can create both
# deploy and check type job templates
with self.current_user(self.other_django_user):
data['name'] = 'job-foo2'
response = self.post(url, data, expect=201)
data['name'] = 'job-foo3'
data['job_type'] = PERM_INVENTORY_CHECK
response = self.post(url, data, expect=201)
# other2_django_user has individual permissions to run check mode,
# but not deploy
with self.current_user(self.other2_django_user):
data['name'] = 'job-foo4'
#data['credential'] = self.user_credential.pk
#response = self.post(url, data, expect=201)
data['name'] = 'job-foo5'
data['job_type'] = PERM_INVENTORY_DEPLOY
response = self.post(url, data, expect=403)
# nobody user can't even run check mode
with self.current_user(self.nobody_django_user):
data['name'] = 'job-foo5'
data['job_type'] = PERM_INVENTORY_CHECK
response = self.post(url, data, expect=403)
data['job_type'] = PERM_INVENTORY_DEPLOY
response = self.post(url, data, expect=403)
def test_job_template_detail(self):
return # FIXME
# verify we can also get the job template record
got = self.get(url, expect=200, auth=self.get_other2_credentials())
self.failUnlessEqual(got['url'], '/api/v1/job_templates/6/')
# TODO: add more tests that show
# the method used to START a JobTemplate follow the exact same permissions as those to create it ...
# and that jobs come back nicely serialized with related resources and so on ...
# that we can drill all the way down and can get at host failure lists, etc ...
class JobTest(BaseJobTest):
def setUp(self):
super(JobTest, self).setUp()
def test_mainline(self):
return # FIXME
# job templates
data = self.get('/api/v1/job_templates/', expect=401)
data = self.get('/api/v1/job_templates/', expect=200, auth=self.get_normal_credentials())

View File

@@ -18,6 +18,7 @@ import datetime
import json
from django.contrib.auth.models import User as DjangoUser
from django.core.urlresolvers import reverse
import django.test
from django.test.client import Client
from lib.main.models import *
@@ -64,17 +65,26 @@ class OrganizationsTest(BaseTest):
self.organizations[1].admins.add(self.normal_django_user)
def test_get_list(self):
url = reverse('main:organizations_list')
# no credentials == 401
self.get(self.collection(), expect=401)
self.options(url, expect=401)
self.head(url, expect=401)
self.get(url, expect=401)
# wrong credentials == 401
self.get(self.collection(), expect=401, auth=self.get_invalid_credentials())
with self.current_user(self.get_invalid_credentials()):
self.options(url, expect=401)
self.head(url, expect=401)
self.get(url, expect=401)
# superuser credentials == 200, full list
data = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
self.check_pagination_and_size(data, 10, previous=None, next=None)
[self.assertTrue(key in data['results'][0]) for key in ['name', 'description', 'url', 'creation_date', 'id' ]]
with self.current_user(self.super_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
data = self.get(url, expect=200)
self.check_pagination_and_size(data, 10, previous=None, next=None)
[self.assertTrue(key in data['results'][0]) for key in ['name', 'description', 'url', 'creation_date', 'id' ]]
# check that the related URL functionality works
related = data['results'][0]['related']