mirror of
https://github.com/ansible/awx.git
synced 2026-03-16 00:17:29 -02:30
Deleted objects are not really deleted but now appear to be via REST.
This commit is contained in:
@@ -3,6 +3,7 @@ from lib.main.serializers import *
|
|||||||
from rest_framework import permissions
|
from rest_framework import permissions
|
||||||
from django.contrib.auth.models import AnonymousUser
|
from django.contrib.auth.models import AnonymousUser
|
||||||
from django.core.exceptions import PermissionDenied
|
from django.core.exceptions import PermissionDenied
|
||||||
|
from django.http import Http404
|
||||||
|
|
||||||
# FIXME: this will probably need to be subclassed by object type
|
# FIXME: this will probably need to be subclassed by object type
|
||||||
|
|
||||||
@@ -45,6 +46,8 @@ class CustomRbac(permissions.BasePermission):
|
|||||||
return True
|
return True
|
||||||
if not self._common_user_check(request):
|
if not self._common_user_check(request):
|
||||||
return False
|
return False
|
||||||
|
if not obj.active:
|
||||||
|
raise Http404()
|
||||||
if not view.item_permissions_check(request, obj):
|
if not view.item_permissions_check(request, obj):
|
||||||
raise PermissionDenied()
|
raise PermissionDenied()
|
||||||
return True
|
return True
|
||||||
|
|||||||
@@ -66,7 +66,8 @@ class BaseTest(django.test.TestCase):
|
|||||||
|
|
||||||
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None):
|
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None):
|
||||||
assert method is not None
|
assert method is not None
|
||||||
if method != 'get':
|
method = method.lower()
|
||||||
|
if method not in [ 'get', 'delete' ]:
|
||||||
assert data is not None
|
assert data is not None
|
||||||
client = Client()
|
client = Client()
|
||||||
if auth:
|
if auth:
|
||||||
@@ -82,8 +83,10 @@ class BaseTest(django.test.TestCase):
|
|||||||
assert False, "Failed: %s" % response.content
|
assert False, "Failed: %s" % response.content
|
||||||
if expect is not None:
|
if expect is not None:
|
||||||
assert response.status_code == expect, "expected status %s, got %s for url=%s as auth=%s: %s" % (expect, response.status_code, url, auth, response.content)
|
assert response.status_code == expect, "expected status %s, got %s for url=%s as auth=%s: %s" % (expect, response.status_code, url, auth, response.content)
|
||||||
data = json.loads(response.content)
|
if response.status_code != 204:
|
||||||
return data
|
return json.loads(response.content)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def get(self, url, expect=200, auth=None):
|
def get(self, url, expect=200, auth=None):
|
||||||
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='get')
|
return self._generic_rest(url, data=None, expect=expect, auth=auth, method='get')
|
||||||
@@ -251,7 +254,26 @@ class OrganizationsTest(BaseTest):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def test_delete_item(self):
|
def test_delete_item(self):
|
||||||
pass
|
|
||||||
|
# first get some urls
|
||||||
|
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
|
||||||
|
urldata1 = self.get(urls[1], auth=self.get_super_credentials())
|
||||||
|
|
||||||
|
# check authentication -- admins of the org and superusers can delete objects only
|
||||||
|
self.delete(urls[0], expect=401, auth=None)
|
||||||
|
self.delete(urls[0], expect=401, auth=self.get_invalid_credentials())
|
||||||
|
self.delete(urls[8], expect=403, auth=self.get_normal_credentials())
|
||||||
|
self.delete(urls[1], expect=204, auth=self.get_normal_credentials())
|
||||||
|
self.delete(urls[0], expect=204, auth=self.get_super_credentials())
|
||||||
|
|
||||||
|
# check that when we have deleted an object it comes back 404 via GET
|
||||||
|
# but that it's still in the database as inactive
|
||||||
|
self.get(urls[1], expect=404, auth=self.get_normal_credentials())
|
||||||
|
org1 = Organization.objects.get(pk=urldata1['id'])
|
||||||
|
self.assertEquals(org1.active, False)
|
||||||
|
|
||||||
|
# also check that DELETE on the collection doesn't work
|
||||||
|
self.delete(self.collection(), expect=405, auth=self.get_super_credentials())
|
||||||
|
|
||||||
def test_delete_item_subobjects_projects(self):
|
def test_delete_item_subobjects_projects(self):
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -4,26 +4,14 @@ from lib.main.models import *
|
|||||||
from lib.main.serializers import *
|
from lib.main.serializers import *
|
||||||
from lib.main.rbac import *
|
from lib.main.rbac import *
|
||||||
from django.contrib.auth.models import AnonymousUser
|
from django.contrib.auth.models import AnonymousUser
|
||||||
|
from django.core.exceptions import PermissionDenied
|
||||||
from rest_framework import mixins
|
from rest_framework import mixins
|
||||||
from rest_framework import generics
|
from rest_framework import generics
|
||||||
from rest_framework import permissions
|
from rest_framework import permissions
|
||||||
import exceptions
|
import exceptions
|
||||||
|
import datetime
|
||||||
|
|
||||||
class OrganizationsList(generics.ListCreateAPIView):
|
class BaseList(generics.ListCreateAPIView):
|
||||||
|
|
||||||
model = Organization
|
|
||||||
serializer_class = OrganizationSerializer
|
|
||||||
permission_classes = (CustomRbac,)
|
|
||||||
|
|
||||||
#def pre_save(self, obj):
|
|
||||||
# obj.owner = self.request.user
|
|
||||||
|
|
||||||
def get_queryset(self):
|
|
||||||
|
|
||||||
if self.request.user.is_superuser:
|
|
||||||
return Organization.objects.filter(active=True)
|
|
||||||
return Organization.objects.filter(active = True, admins__in = [ self.request.user.application_user ]).distinct() | \
|
|
||||||
Organization.objects.filter(active = True, users__in = [ self.request.user.application_user ]).distinct()
|
|
||||||
|
|
||||||
def list_permissions_check(self, request, obj=None):
|
def list_permissions_check(self, request, obj=None):
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
@@ -34,8 +22,40 @@ class OrganizationsList(generics.ListCreateAPIView):
|
|||||||
return False
|
return False
|
||||||
raise exceptions.NotImplementedError
|
raise exceptions.NotImplementedError
|
||||||
|
|
||||||
|
def get_queryset(self):
|
||||||
|
return self._get_queryset().filter(active=True)
|
||||||
|
|
||||||
|
class BaseDetail(generics.RetrieveUpdateDestroyAPIView):
|
||||||
|
|
||||||
|
def destroy(self, request, *args, **kwargs):
|
||||||
|
# somewhat lame that delete has to call it's own permissions check
|
||||||
|
obj = self.model.objects.get(pk=kwargs['pk'])
|
||||||
|
if not request.user.is_superuser and not self.delete_permissions_check(request, obj):
|
||||||
|
raise PermissionDenied()
|
||||||
|
obj.name = "_deleted_%s_%s" % (str(datetime.time()), obj.name)
|
||||||
|
obj.active = False
|
||||||
|
obj.save()
|
||||||
|
return HttpResponse(status=204)
|
||||||
|
|
||||||
|
class OrganizationsList(BaseList):
|
||||||
|
|
||||||
|
model = Organization
|
||||||
|
serializer_class = OrganizationSerializer
|
||||||
|
permission_classes = (CustomRbac,)
|
||||||
|
|
||||||
|
def _get_queryset(self):
|
||||||
|
|
||||||
|
if self.request.user.is_superuser:
|
||||||
|
return Organization.objects.filter(active=True)
|
||||||
|
|
||||||
|
return Organization.objects.filter(
|
||||||
|
admins__in = [ self.request.user.application_user ]
|
||||||
|
).distinct() | Organization.objects.filter(
|
||||||
|
users__in = [ self.request.user.application_user ]
|
||||||
|
).distinct()
|
||||||
|
|
||||||
|
class OrganizationsDetail(BaseDetail):
|
||||||
|
|
||||||
class OrganizationsDetail(generics.RetrieveUpdateDestroyAPIView):
|
|
||||||
model = Organization
|
model = Organization
|
||||||
serializer_class = OrganizationSerializer
|
serializer_class = OrganizationSerializer
|
||||||
|
|
||||||
@@ -45,12 +65,15 @@ class OrganizationsDetail(generics.RetrieveUpdateDestroyAPIView):
|
|||||||
# obj.owner = self.request.user
|
# obj.owner = self.request.user
|
||||||
|
|
||||||
def item_permissions_check(self, request, obj):
|
def item_permissions_check(self, request, obj):
|
||||||
admin = request.user.application_user in obj.admins.all()
|
is_admin = request.user.application_user in obj.admins.all()
|
||||||
user = request.user.application_user in obj.users.all()
|
is_user = request.user.application_user in obj.users.all()
|
||||||
|
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
return admin or user
|
return is_admin or is_user
|
||||||
if request.method == 'PUT':
|
elif request.method in [ 'PUT' ]:
|
||||||
return admin
|
return is_admin
|
||||||
|
return False
|
||||||
|
|
||||||
|
def delete_permissions_check(self, request, obj):
|
||||||
|
return request.user.application_user in obj.admins.all()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user