diff --git a/awx/main/access.py b/awx/main/access.py index 8f29947394..7fe6a35eaa 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -193,7 +193,7 @@ class BaseAccess(object): return self.can_change(obj, data) def check_related(self, field, Model, data, role_field='admin_role', - obj=None, fail_on_missing=True, mandatory=False): + obj=None, mandatory=False): ''' Check permission for related field, in scenarios: - creating a new resource, user must have permission to the @@ -205,53 +205,56 @@ class BaseAccess(object): If field is not given, return the inverse of `mandatory` ''' new = None - new_pk = None changed = True if data and 'reference_obj' in data: # Use reference object's related fields, if given new = getattr(data['reference_obj'], field) - new_pk = getattr(new, 'pk', None) elif data and field in data: # Obtain the resource specified in `data` raw_value = data[field] try: new_pk = int(raw_value) + if new_pk < 1 or (obj and new_pk == getattr(obj, '%s_id' % field, None)): + changed = False + else: + # Get the new resource from the database + new = get_object_or_400(Model, pk=new_pk) except (TypeError, ValueError): if type(raw_value) == Model: new = raw_value - new_pk = getattr(new, 'pk', None) elif data is None or field not in data: changed = False # Obtain existing related resource current = None - current_pk = None if obj and (changed or mandatory): current = getattr(obj, field) - current_pk = getattr(obj, '%s_id' % field, None) - # Define special case where resource is not changed, for example, a PUT request - if (current_pk and new_pk == current_pk) or (obj is None and new_pk is None): + # Resource not changed, like a PUT request + if (obj is None and new is None) or (new is not None and new == current): changed = False - # Get the new resource from the database - if (new is None and changed) or (obj is None and mandatory): - if fail_on_missing: - new = get_object_or_400(Model, pk=new_pk) - else: - new = Model.objects.filter(pk=new_pk).first() - if not new and not current and mandatory: return self.user.is_superuser + def user_has_resource_access(resource): + role = getattr(resource, role_field, None) + if role is None: + # Handle special case where resource does not have direct roles + access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] + if not self.user.can_access(type(resource), access_method_type, resource, None): + return False + else: + if self.user not in role: + return False + return True + if new and changed: - new_role = getattr(new, role_field) - if self.user not in new_role: + if not user_has_resource_access(new): return False # User lacks access to provided resource if current and (changed or mandatory): - current_role = getattr(current, role_field) - if self.user not in current_role: + if not user_has_resource_access(current): return False # User lacks access to existing resource # User has access to both, permission check passed @@ -918,7 +921,7 @@ class ProjectAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - return self.check_related('organization', Organization, data, mandatory=True, fail_on_missing=False) + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): @@ -1378,7 +1381,7 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return True - if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, fail_on_missing=False): + if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True): return False if not self.can_use_prompted_resources(data): return False @@ -1876,11 +1879,7 @@ class ScheduleAccess(BaseAccess): @check_superuser def can_change(self, obj, data): - if obj and obj.unified_job_template: - job_class = obj.unified_job_template - return self.user.can_access(type(job_class), 'change', job_class, None) - else: - return False + self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True) @check_superuser def can_delete(self, obj): diff --git a/awx/main/tests/functional/test_rbac_job_templates.py b/awx/main/tests/functional/test_rbac_job_templates.py index 8905eb21a0..71412959c9 100644 --- a/awx/main/tests/functional/test_rbac_job_templates.py +++ b/awx/main/tests/functional/test_rbac_job_templates.py @@ -4,10 +4,12 @@ import pytest from awx.main.access import ( BaseAccess, JobTemplateAccess, + ScheduleAccess ) from awx.main.migrations import _rbac as rbac from awx.main.models import Permission from awx.main.models.jobs import JobTemplate +from awx.main.models.schedules import Schedule from django.apps import apps from django.core.urlresolvers import reverse @@ -247,3 +249,23 @@ def test_associate_label(label, user, job_template): job_template.admin_role.members.add(user('joe', False)) label.organization.read_role.members.add(user('joe', False)) assert access.can_attach(job_template, label, 'labels', None) + +@pytest.mark.django_db +def test_move_schedule_to_JT_no_access(job_template, rando): + schedule = Schedule.objects.create( + unified_job_template=job_template, + rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1') + job_template.admin_role.members.add(rando) + jt2 = JobTemplate.objects.create(name="other-jt") + access = ScheduleAccess(rando) + assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk)) + +@pytest.mark.django_db +def test_move_schedule_from_JT_no_access(job_template, rando): + schedule = Schedule.objects.create( + unified_job_template=job_template, + rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1') + jt2 = JobTemplate.objects.create(name="other-jt") + jt2.admin_role.members.add(rando) + access = ScheduleAccess(rando) + assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk)) diff --git a/awx/main/tests/functional/test_rbac_label.py b/awx/main/tests/functional/test_rbac_label.py index 98daa5cdec..c4f4438e50 100644 --- a/awx/main/tests/functional/test_rbac_label.py +++ b/awx/main/tests/functional/test_rbac_label.py @@ -54,8 +54,7 @@ def test_label_access_user(label, user): access = LabelAccess(user('user', False)) label.organization.member_role.members.add(user('user', False)) - with pytest.raises(ParseError): - access.can_add({'organization': None}) + assert not access.can_add({'organization': None}) assert not access.can_change(label, None) assert not access.can_delete(label)