From 5c2c46c5aadab64a00228cf6ee940fdfb74b8a85 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Tue, 1 Nov 2016 13:33:16 -0400 Subject: [PATCH 1/8] Consolidate related field access checks into BaseAccess method --- awx/main/access.py | 200 ++++++++++----------- awx/main/tests/functional/test_projects.py | 2 +- 2 files changed, 96 insertions(+), 106 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 0fea7b8852..4d7228a375 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -192,6 +192,69 @@ class BaseAccess(object): def can_unattach(self, obj, sub_obj, relationship, data=None): return self.can_change(obj, data) + def check_related(self, field, Model, data, role_field='admin_role', + obj=None, fail_on_missing=True, mandatory=False): + ''' + Check permission for related field, in scenarios: + - creating a new resource, user must have permission to the + resource specified in `data` + - editing an existing resource, user must have permission to resource + in `data`, as well as existing related resource on `obj` + If `obj` is not provided, we assume this is a new resource + If obj.field is null, this does not block the action + If field is not given, return the inverse of `mandatory` + ''' + new = None + new_pk = None + if data and 'reference_obj' in data: + # Use reference object's related fields, if given + new = getattr(data['reference_obj'], field) + new_pk = getattr(new, 'pk', None) + elif data and field in data: + # Obtain the resource specified in `data` + raw_value = data[field] + try: + new_pk = int(raw_value) + except (TypeError, ValueError): + if type(raw_value) == Model: + new = raw_value + new_pk = getattr(new, 'pk', None) + + # Obtain existing related resource + current = None + current_pk = None + if obj: + current = getattr(obj, field) + current_pk = getattr(obj, '%s_id' % field, None) + + # Define special case where resource is not changed + changed = True + if (new_pk is None or new_pk == current_pk) and (new is None or new == current): + changed = False + + # Get the new resource from the database + if new is None and new_pk is not None: + if fail_on_missing: + new = get_object_or_400(Model, pk=new_pk) + else: + new = Model.objects.filter(pk=new_pk).first() + + if not new and not current and mandatory: + return self.user.is_superuser + + if new and changed: + new_role = getattr(new, role_field) + if self.user not in new_role: + return False # User lacks access to provided resource + + if current and (changed or mandatory): + current_role = getattr(current, role_field) + if self.user not in current_role: + return False # User lacks access to existing resource + + # User has access to both, permission check passed + return True + def check_license(self, add_host=False, feature=None, check_expiration=True): validation_info = TaskEnhancer().validate_enhancements() if ('test' in sys.argv or 'py.test' in sys.argv[0] or 'jenkins' in sys.argv) and not os.environ.get('SKIP_LICENSE_FIXUP_FOR_TEST', ''): @@ -461,9 +524,7 @@ class InventoryAccess(BaseAccess): if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) @check_superuser def can_change(self, obj, data): @@ -473,11 +534,8 @@ class InventoryAccess(BaseAccess): def can_admin(self, obj, data): # Verify that the user has access to the new organization if moving an # inventory to a new organization. - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False + if not self.check_related('organization', Organization, data, obj=obj): + return False # Otherwise, just check for admin permission. return self.user in obj.admin_role @@ -523,9 +581,7 @@ class HostAccess(BaseAccess): return Inventory.accessible_objects(self.user, 'admin_role').exists() # Checks for admin or change permission on inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - inventory = get_object_or_400(Inventory, pk=inventory_pk) - if self.user not in inventory.admin_role: + if not self.check_related('inventory', Inventory, data): return False # Check to see if we have enough licenses @@ -574,9 +630,7 @@ class GroupAccess(BaseAccess): if not data or 'inventory' not in data: return False # Checks for admin or change permission on inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - inventory = get_object_or_400(Inventory, pk=inventory_pk) - return self.user in inventory.admin_role + return self.check_related('inventory', Inventory, data) def can_change(self, obj, data): # Prevent moving a group to a different inventory. @@ -795,11 +849,7 @@ class TeamAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - if self.user in org.admin_role: - return True - return False + return self.check_related('organization', Organization, data) def can_change(self, obj, data): # Prevent moving a team to a different organization. @@ -866,17 +916,12 @@ class ProjectAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - organization_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=organization_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False + if not self.check_related('organization', Organization, data): + return False return self.user in obj.admin_role def can_delete(self, obj): @@ -974,7 +1019,7 @@ class JobTemplateAccess(BaseAccess): Inventory.accessible_objects(self.user, 'use_role').exists()) # if reference_obj is provided, determine if it can be coppied - reference_obj = data.pop('reference_obj', None) + reference_obj = data.get('reference_obj', None) if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN: self.check_license(feature='system_tracking') @@ -996,22 +1041,16 @@ class JobTemplateAccess(BaseAccess): return None # If a credential is provided, the user should have use access to it. - credential = get_value(Credential, 'credential') - if credential: - if self.user not in credential.use_role: - return False + if not self.check_related('credential', Credential, data, role_field='use_role'): + return False # If a cloud credential is provided, the user should have use access. - cloud_credential = get_value(Credential, 'cloud_credential') - if cloud_credential: - if self.user not in cloud_credential.use_role: - return False + if not self.check_related('cloud_credential', Credential, data, role_field='use_role'): + return False # If a network credential is provided, the user should have use access. - network_credential = get_value(Credential, 'network_credential') - if network_credential: - if self.user not in network_credential.use_role: - return False + if not self.check_related('network_credential', Credential, data, role_field='use_role'): + return False # If an inventory is provided, the user should have use access. inventory = get_value(Inventory, 'inventory') @@ -1327,28 +1366,17 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): return qs def can_use_prompted_resources(self, data): - cred_pk = data.get('credential', None) - inv_pk = data.get('inventory', None) - if cred_pk: - credential = get_object_or_400(Credential, pk=cred_pk) - if self.user not in credential.use_role: - return False - if inv_pk: - inventory = get_object_or_400(Inventory, pk=inv_pk) - if self.user not in inventory.use_role: - return False + if not self.check_related('credential', Credential, data): + return False + if not self.check_related('inventory', Inventory, data): + return False return True @check_superuser def can_add(self, data): if not data: # So the browseable API will work return True - wfjt_pk = data.get('workflow_job_template', None) - if wfjt_pk: - wfjt = get_object_or_400(WorkflowJobTemplate, pk=wfjt_pk) - if self.user not in wfjt.admin_role: - return False - else: + if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, fail_on_missing=False): return False if not self.can_use_prompted_resources(data): return False @@ -1451,6 +1479,7 @@ class WorkflowJobTemplateAccess(BaseAccess): def can_read(self, obj): return self.user in obj.read_role + @check_superuser def can_add(self, data): ''' a user can create a job template if they are a superuser, an org admin @@ -1491,13 +1520,7 @@ class WorkflowJobTemplateAccess(BaseAccess): # if 'survey_enabled' in data and data['survey_enabled']: # self.check_license(feature='surveys') - org_pk = get_pk_from_dict(data, 'organization') - if not org_pk: - # only superusers can create or manage orphan WFJTs - return self.user.is_superuser - - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) def can_start(self, obj, validate_license=True): if validate_license: @@ -1520,21 +1543,8 @@ class WorkflowJobTemplateAccess(BaseAccess): if self.user.is_superuser: return True - if data is None: - return self.user in obj.admin_role - org_pk = get_pk_from_dict(data, 'organization') - if ('organization' not in data or - (org_pk is None and obj.organization is None) or - (obj.organization and obj.organization.pk == org_pk)): - # No organization changes - return self.user in obj.admin_role - - # If it already has an organization set, must be admin of the org to change it - if obj.organization and self.user not in obj.organization.admin_role: - return False - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role def can_delete(self, obj): is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role @@ -1616,19 +1626,13 @@ class AdHocCommandAccess(BaseAccess): self.check_license() # If a credential is provided, the user should have use access to it. - credential_pk = get_pk_from_dict(data, 'credential') - if credential_pk: - credential = get_object_or_400(Credential, pk=credential_pk) - if self.user not in credential.use_role: - return False + if not self.check_related('credential', Credential, data, role_field='use_role'): + return False # Check that the user has the run ad hoc command permission on the # given inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - if inventory_pk: - inventory = get_object_or_400(Inventory, pk=inventory_pk) - if self.user not in inventory.adhoc_role: - return False + if not self.check_related('inventory', Inventory, data, role_field='adhoc_role'): + return False return True @@ -1911,21 +1915,14 @@ class NotificationTemplateAccess(BaseAccess): def can_add(self, data): if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): if obj.organization is None: # only superusers are allowed to edit orphan notification templates return False - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False - return self.user in obj.organization.admin_role + return self.check_related('organization', Organization, data, obj=obj, mandatory=True) def can_admin(self, obj, data): return self.can_change(obj, data) @@ -2087,18 +2084,11 @@ class CustomInventoryScriptAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) @check_superuser def can_admin(self, obj, data=None): - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False - return self.user in obj.admin_role + return self.check_related('organization', Organization, data, obj=obj) @check_superuser def can_change(self, obj, data): diff --git a/awx/main/tests/functional/test_projects.py b/awx/main/tests/functional/test_projects.py index 40ea659432..0a8fec4e72 100644 --- a/awx/main/tests/functional/test_projects.py +++ b/awx/main/tests/functional/test_projects.py @@ -121,7 +121,7 @@ def test_create_project_null_organization(post, organization, admin): @pytest.mark.django_db() def test_create_project_null_organization_xfail(post, organization, org_admin): - post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=400) + post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=403) @pytest.mark.django_db() def test_patch_project_null_organization(patch, organization, project, admin): From db890b575992bd39caccf22bfde952e01ff248cb Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Wed, 2 Nov 2016 11:11:37 -0400 Subject: [PATCH 2/8] interpret absent related field as a unchanged state in permissions check --- awx/main/access.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 4d7228a375..8f29947394 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -206,6 +206,7 @@ class BaseAccess(object): ''' new = None new_pk = None + changed = True if data and 'reference_obj' in data: # Use reference object's related fields, if given new = getattr(data['reference_obj'], field) @@ -219,21 +220,22 @@ class BaseAccess(object): if type(raw_value) == Model: new = raw_value new_pk = getattr(new, 'pk', None) + elif data is None or field not in data: + changed = False # Obtain existing related resource current = None current_pk = None - if obj: + if obj and (changed or mandatory): current = getattr(obj, field) current_pk = getattr(obj, '%s_id' % field, None) - # Define special case where resource is not changed - changed = True - if (new_pk is None or new_pk == current_pk) and (new is None or new == current): + # Define special case where resource is not changed, for example, a PUT request + if (current_pk and new_pk == current_pk) or (obj is None and new_pk is None): changed = False # Get the new resource from the database - if new is None and new_pk is not None: + if (new is None and changed) or (obj is None and mandatory): if fail_on_missing: new = get_object_or_400(Model, pk=new_pk) else: @@ -916,7 +918,7 @@ class ProjectAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - return self.check_related('organization', Organization, data, mandatory=True) + return self.check_related('organization', Organization, data, mandatory=True, fail_on_missing=False) @check_superuser def can_change(self, obj, data): @@ -1978,17 +1980,13 @@ class LabelAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return True - - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.member_role + return self.check_related('organization', Organization, data, role_field='member_role', mandatory=True) @check_superuser def can_change(self, obj, data): if self.can_add(data) is False: return False - - return self.user in obj.organization.admin_role + return self.check_related('organization', Organization, data, obj=obj, mandatory=True) def can_delete(self, obj): return self.can_change(obj, None) From 2ddda56f09eebf7e4302282ba743e6db401ca1f5 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Sat, 5 Nov 2016 11:01:59 -0400 Subject: [PATCH 3/8] expand check_related to polymorphic related fields --- awx/main/access.py | 51 +++++++++---------- .../functional/test_rbac_job_templates.py | 22 ++++++++ awx/main/tests/functional/test_rbac_label.py | 3 +- 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 8f29947394..7fe6a35eaa 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -193,7 +193,7 @@ class BaseAccess(object): return self.can_change(obj, data) def check_related(self, field, Model, data, role_field='admin_role', - obj=None, fail_on_missing=True, mandatory=False): + obj=None, mandatory=False): ''' Check permission for related field, in scenarios: - creating a new resource, user must have permission to the @@ -205,53 +205,56 @@ class BaseAccess(object): If field is not given, return the inverse of `mandatory` ''' new = None - new_pk = None changed = True if data and 'reference_obj' in data: # Use reference object's related fields, if given new = getattr(data['reference_obj'], field) - new_pk = getattr(new, 'pk', None) elif data and field in data: # Obtain the resource specified in `data` raw_value = data[field] try: new_pk = int(raw_value) + if new_pk < 1 or (obj and new_pk == getattr(obj, '%s_id' % field, None)): + changed = False + else: + # Get the new resource from the database + new = get_object_or_400(Model, pk=new_pk) except (TypeError, ValueError): if type(raw_value) == Model: new = raw_value - new_pk = getattr(new, 'pk', None) elif data is None or field not in data: changed = False # Obtain existing related resource current = None - current_pk = None if obj and (changed or mandatory): current = getattr(obj, field) - current_pk = getattr(obj, '%s_id' % field, None) - # Define special case where resource is not changed, for example, a PUT request - if (current_pk and new_pk == current_pk) or (obj is None and new_pk is None): + # Resource not changed, like a PUT request + if (obj is None and new is None) or (new is not None and new == current): changed = False - # Get the new resource from the database - if (new is None and changed) or (obj is None and mandatory): - if fail_on_missing: - new = get_object_or_400(Model, pk=new_pk) - else: - new = Model.objects.filter(pk=new_pk).first() - if not new and not current and mandatory: return self.user.is_superuser + def user_has_resource_access(resource): + role = getattr(resource, role_field, None) + if role is None: + # Handle special case where resource does not have direct roles + access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] + if not self.user.can_access(type(resource), access_method_type, resource, None): + return False + else: + if self.user not in role: + return False + return True + if new and changed: - new_role = getattr(new, role_field) - if self.user not in new_role: + if not user_has_resource_access(new): return False # User lacks access to provided resource if current and (changed or mandatory): - current_role = getattr(current, role_field) - if self.user not in current_role: + if not user_has_resource_access(current): return False # User lacks access to existing resource # User has access to both, permission check passed @@ -918,7 +921,7 @@ class ProjectAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - return self.check_related('organization', Organization, data, mandatory=True, fail_on_missing=False) + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): @@ -1378,7 +1381,7 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return True - if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, fail_on_missing=False): + if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True): return False if not self.can_use_prompted_resources(data): return False @@ -1876,11 +1879,7 @@ class ScheduleAccess(BaseAccess): @check_superuser def can_change(self, obj, data): - if obj and obj.unified_job_template: - job_class = obj.unified_job_template - return self.user.can_access(type(job_class), 'change', job_class, None) - else: - return False + self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True) @check_superuser def can_delete(self, obj): diff --git a/awx/main/tests/functional/test_rbac_job_templates.py b/awx/main/tests/functional/test_rbac_job_templates.py index 8905eb21a0..71412959c9 100644 --- a/awx/main/tests/functional/test_rbac_job_templates.py +++ b/awx/main/tests/functional/test_rbac_job_templates.py @@ -4,10 +4,12 @@ import pytest from awx.main.access import ( BaseAccess, JobTemplateAccess, + ScheduleAccess ) from awx.main.migrations import _rbac as rbac from awx.main.models import Permission from awx.main.models.jobs import JobTemplate +from awx.main.models.schedules import Schedule from django.apps import apps from django.core.urlresolvers import reverse @@ -247,3 +249,23 @@ def test_associate_label(label, user, job_template): job_template.admin_role.members.add(user('joe', False)) label.organization.read_role.members.add(user('joe', False)) assert access.can_attach(job_template, label, 'labels', None) + +@pytest.mark.django_db +def test_move_schedule_to_JT_no_access(job_template, rando): + schedule = Schedule.objects.create( + unified_job_template=job_template, + rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1') + job_template.admin_role.members.add(rando) + jt2 = JobTemplate.objects.create(name="other-jt") + access = ScheduleAccess(rando) + assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk)) + +@pytest.mark.django_db +def test_move_schedule_from_JT_no_access(job_template, rando): + schedule = Schedule.objects.create( + unified_job_template=job_template, + rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1') + jt2 = JobTemplate.objects.create(name="other-jt") + jt2.admin_role.members.add(rando) + access = ScheduleAccess(rando) + assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk)) diff --git a/awx/main/tests/functional/test_rbac_label.py b/awx/main/tests/functional/test_rbac_label.py index 98daa5cdec..c4f4438e50 100644 --- a/awx/main/tests/functional/test_rbac_label.py +++ b/awx/main/tests/functional/test_rbac_label.py @@ -54,8 +54,7 @@ def test_label_access_user(label, user): access = LabelAccess(user('user', False)) label.organization.member_role.members.add(user('user', False)) - with pytest.raises(ParseError): - access.can_add({'organization': None}) + assert not access.can_add({'organization': None}) assert not access.can_change(label, None) assert not access.can_delete(label) From 2279e1e8dc46f5bf095c5eeb090432b96b0a946b Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Sat, 5 Nov 2016 13:45:07 -0400 Subject: [PATCH 4/8] unit test related field checks, finialize scenarios --- awx/main/access.py | 71 ++++++++-------- awx/main/tests/functional/test_rbac_label.py | 2 - awx/main/tests/unit/test_access.py | 89 +++++++++++++++++++- 3 files changed, 118 insertions(+), 44 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 7fe6a35eaa..021ef309ab 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -196,32 +196,40 @@ class BaseAccess(object): obj=None, mandatory=False): ''' Check permission for related field, in scenarios: - - creating a new resource, user must have permission to the - resource specified in `data` + - creating a new resource, user must have permission if + resource is specified in `data` - editing an existing resource, user must have permission to resource in `data`, as well as existing related resource on `obj` - If `obj` is not provided, we assume this is a new resource + If obj.field is null, this does not block the action - If field is not given, return the inverse of `mandatory` + If `mandatory` is set, new resources require the field and + existing field will always be checked ''' new = None changed = True + is_removal = False if data and 'reference_obj' in data: # Use reference object's related fields, if given new = getattr(data['reference_obj'], field) elif data and field in data: # Obtain the resource specified in `data` raw_value = data[field] - try: - new_pk = int(raw_value) - if new_pk < 1 or (obj and new_pk == getattr(obj, '%s_id' % field, None)): - changed = False - else: - # Get the new resource from the database - new = get_object_or_400(Model, pk=new_pk) - except (TypeError, ValueError): - if type(raw_value) == Model: - new = raw_value + if isinstance(raw_value, Model): + new = raw_value + elif raw_value is None: + new = None + is_removal = True + else: + try: + new_pk = int(raw_value) + # Avoid database query by comparing pk to model for similarity + if obj and new_pk == getattr(obj, '%s_id' % field, None): + changed = False + else: + # Get the new resource from the database + new = get_object_or_400(Model, pk=new_pk) + except (TypeError, ValueError): + raise ParseError(_("Bad data found in related field %s." % field)) elif data is None or field not in data: changed = False @@ -230,11 +238,13 @@ class BaseAccess(object): if obj and (changed or mandatory): current = getattr(obj, field) - # Resource not changed, like a PUT request - if (obj is None and new is None) or (new is not None and new == current): + if obj and new == current and (not is_removal): + # Resource not changed, like a PUT request changed = False - if not new and not current and mandatory: + if (not new) and (not obj) and mandatory: + # Restrict ability to create resource without required field + print ' superuser ' return self.user.is_superuser def user_has_resource_access(resource): @@ -242,12 +252,8 @@ class BaseAccess(object): if role is None: # Handle special case where resource does not have direct roles access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] - if not self.user.can_access(type(resource), access_method_type, resource, None): - return False - else: - if self.user not in role: - return False - return True + return self.user.can_access(type(resource), access_method_type, resource, None) + return self.user in role if new and changed: if not user_has_resource_access(new): @@ -257,8 +263,7 @@ class BaseAccess(object): if not user_has_resource_access(current): return False # User lacks access to existing resource - # User has access to both, permission check passed - return True + return True # User has access to both, permission check passed def check_license(self, add_host=False, feature=None, check_expiration=True): validation_info = TaskEnhancer().validate_enhancements() @@ -1870,24 +1875,14 @@ class ScheduleAccess(BaseAccess): @check_superuser def can_add(self, data): - pk = get_pk_from_dict(data, 'unified_job_template') - obj = get_object_or_400(UnifiedJobTemplate, pk=pk) - if obj: - return self.user.can_access(type(obj), 'change', obj, None) - else: - return False + return self.check_related('unified_job_template', UnifiedJobTemplate, data, mandatory=True) @check_superuser def can_change(self, obj, data): - self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True) + return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True) - @check_superuser def can_delete(self, obj): - if obj and obj.unified_job_template: - job_class = obj.unified_job_template - return self.user.can_access(type(job_class), 'change', job_class, None) - else: - return False + return self.can_change(obj, {}) class NotificationTemplateAccess(BaseAccess): ''' diff --git a/awx/main/tests/functional/test_rbac_label.py b/awx/main/tests/functional/test_rbac_label.py index c4f4438e50..57700d17e2 100644 --- a/awx/main/tests/functional/test_rbac_label.py +++ b/awx/main/tests/functional/test_rbac_label.py @@ -4,8 +4,6 @@ from awx.main.access import ( LabelAccess, ) -from rest_framework.exceptions import ParseError - @pytest.mark.django_db def test_label_get_queryset_user(label, user): access = LabelAccess(user('user', False)) diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index 806f622690..d2b2412ca0 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -3,6 +3,7 @@ import mock from django.contrib.auth.models import User from django.forms.models import model_to_dict +from rest_framework.exceptions import ParseError from awx.main.access import ( BaseAccess, @@ -16,6 +17,90 @@ from awx.conf.license import LicenseForbids from awx.main.models import Credential, Inventory, Project, Role, Organization, Instance +@pytest.fixture +def user_unit(): + return User(username='rando', password='raginrando', email='rando@redhat.com') + +class TestRelatedFieldAccess: + + @pytest.fixture + def good_role(self, mocker): + return mocker.MagicMock(__contains__=lambda self, user: True) + + @pytest.fixture + def bad_role(self, mocker): + return mocker.MagicMock(__contains__=lambda self, user: False) + + @pytest.fixture + def resource_good(self, mocker, good_role): + return mocker.MagicMock(related=mocker.MagicMock(admin_role=good_role), + admin_role=good_role) + + @pytest.fixture + def resource_bad(self, mocker, bad_role): + return mocker.MagicMock(related=mocker.MagicMock(admin_role=bad_role), + admin_role=bad_role) + + @pytest.fixture + def access(self, user_unit): + return BaseAccess(user_unit) + + def test_new_optional_fail(self, access, resource_bad, mocker): + """ + User tries to create a new resource, but lacks permission + to the related resource they provided + """ + data = {'related': resource_bad} + assert not access.check_related('related', mocker.MagicMock, data) + + def test_new_with_bad_data(self, access, resource_bad, mocker): + data = {'related': 3.1415} + with pytest.raises(ParseError): + access.check_related('related', mocker.MagicMock, data) + + def test_new_mandatory_fail(self, access, resource_bad, mocker): + access.user.is_superuser = False + assert not access.check_related( + 'related', mocker.MagicMock, {}, mandatory=True) + assert not access.check_related( + 'related', mocker.MagicMock, {'resource': None}, mandatory=True) + + def test_existing_no_op(self, access, resource_bad, mocker): + """ + User edits a resource, but does not change related field + lack of access to related field does not block action + """ + data = {'related': resource_bad.related} + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) + + def test_existing_required_access(self, access, resource_bad, mocker): + assert not access.check_related( + 'related', mocker.MagicMock, {}, obj=resource_bad, mandatory=True) + + def test_existing_no_access_to_current( + self, access, resource_good, bad_role, mocker): + """ + User gives a valid related resource (like organization), but does + not have access to _existing_ related resource, so deny action + """ + resource_good.admin_role = bad_role + data = {'related': resource_good.related} + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource_good) + + def test_existing_no_access_to_new( + self, access, resource_good, bad_role, mocker): + resource_good.related.admin_role = bad_role + data = {'related': resource_good.related} + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource_good) + + def test_existing_not_allowed_to_remove(self, access, resource_bad, mocker): + data = {'related': None} + assert not access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) + @pytest.fixture def job_template_with_ids(job_template_factory): # Create non-persisted objects with IDs to send to job_template_factory @@ -31,10 +116,6 @@ def job_template_with_ids(job_template_factory): persisted=False) return jt_objects.job_template -@pytest.fixture -def user_unit(): - return User(username='rando', password='raginrando', email='rando@redhat.com') - def test_superuser(mocker): user = mocker.MagicMock(spec=User, id=1, is_superuser=True) access = BaseAccess(user) From f2846101a314601d542ace8846a7b32b235b47f7 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Sat, 5 Nov 2016 14:24:04 -0400 Subject: [PATCH 5/8] deal with a special null null case --- awx/main/access.py | 4 +--- awx/main/tests/unit/test_access.py | 8 ++++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 021ef309ab..8f77698791 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -207,7 +207,6 @@ class BaseAccess(object): ''' new = None changed = True - is_removal = False if data and 'reference_obj' in data: # Use reference object's related fields, if given new = getattr(data['reference_obj'], field) @@ -218,7 +217,6 @@ class BaseAccess(object): new = raw_value elif raw_value is None: new = None - is_removal = True else: try: new_pk = int(raw_value) @@ -238,7 +236,7 @@ class BaseAccess(object): if obj and (changed or mandatory): current = getattr(obj, field) - if obj and new == current and (not is_removal): + if obj and new == current: # Resource not changed, like a PUT request changed = False diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index d2b2412ca0..10b2328033 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -101,6 +101,14 @@ class TestRelatedFieldAccess: assert not access.check_related( 'related', mocker.MagicMock, data, obj=resource_bad) + def test_existing_not_null_null(self, access, bad_role, mocker): + resource = mocker.MagicMock(related=None) + data = {'related': None} + # Not changing anything by giving null when it is already-null + # important for PUT requests + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource, mandatory=True) + @pytest.fixture def job_template_with_ids(job_template_factory): # Create non-persisted objects with IDs to send to job_template_factory From cae2cdaad892304c3d1b57fa1cb54356e70885e0 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 7 Nov 2016 08:45:50 -0500 Subject: [PATCH 6/8] check related method small fixups --- awx/main/access.py | 12 ++++-------- awx/main/tests/unit/test_access.py | 6 +++--- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 8f77698791..1c907f67fa 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -201,7 +201,6 @@ class BaseAccess(object): - editing an existing resource, user must have permission to resource in `data`, as well as existing related resource on `obj` - If obj.field is null, this does not block the action If `mandatory` is set, new resources require the field and existing field will always be checked ''' @@ -242,7 +241,6 @@ class BaseAccess(object): if (not new) and (not obj) and mandatory: # Restrict ability to create resource without required field - print ' superuser ' return self.user.is_superuser def user_has_resource_access(resource): @@ -253,13 +251,11 @@ class BaseAccess(object): return self.user.can_access(type(resource), access_method_type, resource, None) return self.user in role - if new and changed: - if not user_has_resource_access(new): - return False # User lacks access to provided resource + if new and changed and (not user_has_resource_access(new)): + return False # User lacks access to provided resource - if current and (changed or mandatory): - if not user_has_resource_access(current): - return False # User lacks access to existing resource + if current and (changed or mandatory) and (not user_has_resource_access(current)): + return False # User lacks access to existing resource return True # User has access to both, permission check passed diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index 10b2328033..728b73954e 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -53,12 +53,12 @@ class TestRelatedFieldAccess: data = {'related': resource_bad} assert not access.check_related('related', mocker.MagicMock, data) - def test_new_with_bad_data(self, access, resource_bad, mocker): + def test_new_with_bad_data(self, access, mocker): data = {'related': 3.1415} with pytest.raises(ParseError): access.check_related('related', mocker.MagicMock, data) - def test_new_mandatory_fail(self, access, resource_bad, mocker): + def test_new_mandatory_fail(self, access, mocker): access.user.is_superuser = False assert not access.check_related( 'related', mocker.MagicMock, {}, mandatory=True) @@ -101,7 +101,7 @@ class TestRelatedFieldAccess: assert not access.check_related( 'related', mocker.MagicMock, data, obj=resource_bad) - def test_existing_not_null_null(self, access, bad_role, mocker): + def test_existing_not_null_null(self, access, mocker): resource = mocker.MagicMock(related=None) data = {'related': None} # Not changing anything by giving null when it is already-null From 2afdc04ac02bdedf3a95246ab1aff5ec45e3324e Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 7 Nov 2016 11:56:11 -0500 Subject: [PATCH 7/8] cleanup inventory admin access syntax --- awx/main/access.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 1c907f67fa..d2180cadaa 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -537,11 +537,8 @@ class InventoryAccess(BaseAccess): @check_superuser def can_admin(self, obj, data): # Verify that the user has access to the new organization if moving an - # inventory to a new organization. - if not self.check_related('organization', Organization, data, obj=obj): - return False - # Otherwise, just check for admin permission. - return self.user in obj.admin_role + # inventory to a new organization. Otherwise, just check for admin permission. + return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role def can_delete(self, obj): is_can_admin = self.can_admin(obj, None) From 9c30058abe59ef7bc53d620cefb5c817bfc64512 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 7 Nov 2016 16:07:52 -0500 Subject: [PATCH 8/8] improvements to related access unit tests --- awx/main/tests/unit/test_access.py | 36 ++++++++++++++---------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index 728b73954e..83b07d3ef6 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -24,20 +24,14 @@ def user_unit(): class TestRelatedFieldAccess: @pytest.fixture - def good_role(self, mocker): - return mocker.MagicMock(__contains__=lambda self, user: True) - - @pytest.fixture - def bad_role(self, mocker): - return mocker.MagicMock(__contains__=lambda self, user: False) - - @pytest.fixture - def resource_good(self, mocker, good_role): + def resource_good(self, mocker): + good_role = mocker.MagicMock(__contains__=lambda self, user: True) return mocker.MagicMock(related=mocker.MagicMock(admin_role=good_role), admin_role=good_role) @pytest.fixture - def resource_bad(self, mocker, bad_role): + def resource_bad(self, mocker): + bad_role = mocker.MagicMock(__contains__=lambda self, user: False) return mocker.MagicMock(related=mocker.MagicMock(admin_role=bad_role), admin_role=bad_role) @@ -73,27 +67,31 @@ class TestRelatedFieldAccess: data = {'related': resource_bad.related} assert access.check_related( 'related', mocker.MagicMock, data, obj=resource_bad) + assert access.check_related( + 'related', mocker.MagicMock, {}, obj=resource_bad) def test_existing_required_access(self, access, resource_bad, mocker): + # no-op actions, but mandatory kwarg requires check to pass assert not access.check_related( 'related', mocker.MagicMock, {}, obj=resource_bad, mandatory=True) + assert not access.check_related( + 'related', mocker.MagicMock, {'related': resource_bad.related}, + obj=resource_bad, mandatory=True) def test_existing_no_access_to_current( - self, access, resource_good, bad_role, mocker): + self, access, resource_good, resource_bad, mocker): """ User gives a valid related resource (like organization), but does not have access to _existing_ related resource, so deny action """ - resource_good.admin_role = bad_role - data = {'related': resource_good.related} - assert access.check_related( - 'related', mocker.MagicMock, data, obj=resource_good) + data = {'related': resource_good} + assert not access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) def test_existing_no_access_to_new( - self, access, resource_good, bad_role, mocker): - resource_good.related.admin_role = bad_role - data = {'related': resource_good.related} - assert access.check_related( + self, access, resource_good, resource_bad, mocker): + data = {'related': resource_bad} + assert not access.check_related( 'related', mocker.MagicMock, data, obj=resource_good) def test_existing_not_allowed_to_remove(self, access, resource_bad, mocker):