mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 10:00:01 -03:30
User deletion modification, and overriding the URL for the user objects.
This commit is contained in:
parent
129380e119
commit
f02aad42c0
@ -112,13 +112,25 @@ class BaseDetail(generics.RetrieveUpdateDestroyAPIView):
|
||||
obj = self.model.objects.get(pk=kwargs['pk'])
|
||||
if not request.user.is_superuser and not self.delete_permissions_check(request, obj):
|
||||
raise PermissionDenied()
|
||||
obj.name = "_deleted_%s_%s" % (str(datetime.time()), obj.name)
|
||||
obj.active = False
|
||||
obj.save()
|
||||
if isinstance(obj, CommonModel):
|
||||
obj.name = "_deleted_%s_%s" % (str(datetime.time()), obj.name)
|
||||
obj.active = False
|
||||
obj.save()
|
||||
elif type(obj) == User:
|
||||
obj.username = "_deleted_%s_%s" % (str(datetime.time()), obj.username)
|
||||
obj.is_active = False
|
||||
obj.save()
|
||||
else:
|
||||
raise Exception("InternalError: destroy() not implemented yet for %s" % obj)
|
||||
return HttpResponse(status=204)
|
||||
|
||||
def delete_permissions_check(self, request, obj):
|
||||
return self.__class__.model.can_user_delete(request.user, obj)
|
||||
if isinstance(obj, CommonModel):
|
||||
return self.__class__.model.can_user_delete(request.user, obj)
|
||||
elif isinstance(obj, User):
|
||||
return UserHelper.can_user_delete(request.user, obj)
|
||||
raise PermissionDenied()
|
||||
|
||||
|
||||
def item_permissions_check(self, request, obj):
|
||||
|
||||
|
||||
@ -57,7 +57,6 @@ class EditHelper(object):
|
||||
changed[f] = (left, right)
|
||||
return changed
|
||||
|
||||
|
||||
class UserHelper(object):
|
||||
|
||||
# fields that the user themselves cannot edit, but are not actually read only
|
||||
@ -78,6 +77,13 @@ class UserHelper(object):
|
||||
''' a user can be read if they are on the same team or can be administrated '''
|
||||
matching_teams = user.teams.filter(users__in = [ user ]).count()
|
||||
return matching_teams or cls.can_user_administrate(user, obj)
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
matching_orgs = len(set(obj.organizations.all()) & set(user.admin_of_organizations.all()))
|
||||
return matching_orgs
|
||||
|
||||
|
||||
class CommonModel(models.Model):
|
||||
|
||||
@ -56,16 +56,16 @@ class CustomRbac(permissions.BasePermission):
|
||||
raise Exception("did not expect to get to this position")
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
if request.user.is_superuser:
|
||||
return True
|
||||
if not self._common_user_check(request):
|
||||
return False
|
||||
if type(obj) == User:
|
||||
if isinstance(obj, User):
|
||||
if not obj.is_active:
|
||||
raise Http404()
|
||||
else:
|
||||
if not obj.active:
|
||||
raise Http404()
|
||||
if request.user.is_superuser:
|
||||
return True
|
||||
if not self._common_user_check(request):
|
||||
return False
|
||||
if not view.item_permissions_check(request, obj):
|
||||
raise PermissionDenied()
|
||||
return True
|
||||
|
||||
@ -20,6 +20,7 @@ from django.contrib.auth.models import User
|
||||
from lib.main.models import *
|
||||
from rest_framework import serializers, pagination
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.core.serializers import json
|
||||
import lib.urls
|
||||
|
||||
class BaseSerializer(serializers.ModelSerializer):
|
||||
@ -79,22 +80,23 @@ class ProjectSerializer(BaseSerializer):
|
||||
|
||||
class UserSerializer(BaseSerializer):
|
||||
|
||||
# FIXME: *** this is really about exposing the Django auth_user via REST so it may require
|
||||
# some custom save hooks in the view.
|
||||
|
||||
# add the URL and related resources
|
||||
url = serializers.CharField(source='get_absolute_url', read_only=True)
|
||||
url = serializers.SerializerMethodField('get_absolute_url_override')
|
||||
related = serializers.SerializerMethodField('get_related')
|
||||
|
||||
class Meta:
|
||||
model = User
|
||||
# FIXME: make sure is_active is and is_superuser is read only
|
||||
fields = ('url', 'id', 'username', 'first_name', 'last_name', 'email', 'is_active', 'is_superuser')
|
||||
fields = ('url', 'id', 'username', 'first_name', 'last_name', 'email', 'is_active', 'is_superuser', 'related')
|
||||
|
||||
def get_related(self, obj):
|
||||
# FIXME: add related lookups?
|
||||
return dict()
|
||||
|
||||
def get_absolute_url_override(self, obj):
|
||||
import lib.urls
|
||||
return reverse(lib.urls.views_UsersDetail, args=(obj.pk,))
|
||||
|
||||
|
||||
class TagSerializer(BaseSerializer):
|
||||
|
||||
# add the URL and related resources
|
||||
|
||||
@ -105,7 +105,6 @@ class UsersTest(BaseTest):
|
||||
orig = User.objects.get(pk=1)
|
||||
self.assertTrue(orig.username != 'change')
|
||||
|
||||
|
||||
def test_password_not_shown_in_get_operations_for_list_or_detail(self):
|
||||
url = '/api/v1/users/1/'
|
||||
data = self.get(url, expect=200, auth=self.get_super_credentials())
|
||||
@ -116,17 +115,29 @@ class UsersTest(BaseTest):
|
||||
self.assertTrue('password' not in data['results'][0])
|
||||
|
||||
def test_user_list_filtered(self):
|
||||
# I can see a user if I'm on a team with them, am their org admin, am a superuser, or am them
|
||||
#self.assertTrue(False)
|
||||
pass
|
||||
url = '/api/v1/users/'
|
||||
data3 = self.get(url, expect=200, auth=self.get_super_credentials())
|
||||
self.assertEquals(data3['count'], 3)
|
||||
data2 = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEquals(data2['count'], 2)
|
||||
data1 = self.get(url, expect=200, auth=self.get_other_credentials())
|
||||
self.assertEquals(data1['count'], 1)
|
||||
|
||||
def test_super_user_can_delete_a_user_but_only_marked_inactive(self):
|
||||
#self.assertTrue(False)
|
||||
pass
|
||||
url = '/api/v1/users/2/'
|
||||
data = self.delete(url, expect=204, auth=self.get_super_credentials())
|
||||
data = self.get(url, expect=404, auth=self.get_super_credentials())
|
||||
url = '/api/v1/users/2/'
|
||||
obj = User.objects.get(pk=2)
|
||||
self.assertEquals(obj.is_active, False)
|
||||
|
||||
def test_non_org_admin_user_cannot_delete_any_user_including_himself(self):
|
||||
#self.assertTrue(False)
|
||||
pass
|
||||
url1 = '/api/v1/users/1/'
|
||||
url2 = '/api/v1/users/2/'
|
||||
url3 = '/api/v1/users/3/'
|
||||
data = self.delete(url1, expect=403, auth=self.get_other_credentials())
|
||||
data = self.delete(url2, expect=403, auth=self.get_other_credentials())
|
||||
data = self.delete(url3, expect=403, auth=self.get_other_credentials())
|
||||
|
||||
def test_there_exists_an_obvious_url_where_a_user_may_find_his_user_record(self):
|
||||
#self.assertTrue(False)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user