diff --git a/awx/main/access.py b/awx/main/access.py index 7fe6a35eaa..021ef309ab 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -196,32 +196,40 @@ class BaseAccess(object): obj=None, mandatory=False): ''' Check permission for related field, in scenarios: - - creating a new resource, user must have permission to the - resource specified in `data` + - creating a new resource, user must have permission if + resource is specified in `data` - editing an existing resource, user must have permission to resource in `data`, as well as existing related resource on `obj` - If `obj` is not provided, we assume this is a new resource + If obj.field is null, this does not block the action - If field is not given, return the inverse of `mandatory` + If `mandatory` is set, new resources require the field and + existing field will always be checked ''' new = None changed = True + is_removal = False if data and 'reference_obj' in data: # Use reference object's related fields, if given new = getattr(data['reference_obj'], field) elif data and field in data: # Obtain the resource specified in `data` raw_value = data[field] - try: - new_pk = int(raw_value) - if new_pk < 1 or (obj and new_pk == getattr(obj, '%s_id' % field, None)): - changed = False - else: - # Get the new resource from the database - new = get_object_or_400(Model, pk=new_pk) - except (TypeError, ValueError): - if type(raw_value) == Model: - new = raw_value + if isinstance(raw_value, Model): + new = raw_value + elif raw_value is None: + new = None + is_removal = True + else: + try: + new_pk = int(raw_value) + # Avoid database query by comparing pk to model for similarity + if obj and new_pk == getattr(obj, '%s_id' % field, None): + changed = False + else: + # Get the new resource from the database + new = get_object_or_400(Model, pk=new_pk) + except (TypeError, ValueError): + raise ParseError(_("Bad data found in related field %s." % field)) elif data is None or field not in data: changed = False @@ -230,11 +238,13 @@ class BaseAccess(object): if obj and (changed or mandatory): current = getattr(obj, field) - # Resource not changed, like a PUT request - if (obj is None and new is None) or (new is not None and new == current): + if obj and new == current and (not is_removal): + # Resource not changed, like a PUT request changed = False - if not new and not current and mandatory: + if (not new) and (not obj) and mandatory: + # Restrict ability to create resource without required field + print ' superuser ' return self.user.is_superuser def user_has_resource_access(resource): @@ -242,12 +252,8 @@ class BaseAccess(object): if role is None: # Handle special case where resource does not have direct roles access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field] - if not self.user.can_access(type(resource), access_method_type, resource, None): - return False - else: - if self.user not in role: - return False - return True + return self.user.can_access(type(resource), access_method_type, resource, None) + return self.user in role if new and changed: if not user_has_resource_access(new): @@ -257,8 +263,7 @@ class BaseAccess(object): if not user_has_resource_access(current): return False # User lacks access to existing resource - # User has access to both, permission check passed - return True + return True # User has access to both, permission check passed def check_license(self, add_host=False, feature=None, check_expiration=True): validation_info = TaskEnhancer().validate_enhancements() @@ -1870,24 +1875,14 @@ class ScheduleAccess(BaseAccess): @check_superuser def can_add(self, data): - pk = get_pk_from_dict(data, 'unified_job_template') - obj = get_object_or_400(UnifiedJobTemplate, pk=pk) - if obj: - return self.user.can_access(type(obj), 'change', obj, None) - else: - return False + return self.check_related('unified_job_template', UnifiedJobTemplate, data, mandatory=True) @check_superuser def can_change(self, obj, data): - self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True) + return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True) - @check_superuser def can_delete(self, obj): - if obj and obj.unified_job_template: - job_class = obj.unified_job_template - return self.user.can_access(type(job_class), 'change', job_class, None) - else: - return False + return self.can_change(obj, {}) class NotificationTemplateAccess(BaseAccess): ''' diff --git a/awx/main/tests/functional/test_rbac_label.py b/awx/main/tests/functional/test_rbac_label.py index c4f4438e50..57700d17e2 100644 --- a/awx/main/tests/functional/test_rbac_label.py +++ b/awx/main/tests/functional/test_rbac_label.py @@ -4,8 +4,6 @@ from awx.main.access import ( LabelAccess, ) -from rest_framework.exceptions import ParseError - @pytest.mark.django_db def test_label_get_queryset_user(label, user): access = LabelAccess(user('user', False)) diff --git a/awx/main/tests/unit/test_access.py b/awx/main/tests/unit/test_access.py index 806f622690..d2b2412ca0 100644 --- a/awx/main/tests/unit/test_access.py +++ b/awx/main/tests/unit/test_access.py @@ -3,6 +3,7 @@ import mock from django.contrib.auth.models import User from django.forms.models import model_to_dict +from rest_framework.exceptions import ParseError from awx.main.access import ( BaseAccess, @@ -16,6 +17,90 @@ from awx.conf.license import LicenseForbids from awx.main.models import Credential, Inventory, Project, Role, Organization, Instance +@pytest.fixture +def user_unit(): + return User(username='rando', password='raginrando', email='rando@redhat.com') + +class TestRelatedFieldAccess: + + @pytest.fixture + def good_role(self, mocker): + return mocker.MagicMock(__contains__=lambda self, user: True) + + @pytest.fixture + def bad_role(self, mocker): + return mocker.MagicMock(__contains__=lambda self, user: False) + + @pytest.fixture + def resource_good(self, mocker, good_role): + return mocker.MagicMock(related=mocker.MagicMock(admin_role=good_role), + admin_role=good_role) + + @pytest.fixture + def resource_bad(self, mocker, bad_role): + return mocker.MagicMock(related=mocker.MagicMock(admin_role=bad_role), + admin_role=bad_role) + + @pytest.fixture + def access(self, user_unit): + return BaseAccess(user_unit) + + def test_new_optional_fail(self, access, resource_bad, mocker): + """ + User tries to create a new resource, but lacks permission + to the related resource they provided + """ + data = {'related': resource_bad} + assert not access.check_related('related', mocker.MagicMock, data) + + def test_new_with_bad_data(self, access, resource_bad, mocker): + data = {'related': 3.1415} + with pytest.raises(ParseError): + access.check_related('related', mocker.MagicMock, data) + + def test_new_mandatory_fail(self, access, resource_bad, mocker): + access.user.is_superuser = False + assert not access.check_related( + 'related', mocker.MagicMock, {}, mandatory=True) + assert not access.check_related( + 'related', mocker.MagicMock, {'resource': None}, mandatory=True) + + def test_existing_no_op(self, access, resource_bad, mocker): + """ + User edits a resource, but does not change related field + lack of access to related field does not block action + """ + data = {'related': resource_bad.related} + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) + + def test_existing_required_access(self, access, resource_bad, mocker): + assert not access.check_related( + 'related', mocker.MagicMock, {}, obj=resource_bad, mandatory=True) + + def test_existing_no_access_to_current( + self, access, resource_good, bad_role, mocker): + """ + User gives a valid related resource (like organization), but does + not have access to _existing_ related resource, so deny action + """ + resource_good.admin_role = bad_role + data = {'related': resource_good.related} + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource_good) + + def test_existing_no_access_to_new( + self, access, resource_good, bad_role, mocker): + resource_good.related.admin_role = bad_role + data = {'related': resource_good.related} + assert access.check_related( + 'related', mocker.MagicMock, data, obj=resource_good) + + def test_existing_not_allowed_to_remove(self, access, resource_bad, mocker): + data = {'related': None} + assert not access.check_related( + 'related', mocker.MagicMock, data, obj=resource_bad) + @pytest.fixture def job_template_with_ids(job_template_factory): # Create non-persisted objects with IDs to send to job_template_factory @@ -31,10 +116,6 @@ def job_template_with_ids(job_template_factory): persisted=False) return jt_objects.job_template -@pytest.fixture -def user_unit(): - return User(username='rando', password='raginrando', email='rando@redhat.com') - def test_superuser(mocker): user = mocker.MagicMock(spec=User, id=1, is_superuser=True) access = BaseAccess(user)