mirror of
https://github.com/ansible/awx.git
synced 2026-01-09 23:12:08 -03:30
Streamlining RBAC layer code, adding tests for PUT operations.
This commit is contained in:
parent
8cae93c55f
commit
9237cd6176
@ -2,6 +2,7 @@ from lib.main.models import *
|
||||
from lib.main.serializers import *
|
||||
from rest_framework import permissions
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.core.exceptions import PermissionDenied
|
||||
|
||||
# FIXME: this will probably need to be subclassed by object type
|
||||
|
||||
@ -10,6 +11,7 @@ class CustomRbac(permissions.BasePermission):
|
||||
def _common_user_check(self, request):
|
||||
# no anonymous users
|
||||
if type(request.user) == AnonymousUser:
|
||||
# 401, not 403, hence no raised exception
|
||||
return False
|
||||
# superusers are always good
|
||||
if request.user.is_superuser:
|
||||
@ -17,23 +19,26 @@ class CustomRbac(permissions.BasePermission):
|
||||
# other users must have associated acom user records & be active
|
||||
acom_user = User.objects.filter(auth_user = request.user)
|
||||
if len(acom_user) != 1:
|
||||
return False
|
||||
raise PermissionDenied()
|
||||
if not acom_user[0].active:
|
||||
return False
|
||||
raise PermissionDenied()
|
||||
return True
|
||||
|
||||
def has_permission(self, request, view, obj=None):
|
||||
if not self._common_user_check(request):
|
||||
return False
|
||||
if obj is None:
|
||||
# filtering happens in the view
|
||||
return True
|
||||
else:
|
||||
# haven't tested around these confines yet
|
||||
raise Exception("FIXME")
|
||||
raise Exception("did not expect to get to this position")
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
if request.user.is_superuser:
|
||||
return True
|
||||
if not self._common_user_check(request):
|
||||
return False
|
||||
# FIXME: TODO: verify the user is actually allowed to see this resource
|
||||
if not view.permissions_check(request, obj):
|
||||
raise PermissionDenied()
|
||||
return True
|
||||
|
||||
|
||||
@ -74,9 +74,10 @@ class BaseTest(django.test.TestCase):
|
||||
method = getattr(client,method)
|
||||
response = None
|
||||
if data is not None:
|
||||
response = method(url, data=json.dumps(data))
|
||||
response = method(url, json.dumps(data), 'application/json')
|
||||
else:
|
||||
response = method(url)
|
||||
|
||||
if response.status_code == 500 and expect != 500:
|
||||
assert False, "Failed: %s" % response.content
|
||||
if expect is not None:
|
||||
@ -87,15 +88,21 @@ class BaseTest(django.test.TestCase):
|
||||
def get(self, url, expect=200, auth=None):
|
||||
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='get')
|
||||
|
||||
def post(self, url, expect=204, auth=None):
|
||||
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='post')
|
||||
def post(self, url, data, expect=204, auth=None):
|
||||
return self._generic_rest(url, data=data, expect=expect, auth=auth, method='post')
|
||||
|
||||
def put(self, url, expect=200, auth=None):
|
||||
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='put')
|
||||
def put(self, url, data, expect=200, auth=None):
|
||||
return self._generic_rest(url, data=data, expect=expect, auth=auth, method='put')
|
||||
|
||||
def delete(self, url, expect=201, auth=None):
|
||||
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='delete')
|
||||
|
||||
def get_urls(self, collection_url, auth=None):
|
||||
# TODO: this test helper function doesn't support pagination
|
||||
data = self.get(collection_url, expect=200, auth=auth)
|
||||
return [item['url'] for item in data['results']]
|
||||
|
||||
|
||||
class OrganizationsTest(BaseTest):
|
||||
|
||||
def collection(self):
|
||||
@ -156,8 +163,9 @@ class OrganizationsTest(BaseTest):
|
||||
# make sure invalid user cannot
|
||||
data = self.get(urls[0], expect=401, auth=self.get_invalid_credentials())
|
||||
|
||||
# normal user should be able to get org 0 but not org 9 (as he's not a user or admin of it)
|
||||
# normal user should be able to get org 0 and org 1 but not org 9 (as he's not a user or admin of it)
|
||||
data = self.get(urls[0], expect=200, auth=self.get_normal_credentials())
|
||||
data = self.get(urls[1], expect=200, auth=self.get_normal_credentials())
|
||||
data = self.get(urls[9], expect=403, auth=self.get_normal_credentials())
|
||||
|
||||
# other user isn't a user or admin of anything, and similarly can't get in
|
||||
@ -168,6 +176,7 @@ class OrganizationsTest(BaseTest):
|
||||
|
||||
def test_get_item_subobjects_projects(self):
|
||||
pass
|
||||
|
||||
|
||||
def test_get_item_subobjects_users(self):
|
||||
pass
|
||||
@ -188,7 +197,28 @@ class OrganizationsTest(BaseTest):
|
||||
pass
|
||||
|
||||
def test_put_item(self):
|
||||
pass
|
||||
|
||||
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
|
||||
data = self.get(urls[0], expect=401, auth=self.get_invalid_credentials())
|
||||
|
||||
# test that an unauthenticated user cannot do a put
|
||||
new_data = data.copy()
|
||||
new_data['description'] = 'updated description'
|
||||
self.put(urls[0], new_data, expect=401, auth=None)
|
||||
self.put(urls[0], new_data, expect=401, auth=self.get_invalid_credentials())
|
||||
|
||||
# user normal is an admin of org 0 and a member of org 1 so should be able to put only org 1
|
||||
self.put(urls[1], new_data, expect=403, auth=self.get_normal_credentials())
|
||||
put_result = self.put(urls[1], new_data, expect=200, auth=self.get_normal_credentials())
|
||||
|
||||
# FIXME: test the contents of the put returned object
|
||||
|
||||
# get back org 1 and see if it changed
|
||||
get_result = self.get(urls[1], data=new_data, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEquals(get_result['description'], 'updated description')
|
||||
|
||||
# super user can also put even though they aren't added to the org users or admins list
|
||||
self.put(urls[0], new_data, expect=200, auth=self.get_super_credentials())
|
||||
|
||||
def test_put_item_subobjects_projects(self):
|
||||
pass
|
||||
|
||||
@ -7,7 +7,7 @@ from django.contrib.auth.models import AnonymousUser
|
||||
from rest_framework import mixins
|
||||
from rest_framework import generics
|
||||
from rest_framework import permissions
|
||||
|
||||
import exceptions
|
||||
|
||||
class OrganizationsList(generics.ListCreateAPIView):
|
||||
|
||||
@ -25,6 +25,9 @@ class OrganizationsList(generics.ListCreateAPIView):
|
||||
return Organization.objects.filter(active = True, admins__in = [ self.request.user.application_user ]).distinct() | \
|
||||
Organization.objects.filter(active = True, users__in = [ self.request.user.application_user ]).distinct()
|
||||
|
||||
def permissions_check(self, request, obj):
|
||||
raise exceptions.NotImplementedError
|
||||
|
||||
|
||||
class OrganizationsDetail(generics.RetrieveUpdateDestroyAPIView):
|
||||
model = Organization
|
||||
@ -35,5 +38,10 @@ class OrganizationsDetail(generics.RetrieveUpdateDestroyAPIView):
|
||||
#def pre_save(self, obj):
|
||||
# obj.owner = self.request.user
|
||||
|
||||
|
||||
|
||||
def permissions_check(self, request, obj):
|
||||
admin = request.user.application_user in obj.admins.all()
|
||||
user = request.user.application_user in obj.users.all()
|
||||
if request.method == 'GET':
|
||||
return admin or user
|
||||
if request.method == 'PUT':
|
||||
return admin
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user