expand check_related to polymorphic related fields

This commit is contained in:
AlanCoding
2016-11-05 11:01:59 -04:00
parent db890b5759
commit 2ddda56f09
3 changed files with 48 additions and 28 deletions

View File

@@ -193,7 +193,7 @@ class BaseAccess(object):
return self.can_change(obj, data) return self.can_change(obj, data)
def check_related(self, field, Model, data, role_field='admin_role', def check_related(self, field, Model, data, role_field='admin_role',
obj=None, fail_on_missing=True, mandatory=False): obj=None, mandatory=False):
''' '''
Check permission for related field, in scenarios: Check permission for related field, in scenarios:
- creating a new resource, user must have permission to the - creating a new resource, user must have permission to the
@@ -205,53 +205,56 @@ class BaseAccess(object):
If field is not given, return the inverse of `mandatory` If field is not given, return the inverse of `mandatory`
''' '''
new = None new = None
new_pk = None
changed = True changed = True
if data and 'reference_obj' in data: if data and 'reference_obj' in data:
# Use reference object's related fields, if given # Use reference object's related fields, if given
new = getattr(data['reference_obj'], field) new = getattr(data['reference_obj'], field)
new_pk = getattr(new, 'pk', None)
elif data and field in data: elif data and field in data:
# Obtain the resource specified in `data` # Obtain the resource specified in `data`
raw_value = data[field] raw_value = data[field]
try: try:
new_pk = int(raw_value) new_pk = int(raw_value)
if new_pk < 1 or (obj and new_pk == getattr(obj, '%s_id' % field, None)):
changed = False
else:
# Get the new resource from the database
new = get_object_or_400(Model, pk=new_pk)
except (TypeError, ValueError): except (TypeError, ValueError):
if type(raw_value) == Model: if type(raw_value) == Model:
new = raw_value new = raw_value
new_pk = getattr(new, 'pk', None)
elif data is None or field not in data: elif data is None or field not in data:
changed = False changed = False
# Obtain existing related resource # Obtain existing related resource
current = None current = None
current_pk = None
if obj and (changed or mandatory): if obj and (changed or mandatory):
current = getattr(obj, field) current = getattr(obj, field)
current_pk = getattr(obj, '%s_id' % field, None)
# Define special case where resource is not changed, for example, a PUT request # Resource not changed, like a PUT request
if (current_pk and new_pk == current_pk) or (obj is None and new_pk is None): if (obj is None and new is None) or (new is not None and new == current):
changed = False changed = False
# Get the new resource from the database
if (new is None and changed) or (obj is None and mandatory):
if fail_on_missing:
new = get_object_or_400(Model, pk=new_pk)
else:
new = Model.objects.filter(pk=new_pk).first()
if not new and not current and mandatory: if not new and not current and mandatory:
return self.user.is_superuser return self.user.is_superuser
def user_has_resource_access(resource):
role = getattr(resource, role_field, None)
if role is None:
# Handle special case where resource does not have direct roles
access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field]
if not self.user.can_access(type(resource), access_method_type, resource, None):
return False
else:
if self.user not in role:
return False
return True
if new and changed: if new and changed:
new_role = getattr(new, role_field) if not user_has_resource_access(new):
if self.user not in new_role:
return False # User lacks access to provided resource return False # User lacks access to provided resource
if current and (changed or mandatory): if current and (changed or mandatory):
current_role = getattr(current, role_field) if not user_has_resource_access(current):
if self.user not in current_role:
return False # User lacks access to existing resource return False # User lacks access to existing resource
# User has access to both, permission check passed # User has access to both, permission check passed
@@ -918,7 +921,7 @@ class ProjectAccess(BaseAccess):
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return Organization.accessible_objects(self.user, 'admin_role').exists() return Organization.accessible_objects(self.user, 'admin_role').exists()
return self.check_related('organization', Organization, data, mandatory=True, fail_on_missing=False) return self.check_related('organization', Organization, data, mandatory=True)
@check_superuser @check_superuser
def can_change(self, obj, data): def can_change(self, obj, data):
@@ -1378,7 +1381,7 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
def can_add(self, data): def can_add(self, data):
if not data: # So the browseable API will work if not data: # So the browseable API will work
return True return True
if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, fail_on_missing=False): if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True):
return False return False
if not self.can_use_prompted_resources(data): if not self.can_use_prompted_resources(data):
return False return False
@@ -1876,11 +1879,7 @@ class ScheduleAccess(BaseAccess):
@check_superuser @check_superuser
def can_change(self, obj, data): def can_change(self, obj, data):
if obj and obj.unified_job_template: self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True)
job_class = obj.unified_job_template
return self.user.can_access(type(job_class), 'change', job_class, None)
else:
return False
@check_superuser @check_superuser
def can_delete(self, obj): def can_delete(self, obj):

View File

@@ -4,10 +4,12 @@ import pytest
from awx.main.access import ( from awx.main.access import (
BaseAccess, BaseAccess,
JobTemplateAccess, JobTemplateAccess,
ScheduleAccess
) )
from awx.main.migrations import _rbac as rbac from awx.main.migrations import _rbac as rbac
from awx.main.models import Permission from awx.main.models import Permission
from awx.main.models.jobs import JobTemplate from awx.main.models.jobs import JobTemplate
from awx.main.models.schedules import Schedule
from django.apps import apps from django.apps import apps
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
@@ -247,3 +249,23 @@ def test_associate_label(label, user, job_template):
job_template.admin_role.members.add(user('joe', False)) job_template.admin_role.members.add(user('joe', False))
label.organization.read_role.members.add(user('joe', False)) label.organization.read_role.members.add(user('joe', False))
assert access.can_attach(job_template, label, 'labels', None) assert access.can_attach(job_template, label, 'labels', None)
@pytest.mark.django_db
def test_move_schedule_to_JT_no_access(job_template, rando):
schedule = Schedule.objects.create(
unified_job_template=job_template,
rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1')
job_template.admin_role.members.add(rando)
jt2 = JobTemplate.objects.create(name="other-jt")
access = ScheduleAccess(rando)
assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk))
@pytest.mark.django_db
def test_move_schedule_from_JT_no_access(job_template, rando):
schedule = Schedule.objects.create(
unified_job_template=job_template,
rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1')
jt2 = JobTemplate.objects.create(name="other-jt")
jt2.admin_role.members.add(rando)
access = ScheduleAccess(rando)
assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk))

View File

@@ -54,8 +54,7 @@ def test_label_access_user(label, user):
access = LabelAccess(user('user', False)) access = LabelAccess(user('user', False))
label.organization.member_role.members.add(user('user', False)) label.organization.member_role.members.add(user('user', False))
with pytest.raises(ParseError): assert not access.can_add({'organization': None})
access.can_add({'organization': None})
assert not access.can_change(label, None) assert not access.can_change(label, None)
assert not access.can_delete(label) assert not access.can_delete(label)