mirror of
https://github.com/ansible/awx.git
synced 2026-03-04 18:21:03 -03:30
update Access methods for superuser
This commit is contained in:
@@ -110,6 +110,18 @@ def check_user_access(user, model_class, action, *args, **kwargs):
|
|||||||
return result
|
return result
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def check_superuser(func):
|
||||||
|
'''
|
||||||
|
check_superuser is a decorator that provides a simple short circuit
|
||||||
|
for access checks. If the User object is a superuser, return True, otherwise
|
||||||
|
execute the logic of the can_access method.
|
||||||
|
'''
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
if self.user.is_superuser:
|
||||||
|
return True
|
||||||
|
return func(self, *args, **kwargs)
|
||||||
|
return wrapper
|
||||||
|
|
||||||
class BaseAccess(object):
|
class BaseAccess(object):
|
||||||
'''
|
'''
|
||||||
Base class for checking user access to a given model. Subclasses should
|
Base class for checking user access to a given model. Subclasses should
|
||||||
@@ -242,10 +254,8 @@ class UserAccess(BaseAccess):
|
|||||||
# that a user should be able to edit for themselves.
|
# that a user should be able to edit for themselves.
|
||||||
return bool(self.user == obj or self.can_admin(obj, data))
|
return bool(self.user == obj or self.can_admin(obj, data))
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_admin(self, obj, data):
|
def can_admin(self, obj, data):
|
||||||
# Admin implies changing all user fields.
|
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return Organization.objects.filter(member_role__members=obj, admin_role__members=self.user).exists()
|
return Organization.objects.filter(member_role__members=obj, admin_role__members=self.user).exists()
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
@@ -277,9 +287,8 @@ class OrganizationAccess(BaseAccess):
|
|||||||
qs = self.model.accessible_objects(self.user, 'read_role')
|
qs = self.model.accessible_objects(self.user, 'read_role')
|
||||||
return qs.select_related('created_by', 'modified_by').all()
|
return qs.select_related('created_by', 'modified_by').all()
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.admin_role
|
return self.user in obj.admin_role
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
@@ -312,27 +321,25 @@ class InventoryAccess(BaseAccess):
|
|||||||
qs = self.model.accessible_objects(self.user, 'read_role')
|
qs = self.model.accessible_objects(self.user, 'read_role')
|
||||||
return qs.select_related('created_by', 'modified_by', 'organization').all()
|
return qs.select_related('created_by', 'modified_by', 'organization').all()
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.read_role
|
return self.user in obj.read_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_use(self, obj):
|
def can_use(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.use_role
|
return self.user in obj.use_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
# If no data is specified, just checking for generic add permission?
|
# If no data is specified, just checking for generic add permission?
|
||||||
if not data:
|
if not data:
|
||||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
|
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
return self.user in org.admin_role
|
return self.user in org.admin_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Verify that the user has access to the new organization if moving an
|
# Verify that the user has access to the new organization if moving an
|
||||||
# inventory to a new organization.
|
# inventory to a new organization.
|
||||||
@@ -344,6 +351,7 @@ class InventoryAccess(BaseAccess):
|
|||||||
# Otherwise, just check for write permission.
|
# Otherwise, just check for write permission.
|
||||||
return self.user in obj.admin_role
|
return self.user in obj.admin_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_admin(self, obj, data):
|
def can_admin(self, obj, data):
|
||||||
# Verify that the user has access to the new organization if moving an
|
# Verify that the user has access to the new organization if moving an
|
||||||
# inventory to a new organization.
|
# inventory to a new organization.
|
||||||
@@ -555,6 +563,7 @@ class CredentialAccess(BaseAccess):
|
|||||||
qs = self.model.accessible_objects(self.user, 'read_role')
|
qs = self.model.accessible_objects(self.user, 'read_role')
|
||||||
return qs.select_related('created_by', 'modified_by').all()
|
return qs.select_related('created_by', 'modified_by').all()
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
return self.user in obj.read_role
|
return self.user in obj.read_role
|
||||||
|
|
||||||
@@ -562,14 +571,12 @@ class CredentialAccess(BaseAccess):
|
|||||||
# Access enforced in our view where we have context enough to make a decision
|
# Access enforced in our view where we have context enough to make a decision
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_use(self, obj):
|
def can_use(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.use_role
|
return self.user in obj.use_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.owner_role
|
return self.user in obj.owner_role
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
@@ -596,16 +603,15 @@ class TeamAccess(BaseAccess):
|
|||||||
qs = self.model.accessible_objects(self.user, 'read_role')
|
qs = self.model.accessible_objects(self.user, 'read_role')
|
||||||
return qs.select_related('created_by', 'modified_by', 'organization').all()
|
return qs.select_related('created_by', 'modified_by', 'organization').all()
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if self.user.is_superuser:
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
|
if self.user in org.admin_role:
|
||||||
return True
|
return True
|
||||||
else:
|
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
|
||||||
if self.user in org.admin_role:
|
|
||||||
return True
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
# Prevent moving a team to a different organization.
|
# Prevent moving a team to a different organization.
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
@@ -640,20 +646,19 @@ class ProjectAccess(BaseAccess):
|
|||||||
qs = self.model.accessible_objects(self.user, 'read_role')
|
qs = self.model.accessible_objects(self.user, 'read_role')
|
||||||
return qs.select_related('modified_by', 'credential', 'current_job', 'last_job').all()
|
return qs.select_related('modified_by', 'credential', 'current_job', 'last_job').all()
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
qs = Organization.accessible_objects(self.user, 'admin_role')
|
qs = Organization.accessible_objects(self.user, 'admin_role')
|
||||||
return qs.exists()
|
return qs.exists()
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.admin_role
|
return self.user in obj.admin_role
|
||||||
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
return self.can_change(obj, None)
|
return self.can_change(obj, None)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_start(self, obj):
|
def can_start(self, obj):
|
||||||
return self.can_change(obj, {}) and obj.can_update
|
return self.can_change(obj, {}) and obj.can_update
|
||||||
|
|
||||||
@@ -674,9 +679,11 @@ class ProjectUpdateAccess(BaseAccess):
|
|||||||
project_ids = set(self.user.get_queryset(Project).values_list('id', flat=True))
|
project_ids = set(self.user.get_queryset(Project).values_list('id', flat=True))
|
||||||
return qs.filter(project_id__in=project_ids)
|
return qs.filter(project_id__in=project_ids)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_cancel(self, obj):
|
def can_cancel(self, obj):
|
||||||
return self.can_change(obj, {}) and obj.can_cancel
|
return self.can_change(obj, {}) and obj.can_cancel
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
return obj and self.user in obj.project.admin_role
|
return obj and self.user in obj.project.admin_role
|
||||||
|
|
||||||
@@ -847,10 +854,8 @@ class JobAccess(BaseAccess):
|
|||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
return obj.status == 'new' and self.can_read(obj) and self.can_add(data)
|
return obj.status == 'new' and self.can_read(obj) and self.can_add(data)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
# Allow org admins and superusers to delete jobs
|
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.inventory.admin_role
|
return self.user in obj.inventory.admin_role
|
||||||
|
|
||||||
def can_start(self, obj):
|
def can_start(self, obj):
|
||||||
@@ -1146,18 +1151,16 @@ class ScheduleAccess(BaseAccess):
|
|||||||
UnifiedJobTemplate.objects.filter(Q(inventorysource__in=inventory_source_qs))
|
UnifiedJobTemplate.objects.filter(Q(inventorysource__in=inventory_source_qs))
|
||||||
return qs.filter(unified_job_template__in=unified_qs)
|
return qs.filter(unified_job_template__in=unified_qs)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
if obj and obj.unified_job_template:
|
if obj and obj.unified_job_template:
|
||||||
job_class = obj.unified_job_template
|
job_class = obj.unified_job_template
|
||||||
return self.user.can_access(type(job_class), 'read', obj.unified_job_template)
|
return self.user.can_access(type(job_class), 'read', obj.unified_job_template)
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
pk = get_pk_from_dict(data, 'unified_job_template')
|
pk = get_pk_from_dict(data, 'unified_job_template')
|
||||||
obj = get_object_or_400(UnifiedJobTemplate, pk=pk)
|
obj = get_object_or_400(UnifiedJobTemplate, pk=pk)
|
||||||
if obj:
|
if obj:
|
||||||
@@ -1165,18 +1168,16 @@ class ScheduleAccess(BaseAccess):
|
|||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
if obj and obj.unified_job_template:
|
if obj and obj.unified_job_template:
|
||||||
job_class = obj.unified_job_template
|
job_class = obj.unified_job_template
|
||||||
return self.user.can_access(type(job_class), 'change', job_class, None)
|
return self.user.can_access(type(job_class), 'change', job_class, None)
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
if obj and obj.unified_job_template:
|
if obj and obj.unified_job_template:
|
||||||
job_class = obj.unified_job_template
|
job_class = obj.unified_job_template
|
||||||
return self.user.can_access(type(job_class), 'change', job_class, None)
|
return self.user.can_access(type(job_class), 'change', job_class, None)
|
||||||
@@ -1195,25 +1196,22 @@ class NotifierAccess(BaseAccess):
|
|||||||
return qs
|
return qs
|
||||||
return self.model.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role').all())
|
return self.model.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role').all())
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
if obj.organization is not None:
|
if obj.organization is not None:
|
||||||
return self.user in obj.organization.admin_role
|
return self.user in obj.organization.admin_role
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
if not data:
|
if not data:
|
||||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
return self.user in org.admin_role
|
return self.user in org.admin_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
org_pk = get_pk_from_dict(data, 'organization')
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
if obj and org_pk and obj.organization.pk != org_pk:
|
if obj and org_pk and obj.organization.pk != org_pk:
|
||||||
org = get_object_or_400(Organization, pk=org_pk)
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
@@ -1260,15 +1258,12 @@ class LabelAccess(BaseAccess):
|
|||||||
organization__in=Organization.accessible_objects(self.user, 'read_role')
|
organization__in=Organization.accessible_objects(self.user, 'read_role')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.organization.read_role
|
return self.user in obj.organization.read_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
|
|
||||||
if not data or '_method' in data: # So the browseable API will work?
|
if not data or '_method' in data: # So the browseable API will work?
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -1276,10 +1271,8 @@ class LabelAccess(BaseAccess):
|
|||||||
org = get_object_or_400(Organization, pk=org_pk)
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
return self.user in org.read_role
|
return self.user in org.read_role
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
|
|
||||||
if self.can_add(data) is False:
|
if self.can_add(data) is False:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -1376,26 +1369,10 @@ class CustomInventoryScriptAccess(BaseAccess):
|
|||||||
return self.model.objects.distinct().all()
|
return self.model.objects.distinct().all()
|
||||||
return self.model.accessible_objects(self.user, 'read_role').all()
|
return self.model.accessible_objects(self.user, 'read_role').all()
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return self.user in obj.read_role
|
return self.user in obj.read_role
|
||||||
|
|
||||||
def can_add(self, data):
|
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def can_delete(self, obj):
|
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
class TowerSettingsAccess(BaseAccess):
|
class TowerSettingsAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
@@ -1409,17 +1386,6 @@ class TowerSettingsAccess(BaseAccess):
|
|||||||
|
|
||||||
model = TowerSettings
|
model = TowerSettings
|
||||||
|
|
||||||
def get_queryset(self):
|
|
||||||
if self.user.is_superuser:
|
|
||||||
return self.model.objects.all()
|
|
||||||
return self.model.objects.none()
|
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
|
||||||
return self.user.is_superuser
|
|
||||||
|
|
||||||
def can_delete(self, obj):
|
|
||||||
return self.user.is_superuser
|
|
||||||
|
|
||||||
|
|
||||||
class RoleAccess(BaseAccess):
|
class RoleAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
@@ -1432,14 +1398,6 @@ class RoleAccess(BaseAccess):
|
|||||||
|
|
||||||
model = Role
|
model = Role
|
||||||
|
|
||||||
def get_queryset(self):
|
|
||||||
if self.user.is_superuser:
|
|
||||||
return self.model.objects.all()
|
|
||||||
return Role.objects.none()
|
|
||||||
|
|
||||||
def can_change(self, obj, data):
|
|
||||||
return self.user.is_superuser
|
|
||||||
|
|
||||||
def can_read(self, obj):
|
def can_read(self, obj):
|
||||||
if not obj:
|
if not obj:
|
||||||
return False
|
return False
|
||||||
@@ -1463,9 +1421,8 @@ class RoleAccess(BaseAccess):
|
|||||||
skip_sub_obj_read_check=False):
|
skip_sub_obj_read_check=False):
|
||||||
return self.can_unattach(obj, sub_obj, relationship)
|
return self.can_unattach(obj, sub_obj, relationship)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_unattach(self, obj, sub_obj, relationship):
|
def can_unattach(self, obj, sub_obj, relationship):
|
||||||
if self.user.is_superuser:
|
|
||||||
return True
|
|
||||||
if obj.object_id and \
|
if obj.object_id and \
|
||||||
isinstance(obj.content_object, ResourceMixin) and \
|
isinstance(obj.content_object, ResourceMixin) and \
|
||||||
self.user in obj.content_object.admin_role:
|
self.user in obj.content_object.admin_role:
|
||||||
|
|||||||
21
awx/main/tests/unit/test_access.py
Normal file
21
awx/main/tests/unit/test_access.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
from django.contrib.auth.models import User
|
||||||
|
from awx.main.access import (
|
||||||
|
BaseAccess,
|
||||||
|
check_superuser,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_superuser(mocker):
|
||||||
|
user = mocker.MagicMock(spec=User, id=1, is_superuser=True)
|
||||||
|
access = BaseAccess(user)
|
||||||
|
|
||||||
|
can_add = check_superuser(BaseAccess.can_add)
|
||||||
|
assert can_add(access, None) is True
|
||||||
|
|
||||||
|
def test_not_superuser(mocker):
|
||||||
|
user = mocker.MagicMock(spec=User, id=1, is_superuser=False)
|
||||||
|
access = BaseAccess(user)
|
||||||
|
|
||||||
|
can_add = check_superuser(BaseAccess.can_add)
|
||||||
|
assert can_add(access, None) is False
|
||||||
|
|
||||||
Reference in New Issue
Block a user