diff --git a/awx/main/access.py b/awx/main/access.py index 7b28b89ae8..ae99592afe 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -110,6 +110,18 @@ def check_user_access(user, model_class, action, *args, **kwargs): return result return False +def check_superuser(func): + ''' + check_superuser is a decorator that provides a simple short circuit + for access checks. If the User object is a superuser, return True, otherwise + execute the logic of the can_access method. + ''' + def wrapper(self, *args, **kwargs): + if self.user.is_superuser: + return True + return func(self, *args, **kwargs) + return wrapper + class BaseAccess(object): ''' Base class for checking user access to a given model. Subclasses should @@ -242,10 +254,8 @@ class UserAccess(BaseAccess): # that a user should be able to edit for themselves. return bool(self.user == obj or self.can_admin(obj, data)) + @check_superuser def can_admin(self, obj, data): - # Admin implies changing all user fields. - if self.user.is_superuser: - return True return Organization.objects.filter(member_role__members=obj, admin_role__members=self.user).exists() def can_delete(self, obj): @@ -277,9 +287,8 @@ class OrganizationAccess(BaseAccess): qs = self.model.accessible_objects(self.user, 'read_role') return qs.select_related('created_by', 'modified_by').all() + @check_superuser def can_change(self, obj, data): - if self.user.is_superuser: - return True return self.user in obj.admin_role def can_delete(self, obj): @@ -312,27 +321,25 @@ class InventoryAccess(BaseAccess): qs = self.model.accessible_objects(self.user, 'read_role') return qs.select_related('created_by', 'modified_by', 'organization').all() + @check_superuser def can_read(self, obj): - if self.user.is_superuser: - return True return self.user in obj.read_role + @check_superuser def can_use(self, obj): - if self.user.is_superuser: - return True return self.user in obj.use_role + @check_superuser def can_add(self, data): # If no data is specified, just checking for generic add permission? if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() - if self.user.is_superuser: - return True org_pk = get_pk_from_dict(data, 'organization') org = get_object_or_400(Organization, pk=org_pk) return self.user in org.admin_role + @check_superuser def can_change(self, obj, data): # Verify that the user has access to the new organization if moving an # inventory to a new organization. @@ -344,6 +351,7 @@ class InventoryAccess(BaseAccess): # Otherwise, just check for write permission. return self.user in obj.admin_role + @check_superuser def can_admin(self, obj, data): # Verify that the user has access to the new organization if moving an # inventory to a new organization. @@ -555,6 +563,7 @@ class CredentialAccess(BaseAccess): qs = self.model.accessible_objects(self.user, 'read_role') return qs.select_related('created_by', 'modified_by').all() + @check_superuser def can_read(self, obj): return self.user in obj.read_role @@ -562,14 +571,12 @@ class CredentialAccess(BaseAccess): # Access enforced in our view where we have context enough to make a decision return True + @check_superuser def can_use(self, obj): - if self.user.is_superuser: - return True return self.user in obj.use_role + @check_superuser def can_change(self, obj, data): - if self.user.is_superuser: - return True return self.user in obj.owner_role def can_delete(self, obj): @@ -596,16 +603,15 @@ class TeamAccess(BaseAccess): qs = self.model.accessible_objects(self.user, 'read_role') return qs.select_related('created_by', 'modified_by', 'organization').all() + @check_superuser def can_add(self, data): - if self.user.is_superuser: + org_pk = get_pk_from_dict(data, 'organization') + org = get_object_or_400(Organization, pk=org_pk) + if self.user in org.admin_role: return True - else: - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - if self.user in org.admin_role: - return True return False + @check_superuser def can_change(self, obj, data): # Prevent moving a team to a different organization. org_pk = get_pk_from_dict(data, 'organization') @@ -640,20 +646,19 @@ class ProjectAccess(BaseAccess): qs = self.model.accessible_objects(self.user, 'read_role') return qs.select_related('modified_by', 'credential', 'current_job', 'last_job').all() + @check_superuser def can_add(self, data): - if self.user.is_superuser: - return True qs = Organization.accessible_objects(self.user, 'admin_role') return qs.exists() + @check_superuser def can_change(self, obj, data): - if self.user.is_superuser: - return True return self.user in obj.admin_role def can_delete(self, obj): return self.can_change(obj, None) + @check_superuser def can_start(self, obj): return self.can_change(obj, {}) and obj.can_update @@ -674,9 +679,11 @@ class ProjectUpdateAccess(BaseAccess): project_ids = set(self.user.get_queryset(Project).values_list('id', flat=True)) return qs.filter(project_id__in=project_ids) + @check_superuser def can_cancel(self, obj): return self.can_change(obj, {}) and obj.can_cancel + @check_superuser def can_delete(self, obj): return obj and self.user in obj.project.admin_role @@ -847,10 +854,8 @@ class JobAccess(BaseAccess): def can_change(self, obj, data): return obj.status == 'new' and self.can_read(obj) and self.can_add(data) + @check_superuser def can_delete(self, obj): - # Allow org admins and superusers to delete jobs - if self.user.is_superuser: - return True return self.user in obj.inventory.admin_role def can_start(self, obj): @@ -1146,18 +1151,16 @@ class ScheduleAccess(BaseAccess): UnifiedJobTemplate.objects.filter(Q(inventorysource__in=inventory_source_qs)) return qs.filter(unified_job_template__in=unified_qs) + @check_superuser def can_read(self, obj): - if self.user.is_superuser: - return True if obj and obj.unified_job_template: job_class = obj.unified_job_template return self.user.can_access(type(job_class), 'read', obj.unified_job_template) else: return False + @check_superuser def can_add(self, data): - if self.user.is_superuser: - return True pk = get_pk_from_dict(data, 'unified_job_template') obj = get_object_or_400(UnifiedJobTemplate, pk=pk) if obj: @@ -1165,18 +1168,16 @@ class ScheduleAccess(BaseAccess): else: return False + @check_superuser def can_change(self, obj, data): - if self.user.is_superuser: - return True if obj and obj.unified_job_template: job_class = obj.unified_job_template return self.user.can_access(type(job_class), 'change', job_class, None) else: return False + @check_superuser def can_delete(self, obj): - if self.user.is_superuser: - return True if obj and obj.unified_job_template: job_class = obj.unified_job_template return self.user.can_access(type(job_class), 'change', job_class, None) @@ -1195,25 +1196,22 @@ class NotifierAccess(BaseAccess): return qs return self.model.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role').all()) + @check_superuser def can_read(self, obj): - if self.user.is_superuser: - return True if obj.organization is not None: return self.user in obj.organization.admin_role return False + @check_superuser def can_add(self, data): - if self.user.is_superuser: - return True if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() org_pk = get_pk_from_dict(data, 'organization') org = get_object_or_400(Organization, pk=org_pk) return self.user in org.admin_role + @check_superuser def can_change(self, obj, data): - if self.user.is_superuser: - return True org_pk = get_pk_from_dict(data, 'organization') if obj and org_pk and obj.organization.pk != org_pk: org = get_object_or_400(Organization, pk=org_pk) @@ -1260,15 +1258,12 @@ class LabelAccess(BaseAccess): organization__in=Organization.accessible_objects(self.user, 'read_role') ) + @check_superuser def can_read(self, obj): - if self.user.is_superuser: - return True return self.user in obj.organization.read_role + @check_superuser def can_add(self, data): - if self.user.is_superuser: - return True - if not data or '_method' in data: # So the browseable API will work? return True @@ -1276,10 +1271,8 @@ class LabelAccess(BaseAccess): org = get_object_or_400(Organization, pk=org_pk) return self.user in org.read_role + @check_superuser def can_change(self, obj, data): - if self.user.is_superuser: - return True - if self.can_add(data) is False: return False @@ -1376,26 +1369,10 @@ class CustomInventoryScriptAccess(BaseAccess): return self.model.objects.distinct().all() return self.model.accessible_objects(self.user, 'read_role').all() + @check_superuser def can_read(self, obj): - if self.user.is_superuser: - return True return self.user in obj.read_role - def can_add(self, data): - if self.user.is_superuser: - return True - return False - - def can_change(self, obj, data): - if self.user.is_superuser: - return True - return False - - def can_delete(self, obj): - if self.user.is_superuser: - return True - return False - class TowerSettingsAccess(BaseAccess): ''' @@ -1409,17 +1386,6 @@ class TowerSettingsAccess(BaseAccess): model = TowerSettings - def get_queryset(self): - if self.user.is_superuser: - return self.model.objects.all() - return self.model.objects.none() - - def can_change(self, obj, data): - return self.user.is_superuser - - def can_delete(self, obj): - return self.user.is_superuser - class RoleAccess(BaseAccess): ''' @@ -1432,14 +1398,6 @@ class RoleAccess(BaseAccess): model = Role - def get_queryset(self): - if self.user.is_superuser: - return self.model.objects.all() - return Role.objects.none() - - def can_change(self, obj, data): - return self.user.is_superuser - def can_read(self, obj): if not obj: return False @@ -1463,9 +1421,8 @@ class RoleAccess(BaseAccess): skip_sub_obj_read_check=False): return self.can_unattach(obj, sub_obj, relationship) + @check_superuser def can_unattach(self, obj, sub_obj, relationship): - if self.user.is_superuser: - return True if obj.object_id and \ isinstance(obj.content_object, ResourceMixin) and \ self.user in obj.content_object.admin_role: diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py new file mode 100644 index 0000000000..4885f334cd --- /dev/null +++ b/awx/main/tests/unit/test_access.py @@ -0,0 +1,21 @@ +from django.contrib.auth.models import User +from awx.main.access import ( + BaseAccess, + check_superuser, +) + + +def test_superuser(mocker): + user = mocker.MagicMock(spec=User, id=1, is_superuser=True) + access = BaseAccess(user) + + can_add = check_superuser(BaseAccess.can_add) + assert can_add(access, None) is True + +def test_not_superuser(mocker): + user = mocker.MagicMock(spec=User, id=1, is_superuser=False) + access = BaseAccess(user) + + can_add = check_superuser(BaseAccess.can_add) + assert can_add(access, None) is False +