diff --git a/awx/main/access.py b/awx/main/access.py index 0fea7b8852..d2180cadaa 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -192,6 +192,73 @@ class BaseAccess(object): def can_unattach(self, obj, sub_obj, relationship, data=None): return self.can_change(obj, data) + def check_related(self, field, Model, data, role_field='admin_role', + obj=None, mandatory=False): + ''' + Check permission for related field, in scenarios: + - creating a new resource, user must have permission if + resource is specified in `data` + - editing an existing resource, user must have permission to resource + in `data`, as well as existing related resource on `obj` + + If `mandatory` is set, new resources require the field and + existing field will always be checked + ''' + new = None + changed = True + if data and 'reference_obj' in data: + # Use reference object's related fields, if given + new = getattr(data['reference_obj'], field) + elif data and field in data: + # Obtain the resource specified in `data` + raw_value = data[field] + if isinstance(raw_value, Model): + new = raw_value + elif raw_value is None: + new = None + else: + try: + new_pk = int(raw_value) + # Avoid database query by comparing pk to model for similarity + if obj and new_pk == getattr(obj, '%s_id' % field, None): + changed = False + else: + # Get the new resource from the database + new = get_object_or_400(Model, pk=new_pk) + except (TypeError, ValueError): + raise ParseError(_("Bad data found in related field %s." % field)) + elif data is None or field not in data: + changed = False + + # Obtain existing related resource + current = None + if obj and (changed or mandatory): + current = getattr(obj, field) + + if obj and new == current: + # Resource not changed, like a PUT request + changed = False + + if (not new) and (not obj) and mandatory: + # Restrict ability to create resource without required field + return self.user.is_superuser + + def user_has_resource_access(resource): + role = getattr(resource, role_field, None) + if role is None: + # Handle special case where resource does not have direct roles + access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] + return self.user.can_access(type(resource), access_method_type, resource, None) + return self.user in role + + if new and changed and (not user_has_resource_access(new)): + return False # User lacks access to provided resource + + if current and (changed or mandatory) and (not user_has_resource_access(current)): + return False # User lacks access to existing resource + + return True # User has access to both, permission check passed + def check_license(self, add_host=False, feature=None, check_expiration=True): validation_info = TaskEnhancer().validate_enhancements() if ('test' in sys.argv or 'py.test' in sys.argv[0] or 'jenkins' in sys.argv) and not os.environ.get('SKIP_LICENSE_FIXUP_FOR_TEST', ''): @@ -461,9 +528,7 @@ class InventoryAccess(BaseAccess): if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) @check_superuser def can_change(self, obj, data): @@ -472,14 +537,8 @@ class InventoryAccess(BaseAccess): @check_superuser def can_admin(self, obj, data): # Verify that the user has access to the new organization if moving an - # inventory to a new organization. - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False - # Otherwise, just check for admin permission. - return self.user in obj.admin_role + # inventory to a new organization. Otherwise, just check for admin permission. + return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role def can_delete(self, obj): is_can_admin = self.can_admin(obj, None) @@ -523,9 +582,7 @@ class HostAccess(BaseAccess): return Inventory.accessible_objects(self.user, 'admin_role').exists() # Checks for admin or change permission on inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - inventory = get_object_or_400(Inventory, pk=inventory_pk) - if self.user not in inventory.admin_role: + if not self.check_related('inventory', Inventory, data): return False # Check to see if we have enough licenses @@ -574,9 +631,7 @@ class GroupAccess(BaseAccess): if not data or 'inventory' not in data: return False # Checks for admin or change permission on inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - inventory = get_object_or_400(Inventory, pk=inventory_pk) - return self.user in inventory.admin_role + return self.check_related('inventory', Inventory, data) def can_change(self, obj, data): # Prevent moving a group to a different inventory. @@ -795,11 +850,7 @@ class TeamAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - if self.user in org.admin_role: - return True - return False + return self.check_related('organization', Organization, data) def can_change(self, obj, data): # Prevent moving a team to a different organization. @@ -866,17 +917,12 @@ class ProjectAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - organization_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=organization_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False + if not self.check_related('organization', Organization, data): + return False return self.user in obj.admin_role def can_delete(self, obj): @@ -974,7 +1020,7 @@ class JobTemplateAccess(BaseAccess): Inventory.accessible_objects(self.user, 'use_role').exists()) # if reference_obj is provided, determine if it can be coppied - reference_obj = data.pop('reference_obj', None) + reference_obj = data.get('reference_obj', None) if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN: self.check_license(feature='system_tracking') @@ -996,22 +1042,16 @@ class JobTemplateAccess(BaseAccess): return None # If a credential is provided, the user should have use access to it. - credential = get_value(Credential, 'credential') - if credential: - if self.user not in credential.use_role: - return False + if not self.check_related('credential', Credential, data, role_field='use_role'): + return False # If a cloud credential is provided, the user should have use access. - cloud_credential = get_value(Credential, 'cloud_credential') - if cloud_credential: - if self.user not in cloud_credential.use_role: - return False + if not self.check_related('cloud_credential', Credential, data, role_field='use_role'): + return False # If a network credential is provided, the user should have use access. - network_credential = get_value(Credential, 'network_credential') - if network_credential: - if self.user not in network_credential.use_role: - return False + if not self.check_related('network_credential', Credential, data, role_field='use_role'): + return False # If an inventory is provided, the user should have use access. inventory = get_value(Inventory, 'inventory') @@ -1327,28 +1367,17 @@ class WorkflowJobTemplateNodeAccess(BaseAccess): return qs def can_use_prompted_resources(self, data): - cred_pk = data.get('credential', None) - inv_pk = data.get('inventory', None) - if cred_pk: - credential = get_object_or_400(Credential, pk=cred_pk) - if self.user not in credential.use_role: - return False - if inv_pk: - inventory = get_object_or_400(Inventory, pk=inv_pk) - if self.user not in inventory.use_role: - return False + if not self.check_related('credential', Credential, data): + return False + if not self.check_related('inventory', Inventory, data): + return False return True @check_superuser def can_add(self, data): if not data: # So the browseable API will work return True - wfjt_pk = data.get('workflow_job_template', None) - if wfjt_pk: - wfjt = get_object_or_400(WorkflowJobTemplate, pk=wfjt_pk) - if self.user not in wfjt.admin_role: - return False - else: + if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True): return False if not self.can_use_prompted_resources(data): return False @@ -1451,6 +1480,7 @@ class WorkflowJobTemplateAccess(BaseAccess): def can_read(self, obj): return self.user in obj.read_role + @check_superuser def can_add(self, data): ''' a user can create a job template if they are a superuser, an org admin @@ -1491,13 +1521,7 @@ class WorkflowJobTemplateAccess(BaseAccess): # if 'survey_enabled' in data and data['survey_enabled']: # self.check_license(feature='surveys') - org_pk = get_pk_from_dict(data, 'organization') - if not org_pk: - # only superusers can create or manage orphan WFJTs - return self.user.is_superuser - - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) def can_start(self, obj, validate_license=True): if validate_license: @@ -1520,21 +1544,8 @@ class WorkflowJobTemplateAccess(BaseAccess): if self.user.is_superuser: return True - if data is None: - return self.user in obj.admin_role - org_pk = get_pk_from_dict(data, 'organization') - if ('organization' not in data or - (org_pk is None and obj.organization is None) or - (obj.organization and obj.organization.pk == org_pk)): - # No organization changes - return self.user in obj.admin_role - - # If it already has an organization set, must be admin of the org to change it - if obj.organization and self.user not in obj.organization.admin_role: - return False - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role def can_delete(self, obj): is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role @@ -1616,19 +1627,13 @@ class AdHocCommandAccess(BaseAccess): self.check_license() # If a credential is provided, the user should have use access to it. - credential_pk = get_pk_from_dict(data, 'credential') - if credential_pk: - credential = get_object_or_400(Credential, pk=credential_pk) - if self.user not in credential.use_role: - return False + if not self.check_related('credential', Credential, data, role_field='use_role'): + return False # Check that the user has the run ad hoc command permission on the # given inventory. - inventory_pk = get_pk_from_dict(data, 'inventory') - if inventory_pk: - inventory = get_object_or_400(Inventory, pk=inventory_pk) - if self.user not in inventory.adhoc_role: - return False + if not self.check_related('inventory', Inventory, data, role_field='adhoc_role'): + return False return True @@ -1861,28 +1866,14 @@ class ScheduleAccess(BaseAccess): @check_superuser def can_add(self, data): - pk = get_pk_from_dict(data, 'unified_job_template') - obj = get_object_or_400(UnifiedJobTemplate, pk=pk) - if obj: - return self.user.can_access(type(obj), 'change', obj, None) - else: - return False + return self.check_related('unified_job_template', UnifiedJobTemplate, data, mandatory=True) @check_superuser def can_change(self, obj, data): - if obj and obj.unified_job_template: - job_class = obj.unified_job_template - return self.user.can_access(type(job_class), 'change', job_class, None) - else: - return False + return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True) - @check_superuser def can_delete(self, obj): - if obj and obj.unified_job_template: - job_class = obj.unified_job_template - return self.user.can_access(type(job_class), 'change', job_class, None) - else: - return False + return self.can_change(obj, {}) class NotificationTemplateAccess(BaseAccess): ''' @@ -1911,21 +1902,14 @@ class NotificationTemplateAccess(BaseAccess): def can_add(self, data): if not data: return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data, mandatory=True) @check_superuser def can_change(self, obj, data): if obj.organization is None: # only superusers are allowed to edit orphan notification templates return False - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False - return self.user in obj.organization.admin_role + return self.check_related('organization', Organization, data, obj=obj, mandatory=True) def can_admin(self, obj, data): return self.can_change(obj, data) @@ -1981,17 +1965,13 @@ class LabelAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return True - - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.member_role + return self.check_related('organization', Organization, data, role_field='member_role', mandatory=True) @check_superuser def can_change(self, obj, data): if self.can_add(data) is False: return False - - return self.user in obj.organization.admin_role + return self.check_related('organization', Organization, data, obj=obj, mandatory=True) def can_delete(self, obj): return self.can_change(obj, None) @@ -2087,18 +2067,11 @@ class CustomInventoryScriptAccess(BaseAccess): def can_add(self, data): if not data: # So the browseable API will work return Organization.accessible_objects(self.user, 'admin_role').exists() - org_pk = get_pk_from_dict(data, 'organization') - org = get_object_or_400(Organization, pk=org_pk) - return self.user in org.admin_role + return self.check_related('organization', Organization, data) @check_superuser def can_admin(self, obj, data=None): - org_pk = get_pk_from_dict(data, 'organization') - if obj and org_pk and obj.organization and obj.organization.pk != org_pk: - org = get_object_or_400(Organization, pk=org_pk) - if self.user not in org.admin_role: - return False - return self.user in obj.admin_role + return self.check_related('organization', Organization, data, obj=obj) @check_superuser def can_change(self, obj, data): diff --git a/awx/main/tests/functional/test_projects.py b/awx/main/tests/functional/test_projects.py index 40ea659432..0a8fec4e72 100644 --- a/awx/main/tests/functional/test_projects.py +++ b/awx/main/tests/functional/test_projects.py @@ -121,7 +121,7 @@ def test_create_project_null_organization(post, organization, admin): @pytest.mark.django_db() def test_create_project_null_organization_xfail(post, organization, org_admin): - post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=400) + post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=403) @pytest.mark.django_db() def test_patch_project_null_organization(patch, organization, project, admin): diff --git a/awx/main/tests/functional/test_rbac_job_templates.py b/awx/main/tests/functional/test_rbac_job_templates.py index 8905eb21a0..71412959c9 100644 --- a/awx/main/tests/functional/test_rbac_job_templates.py +++ b/awx/main/tests/functional/test_rbac_job_templates.py @@ -4,10 +4,12 @@ import pytest from awx.main.access import ( BaseAccess, JobTemplateAccess, + ScheduleAccess ) from awx.main.migrations import _rbac as rbac from awx.main.models import Permission from awx.main.models.jobs import JobTemplate +from awx.main.models.schedules import Schedule from django.apps import apps from django.core.urlresolvers import reverse @@ -247,3 +249,23 @@ def test_associate_label(label, user, job_template): job_template.admin_role.members.add(user('joe', False)) label.organization.read_role.members.add(user('joe', False)) assert access.can_attach(job_template, label, 'labels', None) + +@pytest.mark.django_db +def test_move_schedule_to_JT_no_access(job_template, rando): + schedule = Schedule.objects.create( + unified_job_template=job_template, + rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1') + job_template.admin_role.members.add(rando) + jt2 = JobTemplate.objects.create(name="other-jt") + access = ScheduleAccess(rando) + assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk)) + +@pytest.mark.django_db +def test_move_schedule_from_JT_no_access(job_template, rando): + schedule = Schedule.objects.create( + unified_job_template=job_template, + rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1') + jt2 = JobTemplate.objects.create(name="other-jt") + jt2.admin_role.members.add(rando) + access = ScheduleAccess(rando) + assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk)) diff --git a/awx/main/tests/functional/test_rbac_label.py b/awx/main/tests/functional/test_rbac_label.py index 98daa5cdec..57700d17e2 100644 --- a/awx/main/tests/functional/test_rbac_label.py +++ b/awx/main/tests/functional/test_rbac_label.py @@ -4,8 +4,6 @@ from awx.main.access import ( LabelAccess, ) -from rest_framework.exceptions import ParseError - @pytest.mark.django_db def test_label_get_queryset_user(label, user): access = LabelAccess(user('user', False)) @@ -54,8 +52,7 @@ def test_label_access_user(label, user): access = LabelAccess(user('user', False)) label.organization.member_role.members.add(user('user', False)) - with pytest.raises(ParseError): - access.can_add({'organization': None}) + assert not access.can_add({'organization': None}) assert not access.can_change(label, None) assert not access.can_delete(label) diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index 806f622690..83b07d3ef6 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -3,6 +3,7 @@ import mock from django.contrib.auth.models import User from django.forms.models import model_to_dict +from rest_framework.exceptions import ParseError from awx.main.access import ( BaseAccess, @@ -16,6 +17,96 @@ from awx.conf.license import LicenseForbids from awx.main.models import Credential, Inventory, Project, Role, Organization, Instance +@pytest.fixture +def user_unit(): + return User(username='rando', password='raginrando', email='rando@redhat.com') + +class TestRelatedFieldAccess: + + @pytest.fixture + def resource_good(self, mocker): + good_role = mocker.MagicMock(__contains__=lambda self, user: True) + return mocker.MagicMock(related=mocker.MagicMock(admin_role=good_role), + admin_role=good_role) + + @pytest.fixture + def resource_bad(self, mocker): + bad_role = mocker.MagicMock(__contains__=lambda self, user: False) + return mocker.MagicMock(related=mocker.MagicMock(admin_role=bad_role), + admin_role=bad_role) + + @pytest.fixture + def access(self, user_unit): + return BaseAccess(user_unit) + + def test_new_optional_fail(self, access, resource_bad, mocker): + """ + User tries to create a new resource, but lacks permission + to the related resource they provided + """ + data = {'related': resource_bad} + assert not access.check_related('related', mocker.MagicMock, data) + + def test_new_with_bad_data(self, access, mocker): + data = {'related': 3.1415} + with pytest.raises(ParseError): + access.check_related('related', mocker.MagicMock, data) + + def test_new_mandatory_fail(self, access, mocker): + access.user.is_superuser = False + assert not access.check_related( + 'related', mocker.MagicMock, {}, mandatory=True) + assert not access.check_related( + 'related', mocker.MagicMock, {'resource': None}, mandatory=True) + + def test_existing_no_op(self, access, resource_bad, mocker): + """ + User edits a resource, but does not change related field + lack of access to related field does not block action + """ + data = {'related': resource_bad.related} + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) + assert access.check_related( + 'related', mocker.MagicMock, {}, obj=resource_bad) + + def test_existing_required_access(self, access, resource_bad, mocker): + # no-op actions, but mandatory kwarg requires check to pass + assert not access.check_related( + 'related', mocker.MagicMock, {}, obj=resource_bad, mandatory=True) + assert not access.check_related( + 'related', mocker.MagicMock, {'related': resource_bad.related}, + obj=resource_bad, mandatory=True) + + def test_existing_no_access_to_current( + self, access, resource_good, resource_bad, mocker): + """ + User gives a valid related resource (like organization), but does + not have access to _existing_ related resource, so deny action + """ + data = {'related': resource_good} + assert not access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) + + def test_existing_no_access_to_new( + self, access, resource_good, resource_bad, mocker): + data = {'related': resource_bad} + assert not access.check_related( + 'related', mocker.MagicMock, data, obj=resource_good) + + def test_existing_not_allowed_to_remove(self, access, resource_bad, mocker): + data = {'related': None} + assert not access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) + + def test_existing_not_null_null(self, access, mocker): + resource = mocker.MagicMock(related=None) + data = {'related': None} + # Not changing anything by giving null when it is already-null + # important for PUT requests + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource, mandatory=True) + @pytest.fixture def job_template_with_ids(job_template_factory): # Create non-persisted objects with IDs to send to job_template_factory @@ -31,10 +122,6 @@ def job_template_with_ids(job_template_factory): persisted=False) return jt_objects.job_template -@pytest.fixture -def user_unit(): - return User(username='rando', password='raginrando', email='rando@redhat.com') - def test_superuser(mocker): user = mocker.MagicMock(spec=User, id=1, is_superuser=True) access = BaseAccess(user)