From 5c2c46c5aadab64a00228cf6ee940fdfb74b8a85 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Tue, 1 Nov 2016 13:33:16 -0400 Subject: [PATCH] Consolidate related field access checks into BaseAccess method --- awx/main/access.py | 200 ++++++++++----------- awx/main/tests/functional/test_projects.py | 2 +- 2 files changed, 96 insertions(+), 106 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 0fea7b8852..4d7228a375 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -192,6 +192,69 @@ class BaseAccess(object): def can_unattach(self, obj, sub_obj, relationship, data=None): return self.can_change(obj, data) + def check_related(self, field, Model, data, role_field='admin_role', + obj=None, fail_on_missing=True, mandatory=False): + ''' + Check permission for related field, in scenarios: + - creating a new resource, user must have permission to the + resource specified in `data` + - editing an existing resource, user must have permission to resource + in `data`, as well as existing related resource on `obj` + If `obj` is not provided, we assume this is a new resource + If obj.field is null, this does not block the action + If field is not given, return the inverse of `mandatory` + ''' + new = None + new_pk = None + if data and 'reference_obj' in data: + # Use reference object's related fields, if given + new = getattr(data['reference_obj'], field) + new_pk = getattr(new, 'pk', None) + elif data and field in data: + # Obtain the resource specified in `data` + raw_value = data[field] + try: + new_pk = int(raw_value) + except (TypeError, ValueError): + if type(raw_value) == Model: + new = raw_value + new_pk = getattr(new, 'pk', None) + + # Obtain existing related resource + current = None + current_pk = None + if obj: + current = getattr(obj, field) + current_pk = getattr(obj, '%s_id' % field, None) + + # Define special case where resource is not changed + changed = True + if (new_pk is None or new_pk == current_pk) and (new is None or new == current): + changed = False + + # Get the new resource from the database + if new is None and new_pk is not None: + if fail_on_missing: + new = get_object_or_400(Model, pk=new_pk) + else: + new = Model.objects.filter(pk=new_pk).first() + + if not new and not current and mandatory: + return self.user.is_superuser + + if new and changed: + new_role = getattr(new, role_field) + if self.user not in new_role: + return False # User lacks access to provided resource + + if current and (changed or mandatory): + current_role = getattr(current, role_field) + if self.user not in current_role: + return False # User lacks access to existing resource + + # User has access to both, permission check passed + return True + def check_license(self, add_host=False, feature=None, check_expiration=True): validation_info = TaskEnhancer().validate_enhancements() if ('test' in sys.argv or 'py.test' in sys.argv[0] or 'jenkins' in sys.argv) and not os.environ.get('SKIP_LICENSE_FIXUP_FOR_TEST', ''): @@ -461,9 +524,7 @@ class InventoryAccess(BaseAccess): if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) @check_superuser def can_change(self, obj, data): @@ -473,11 +534,8 @@ class InventoryAccess(BaseAccess): def can_admin(self, obj, data): # Verify that the user has access to the new organization if moving an # inventory to a new organization. - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False + if not self.check_related('organization', Organization, data, obj=obj): + return False # Otherwise, just check for admin permission. return self.user in obj.admin_role @@ -523,9 +581,7 @@ class HostAccess(BaseAccess): return Inventory.accessible_objects(self.user, 'admin_role').exists() # Checks for admin or change permission on inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - inventory = get_object_or_400(Inventory, pk=inventory_pk) - if self.user not in inventory.admin_role: + if not self.check_related('inventory', Inventory, data): return False # Check to see if we have enough licenses @@ -574,9 +630,7 @@ class GroupAccess(BaseAccess): if not data or 'inventory' not in data: return False # Checks for admin or change permission on inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - inventory = get_object_or_400(Inventory, pk=inventory_pk) - return self.user in inventory.admin_role + return self.check_related('inventory', Inventory, data) def can_change(self, obj, data): # Prevent moving a group to a different inventory. @@ -795,11 +849,7 @@ class TeamAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - if self.user in org.admin_role: - return True - return False + return self.check_related('organization', Organization, data) def can_change(self, obj, data): # Prevent moving a team to a different organization. @@ -866,17 +916,12 @@ class ProjectAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - organization_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=organization_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False + if not self.check_related('organization', Organization, data): + return False return self.user in obj.admin_role def can_delete(self, obj): @@ -974,7 +1019,7 @@ class JobTemplateAccess(BaseAccess): Inventory.accessible_objects(self.user, 'use_role').exists()) # if reference_obj is provided, determine if it can be coppied - reference_obj = data.pop('reference_obj', None) + reference_obj = data.get('reference_obj', None) if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN: self.check_license(feature='system_tracking') @@ -996,22 +1041,16 @@ class JobTemplateAccess(BaseAccess): return None # If a credential is provided, the user should have use access to it. - credential = get_value(Credential, 'credential') - if credential: - if self.user not in credential.use_role: - return False + if not self.check_related('credential', Credential, data, role_field='use_role'): + return False # If a cloud credential is provided, the user should have use access. - cloud_credential = get_value(Credential, 'cloud_credential') - if cloud_credential: - if self.user not in cloud_credential.use_role: - return False + if not self.check_related('cloud_credential', Credential, data, role_field='use_role'): + return False # If a network credential is provided, the user should have use access. - network_credential = get_value(Credential, 'network_credential') - if network_credential: - if self.user not in network_credential.use_role: - return False + if not self.check_related('network_credential', Credential, data, role_field='use_role'): + return False # If an inventory is provided, the user should have use access. inventory = get_value(Inventory, 'inventory') @@ -1327,28 +1366,17 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): return qs def can_use_prompted_resources(self, data): - cred_pk = data.get('credential', None) - inv_pk = data.get('inventory', None) - if cred_pk: - credential = get_object_or_400(Credential, pk=cred_pk) - if self.user not in credential.use_role: - return False - if inv_pk: - inventory = get_object_or_400(Inventory, pk=inv_pk) - if self.user not in inventory.use_role: - return False + if not self.check_related('credential', Credential, data): + return False + if not self.check_related('inventory', Inventory, data): + return False return True @check_superuser def can_add(self, data): if not data: # So the browseable API will work return True - wfjt_pk = data.get('workflow_job_template', None) - if wfjt_pk: - wfjt = get_object_or_400(WorkflowJobTemplate, pk=wfjt_pk) - if self.user not in wfjt.admin_role: - return False - else: + if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, fail_on_missing=False): return False if not self.can_use_prompted_resources(data): return False @@ -1451,6 +1479,7 @@ class WorkflowJobTemplateAccess(BaseAccess): def can_read(self, obj): return self.user in obj.read_role + @check_superuser def can_add(self, data): ''' a user can create a job template if they are a superuser, an org admin @@ -1491,13 +1520,7 @@ class WorkflowJobTemplateAccess(BaseAccess): # if 'survey_enabled' in data and data['survey_enabled']: # self.check_license(feature='surveys') - org_pk = get_pk_from_dict(data, 'organization') - if not org_pk: - # only superusers can create or manage orphan WFJTs - return self.user.is_superuser - - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) def can_start(self, obj, validate_license=True): if validate_license: @@ -1520,21 +1543,8 @@ class WorkflowJobTemplateAccess(BaseAccess): if self.user.is_superuser: return True - if data is None: - return self.user in obj.admin_role - org_pk = get_pk_from_dict(data, 'organization') - if ('organization' not in data or - (org_pk is None and obj.organization is None) or - (obj.organization and obj.organization.pk == org_pk)): - # No organization changes - return self.user in obj.admin_role - - # If it already has an organization set, must be admin of the org to change it - if obj.organization and self.user not in obj.organization.admin_role: - return False - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role def can_delete(self, obj): is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role @@ -1616,19 +1626,13 @@ class AdHocCommandAccess(BaseAccess): self.check_license() # If a credential is provided, the user should have use access to it. - credential_pk = get_pk_from_dict(data, 'credential') - if credential_pk: - credential = get_object_or_400(Credential, pk=credential_pk) - if self.user not in credential.use_role: - return False + if not self.check_related('credential', Credential, data, role_field='use_role'): + return False # Check that the user has the run ad hoc command permission on the # given inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - if inventory_pk: - inventory = get_object_or_400(Inventory, pk=inventory_pk) - if self.user not in inventory.adhoc_role: - return False + if not self.check_related('inventory', Inventory, data, role_field='adhoc_role'): + return False return True @@ -1911,21 +1915,14 @@ class NotificationTemplateAccess(BaseAccess): def can_add(self, data): if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): if obj.organization is None: # only superusers are allowed to edit orphan notification templates return False - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False - return self.user in obj.organization.admin_role + return self.check_related('organization', Organization, data, obj=obj, mandatory=True) def can_admin(self, obj, data): return self.can_change(obj, data) @@ -2087,18 +2084,11 @@ class CustomInventoryScriptAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) @check_superuser def can_admin(self, obj, data=None): - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False - return self.user in obj.admin_role + return self.check_related('organization', Organization, data, obj=obj) @check_superuser def can_change(self, obj, data): diff --git a/awx/main/tests/functional/test_projects.py b/awx/main/tests/functional/test_projects.py index 40ea659432..0a8fec4e72 100644 --- a/awx/main/tests/functional/test_projects.py +++ b/awx/main/tests/functional/test_projects.py @@ -121,7 +121,7 @@ def test_create_project_null_organization(post, organization, admin): @pytest.mark.django_db() def test_create_project_null_organization_xfail(post, organization, org_admin): - post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=400) + post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=403) @pytest.mark.django_db() def test_patch_project_null_organization(patch, organization, project, admin):