Merge pull request #1653 from wwitzel3/issue-1482

update Access methods for superuser
This commit is contained in:
Wayne Witzel III 2016-04-22 16:06:14 -04:00
commit 2a03556729
2 changed files with 66 additions and 88 deletions

View File

@ -110,6 +110,18 @@ def check_user_access(user, model_class, action, *args, **kwargs):
return result
return False
def check_superuser(func):
'''
check_superuser is a decorator that provides a simple short circuit
for access checks. If the User object is a superuser, return True, otherwise
execute the logic of the can_access method.
'''
def wrapper(self, *args, **kwargs):
if self.user.is_superuser:
return True
return func(self, *args, **kwargs)
return wrapper
class BaseAccess(object):
'''
Base class for checking user access to a given model. Subclasses should
@ -242,10 +254,8 @@ class UserAccess(BaseAccess):
# that a user should be able to edit for themselves.
return bool(self.user == obj or self.can_admin(obj, data))
@check_superuser
def can_admin(self, obj, data):
# Admin implies changing all user fields.
if self.user.is_superuser:
return True
return Organization.objects.filter(member_role__members=obj, admin_role__members=self.user).exists()
def can_delete(self, obj):
@ -277,9 +287,8 @@ class OrganizationAccess(BaseAccess):
qs = self.model.accessible_objects(self.user, 'read_role')
return qs.select_related('created_by', 'modified_by').all()
@check_superuser
def can_change(self, obj, data):
if self.user.is_superuser:
return True
return self.user in obj.admin_role
def can_delete(self, obj):
@ -312,27 +321,25 @@ class InventoryAccess(BaseAccess):
qs = self.model.accessible_objects(self.user, 'read_role')
return qs.select_related('created_by', 'modified_by', 'organization').all()
@check_superuser
def can_read(self, obj):
if self.user.is_superuser:
return True
return self.user in obj.read_role
@check_superuser
def can_use(self, obj):
if self.user.is_superuser:
return True
return self.user in obj.use_role
@check_superuser
def can_add(self, data):
# If no data is specified, just checking for generic add permission?
if not data:
return Organization.accessible_objects(self.user, 'admin_role').exists()
if self.user.is_superuser:
return True
org_pk = get_pk_from_dict(data, 'organization')
org = get_object_or_400(Organization, pk=org_pk)
return self.user in org.admin_role
@check_superuser
def can_change(self, obj, data):
# Verify that the user has access to the new organization if moving an
# inventory to a new organization.
@ -344,6 +351,7 @@ class InventoryAccess(BaseAccess):
# Otherwise, just check for write permission.
return self.user in obj.admin_role
@check_superuser
def can_admin(self, obj, data):
# Verify that the user has access to the new organization if moving an
# inventory to a new organization.
@ -555,6 +563,7 @@ class CredentialAccess(BaseAccess):
qs = self.model.accessible_objects(self.user, 'read_role')
return qs.select_related('created_by', 'modified_by').all()
@check_superuser
def can_read(self, obj):
return self.user in obj.read_role
@ -562,14 +571,12 @@ class CredentialAccess(BaseAccess):
# Access enforced in our view where we have context enough to make a decision
return True
@check_superuser
def can_use(self, obj):
if self.user.is_superuser:
return True
return self.user in obj.use_role
@check_superuser
def can_change(self, obj, data):
if self.user.is_superuser:
return True
return self.user in obj.owner_role
def can_delete(self, obj):
@ -596,16 +603,15 @@ class TeamAccess(BaseAccess):
qs = self.model.accessible_objects(self.user, 'read_role')
return qs.select_related('created_by', 'modified_by', 'organization').all()
@check_superuser
def can_add(self, data):
if self.user.is_superuser:
org_pk = get_pk_from_dict(data, 'organization')
org = get_object_or_400(Organization, pk=org_pk)
if self.user in org.admin_role:
return True
else:
org_pk = get_pk_from_dict(data, 'organization')
org = get_object_or_400(Organization, pk=org_pk)
if self.user in org.admin_role:
return True
return False
@check_superuser
def can_change(self, obj, data):
# Prevent moving a team to a different organization.
org_pk = get_pk_from_dict(data, 'organization')
@ -640,20 +646,19 @@ class ProjectAccess(BaseAccess):
qs = self.model.accessible_objects(self.user, 'read_role')
return qs.select_related('modified_by', 'credential', 'current_job', 'last_job').all()
@check_superuser
def can_add(self, data):
if self.user.is_superuser:
return True
qs = Organization.accessible_objects(self.user, 'admin_role')
return qs.exists()
@check_superuser
def can_change(self, obj, data):
if self.user.is_superuser:
return True
return self.user in obj.admin_role
def can_delete(self, obj):
return self.can_change(obj, None)
@check_superuser
def can_start(self, obj):
return self.can_change(obj, {}) and obj.can_update
@ -674,9 +679,11 @@ class ProjectUpdateAccess(BaseAccess):
project_ids = set(self.user.get_queryset(Project).values_list('id', flat=True))
return qs.filter(project_id__in=project_ids)
@check_superuser
def can_cancel(self, obj):
return self.can_change(obj, {}) and obj.can_cancel
@check_superuser
def can_delete(self, obj):
return obj and self.user in obj.project.admin_role
@ -847,10 +854,8 @@ class JobAccess(BaseAccess):
def can_change(self, obj, data):
return obj.status == 'new' and self.can_read(obj) and self.can_add(data)
@check_superuser
def can_delete(self, obj):
# Allow org admins and superusers to delete jobs
if self.user.is_superuser:
return True
return self.user in obj.inventory.admin_role
def can_start(self, obj):
@ -1146,18 +1151,16 @@ class ScheduleAccess(BaseAccess):
UnifiedJobTemplate.objects.filter(Q(inventorysource__in=inventory_source_qs))
return qs.filter(unified_job_template__in=unified_qs)
@check_superuser
def can_read(self, obj):
if self.user.is_superuser:
return True
if obj and obj.unified_job_template:
job_class = obj.unified_job_template
return self.user.can_access(type(job_class), 'read', obj.unified_job_template)
else:
return False
@check_superuser
def can_add(self, data):
if self.user.is_superuser:
return True
pk = get_pk_from_dict(data, 'unified_job_template')
obj = get_object_or_400(UnifiedJobTemplate, pk=pk)
if obj:
@ -1165,18 +1168,16 @@ class ScheduleAccess(BaseAccess):
else:
return False
@check_superuser
def can_change(self, obj, data):
if self.user.is_superuser:
return True
if obj and obj.unified_job_template:
job_class = obj.unified_job_template
return self.user.can_access(type(job_class), 'change', job_class, None)
else:
return False
@check_superuser
def can_delete(self, obj):
if self.user.is_superuser:
return True
if obj and obj.unified_job_template:
job_class = obj.unified_job_template
return self.user.can_access(type(job_class), 'change', job_class, None)
@ -1195,25 +1196,22 @@ class NotifierAccess(BaseAccess):
return qs
return self.model.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role').all())
@check_superuser
def can_read(self, obj):
if self.user.is_superuser:
return True
if obj.organization is not None:
return self.user in obj.organization.admin_role
return False
@check_superuser
def can_add(self, data):
if self.user.is_superuser:
return True
if not data:
return Organization.accessible_objects(self.user, 'admin_role').exists()
org_pk = get_pk_from_dict(data, 'organization')
org = get_object_or_400(Organization, pk=org_pk)
return self.user in org.admin_role
@check_superuser
def can_change(self, obj, data):
if self.user.is_superuser:
return True
org_pk = get_pk_from_dict(data, 'organization')
if obj and org_pk and obj.organization.pk != org_pk:
org = get_object_or_400(Organization, pk=org_pk)
@ -1260,15 +1258,12 @@ class LabelAccess(BaseAccess):
organization__in=Organization.accessible_objects(self.user, 'read_role')
)
@check_superuser
def can_read(self, obj):
if self.user.is_superuser:
return True
return self.user in obj.organization.read_role
@check_superuser
def can_add(self, data):
if self.user.is_superuser:
return True
if not data or '_method' in data: # So the browseable API will work?
return True
@ -1276,10 +1271,8 @@ class LabelAccess(BaseAccess):
org = get_object_or_400(Organization, pk=org_pk)
return self.user in org.read_role
@check_superuser
def can_change(self, obj, data):
if self.user.is_superuser:
return True
if self.can_add(data) is False:
return False
@ -1376,26 +1369,10 @@ class CustomInventoryScriptAccess(BaseAccess):
return self.model.objects.distinct().all()
return self.model.accessible_objects(self.user, 'read_role').all()
@check_superuser
def can_read(self, obj):
if self.user.is_superuser:
return True
return self.user in obj.read_role
def can_add(self, data):
if self.user.is_superuser:
return True
return False
def can_change(self, obj, data):
if self.user.is_superuser:
return True
return False
def can_delete(self, obj):
if self.user.is_superuser:
return True
return False
class TowerSettingsAccess(BaseAccess):
'''
@ -1409,17 +1386,6 @@ class TowerSettingsAccess(BaseAccess):
model = TowerSettings
def get_queryset(self):
if self.user.is_superuser:
return self.model.objects.all()
return self.model.objects.none()
def can_change(self, obj, data):
return self.user.is_superuser
def can_delete(self, obj):
return self.user.is_superuser
class RoleAccess(BaseAccess):
'''
@ -1432,14 +1398,6 @@ class RoleAccess(BaseAccess):
model = Role
def get_queryset(self):
if self.user.is_superuser:
return self.model.objects.all()
return Role.objects.none()
def can_change(self, obj, data):
return self.user.is_superuser
def can_read(self, obj):
if not obj:
return False
@ -1463,9 +1421,8 @@ class RoleAccess(BaseAccess):
skip_sub_obj_read_check=False):
return self.can_unattach(obj, sub_obj, relationship)
@check_superuser
def can_unattach(self, obj, sub_obj, relationship):
if self.user.is_superuser:
return True
if obj.object_id and \
isinstance(obj.content_object, ResourceMixin) and \
self.user in obj.content_object.admin_role:

View File

@ -0,0 +1,21 @@
from django.contrib.auth.models import User
from awx.main.access import (
BaseAccess,
check_superuser,
)
def test_superuser(mocker):
user = mocker.MagicMock(spec=User, id=1, is_superuser=True)
access = BaseAccess(user)
can_add = check_superuser(BaseAccess.can_add)
assert can_add(access, None) is True
def test_not_superuser(mocker):
user = mocker.MagicMock(spec=User, id=1, is_superuser=False)
access = BaseAccess(user)
can_add = check_superuser(BaseAccess.can_add)
assert can_add(access, None) is False