mirror of
https://github.com/ansible/awx.git
synced 2026-05-08 09:57:35 -02:30
unit test related field checks, finialize scenarios
This commit is contained in:
@@ -196,32 +196,40 @@ class BaseAccess(object):
|
|||||||
obj=None, mandatory=False):
|
obj=None, mandatory=False):
|
||||||
'''
|
'''
|
||||||
Check permission for related field, in scenarios:
|
Check permission for related field, in scenarios:
|
||||||
- creating a new resource, user must have permission to the
|
- creating a new resource, user must have permission if
|
||||||
resource specified in `data`
|
resource is specified in `data`
|
||||||
- editing an existing resource, user must have permission to resource
|
- editing an existing resource, user must have permission to resource
|
||||||
in `data`, as well as existing related resource on `obj`
|
in `data`, as well as existing related resource on `obj`
|
||||||
If `obj` is not provided, we assume this is a new resource
|
|
||||||
If obj.field is null, this does not block the action
|
If obj.field is null, this does not block the action
|
||||||
If field is not given, return the inverse of `mandatory`
|
If `mandatory` is set, new resources require the field and
|
||||||
|
existing field will always be checked
|
||||||
'''
|
'''
|
||||||
new = None
|
new = None
|
||||||
changed = True
|
changed = True
|
||||||
|
is_removal = False
|
||||||
if data and 'reference_obj' in data:
|
if data and 'reference_obj' in data:
|
||||||
# Use reference object's related fields, if given
|
# Use reference object's related fields, if given
|
||||||
new = getattr(data['reference_obj'], field)
|
new = getattr(data['reference_obj'], field)
|
||||||
elif data and field in data:
|
elif data and field in data:
|
||||||
# Obtain the resource specified in `data`
|
# Obtain the resource specified in `data`
|
||||||
raw_value = data[field]
|
raw_value = data[field]
|
||||||
try:
|
if isinstance(raw_value, Model):
|
||||||
new_pk = int(raw_value)
|
new = raw_value
|
||||||
if new_pk < 1 or (obj and new_pk == getattr(obj, '%s_id' % field, None)):
|
elif raw_value is None:
|
||||||
changed = False
|
new = None
|
||||||
else:
|
is_removal = True
|
||||||
# Get the new resource from the database
|
else:
|
||||||
new = get_object_or_400(Model, pk=new_pk)
|
try:
|
||||||
except (TypeError, ValueError):
|
new_pk = int(raw_value)
|
||||||
if type(raw_value) == Model:
|
# Avoid database query by comparing pk to model for similarity
|
||||||
new = raw_value
|
if obj and new_pk == getattr(obj, '%s_id' % field, None):
|
||||||
|
changed = False
|
||||||
|
else:
|
||||||
|
# Get the new resource from the database
|
||||||
|
new = get_object_or_400(Model, pk=new_pk)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
raise ParseError(_("Bad data found in related field %s." % field))
|
||||||
elif data is None or field not in data:
|
elif data is None or field not in data:
|
||||||
changed = False
|
changed = False
|
||||||
|
|
||||||
@@ -230,11 +238,13 @@ class BaseAccess(object):
|
|||||||
if obj and (changed or mandatory):
|
if obj and (changed or mandatory):
|
||||||
current = getattr(obj, field)
|
current = getattr(obj, field)
|
||||||
|
|
||||||
# Resource not changed, like a PUT request
|
if obj and new == current and (not is_removal):
|
||||||
if (obj is None and new is None) or (new is not None and new == current):
|
# Resource not changed, like a PUT request
|
||||||
changed = False
|
changed = False
|
||||||
|
|
||||||
if not new and not current and mandatory:
|
if (not new) and (not obj) and mandatory:
|
||||||
|
# Restrict ability to create resource without required field
|
||||||
|
print ' superuser '
|
||||||
return self.user.is_superuser
|
return self.user.is_superuser
|
||||||
|
|
||||||
def user_has_resource_access(resource):
|
def user_has_resource_access(resource):
|
||||||
@@ -242,12 +252,8 @@ class BaseAccess(object):
|
|||||||
if role is None:
|
if role is None:
|
||||||
# Handle special case where resource does not have direct roles
|
# Handle special case where resource does not have direct roles
|
||||||
access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field]
|
access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field]
|
||||||
if not self.user.can_access(type(resource), access_method_type, resource, None):
|
return self.user.can_access(type(resource), access_method_type, resource, None)
|
||||||
return False
|
return self.user in role
|
||||||
else:
|
|
||||||
if self.user not in role:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
if new and changed:
|
if new and changed:
|
||||||
if not user_has_resource_access(new):
|
if not user_has_resource_access(new):
|
||||||
@@ -257,8 +263,7 @@ class BaseAccess(object):
|
|||||||
if not user_has_resource_access(current):
|
if not user_has_resource_access(current):
|
||||||
return False # User lacks access to existing resource
|
return False # User lacks access to existing resource
|
||||||
|
|
||||||
# User has access to both, permission check passed
|
return True # User has access to both, permission check passed
|
||||||
return True
|
|
||||||
|
|
||||||
def check_license(self, add_host=False, feature=None, check_expiration=True):
|
def check_license(self, add_host=False, feature=None, check_expiration=True):
|
||||||
validation_info = TaskEnhancer().validate_enhancements()
|
validation_info = TaskEnhancer().validate_enhancements()
|
||||||
@@ -1870,24 +1875,14 @@ class ScheduleAccess(BaseAccess):
|
|||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
pk = get_pk_from_dict(data, 'unified_job_template')
|
return self.check_related('unified_job_template', UnifiedJobTemplate, data, mandatory=True)
|
||||||
obj = get_object_or_400(UnifiedJobTemplate, pk=pk)
|
|
||||||
if obj:
|
|
||||||
return self.user.can_access(type(obj), 'change', obj, None)
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
@check_superuser
|
@check_superuser
|
||||||
def can_change(self, obj, data):
|
def can_change(self, obj, data):
|
||||||
self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True)
|
return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True)
|
||||||
|
|
||||||
@check_superuser
|
|
||||||
def can_delete(self, obj):
|
def can_delete(self, obj):
|
||||||
if obj and obj.unified_job_template:
|
return self.can_change(obj, {})
|
||||||
job_class = obj.unified_job_template
|
|
||||||
return self.user.can_access(type(job_class), 'change', job_class, None)
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
class NotificationTemplateAccess(BaseAccess):
|
class NotificationTemplateAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
|
|||||||
@@ -4,8 +4,6 @@ from awx.main.access import (
|
|||||||
LabelAccess,
|
LabelAccess,
|
||||||
)
|
)
|
||||||
|
|
||||||
from rest_framework.exceptions import ParseError
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_label_get_queryset_user(label, user):
|
def test_label_get_queryset_user(label, user):
|
||||||
access = LabelAccess(user('user', False))
|
access = LabelAccess(user('user', False))
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import mock
|
|||||||
|
|
||||||
from django.contrib.auth.models import User
|
from django.contrib.auth.models import User
|
||||||
from django.forms.models import model_to_dict
|
from django.forms.models import model_to_dict
|
||||||
|
from rest_framework.exceptions import ParseError
|
||||||
|
|
||||||
from awx.main.access import (
|
from awx.main.access import (
|
||||||
BaseAccess,
|
BaseAccess,
|
||||||
@@ -16,6 +17,90 @@ from awx.conf.license import LicenseForbids
|
|||||||
from awx.main.models import Credential, Inventory, Project, Role, Organization, Instance
|
from awx.main.models import Credential, Inventory, Project, Role, Organization, Instance
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def user_unit():
|
||||||
|
return User(username='rando', password='raginrando', email='rando@redhat.com')
|
||||||
|
|
||||||
|
class TestRelatedFieldAccess:
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def good_role(self, mocker):
|
||||||
|
return mocker.MagicMock(__contains__=lambda self, user: True)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def bad_role(self, mocker):
|
||||||
|
return mocker.MagicMock(__contains__=lambda self, user: False)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def resource_good(self, mocker, good_role):
|
||||||
|
return mocker.MagicMock(related=mocker.MagicMock(admin_role=good_role),
|
||||||
|
admin_role=good_role)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def resource_bad(self, mocker, bad_role):
|
||||||
|
return mocker.MagicMock(related=mocker.MagicMock(admin_role=bad_role),
|
||||||
|
admin_role=bad_role)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def access(self, user_unit):
|
||||||
|
return BaseAccess(user_unit)
|
||||||
|
|
||||||
|
def test_new_optional_fail(self, access, resource_bad, mocker):
|
||||||
|
"""
|
||||||
|
User tries to create a new resource, but lacks permission
|
||||||
|
to the related resource they provided
|
||||||
|
"""
|
||||||
|
data = {'related': resource_bad}
|
||||||
|
assert not access.check_related('related', mocker.MagicMock, data)
|
||||||
|
|
||||||
|
def test_new_with_bad_data(self, access, resource_bad, mocker):
|
||||||
|
data = {'related': 3.1415}
|
||||||
|
with pytest.raises(ParseError):
|
||||||
|
access.check_related('related', mocker.MagicMock, data)
|
||||||
|
|
||||||
|
def test_new_mandatory_fail(self, access, resource_bad, mocker):
|
||||||
|
access.user.is_superuser = False
|
||||||
|
assert not access.check_related(
|
||||||
|
'related', mocker.MagicMock, {}, mandatory=True)
|
||||||
|
assert not access.check_related(
|
||||||
|
'related', mocker.MagicMock, {'resource': None}, mandatory=True)
|
||||||
|
|
||||||
|
def test_existing_no_op(self, access, resource_bad, mocker):
|
||||||
|
"""
|
||||||
|
User edits a resource, but does not change related field
|
||||||
|
lack of access to related field does not block action
|
||||||
|
"""
|
||||||
|
data = {'related': resource_bad.related}
|
||||||
|
assert access.check_related(
|
||||||
|
'related', mocker.MagicMock, data, obj=resource_bad)
|
||||||
|
|
||||||
|
def test_existing_required_access(self, access, resource_bad, mocker):
|
||||||
|
assert not access.check_related(
|
||||||
|
'related', mocker.MagicMock, {}, obj=resource_bad, mandatory=True)
|
||||||
|
|
||||||
|
def test_existing_no_access_to_current(
|
||||||
|
self, access, resource_good, bad_role, mocker):
|
||||||
|
"""
|
||||||
|
User gives a valid related resource (like organization), but does
|
||||||
|
not have access to _existing_ related resource, so deny action
|
||||||
|
"""
|
||||||
|
resource_good.admin_role = bad_role
|
||||||
|
data = {'related': resource_good.related}
|
||||||
|
assert access.check_related(
|
||||||
|
'related', mocker.MagicMock, data, obj=resource_good)
|
||||||
|
|
||||||
|
def test_existing_no_access_to_new(
|
||||||
|
self, access, resource_good, bad_role, mocker):
|
||||||
|
resource_good.related.admin_role = bad_role
|
||||||
|
data = {'related': resource_good.related}
|
||||||
|
assert access.check_related(
|
||||||
|
'related', mocker.MagicMock, data, obj=resource_good)
|
||||||
|
|
||||||
|
def test_existing_not_allowed_to_remove(self, access, resource_bad, mocker):
|
||||||
|
data = {'related': None}
|
||||||
|
assert not access.check_related(
|
||||||
|
'related', mocker.MagicMock, data, obj=resource_bad)
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def job_template_with_ids(job_template_factory):
|
def job_template_with_ids(job_template_factory):
|
||||||
# Create non-persisted objects with IDs to send to job_template_factory
|
# Create non-persisted objects with IDs to send to job_template_factory
|
||||||
@@ -31,10 +116,6 @@ def job_template_with_ids(job_template_factory):
|
|||||||
persisted=False)
|
persisted=False)
|
||||||
return jt_objects.job_template
|
return jt_objects.job_template
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def user_unit():
|
|
||||||
return User(username='rando', password='raginrando', email='rando@redhat.com')
|
|
||||||
|
|
||||||
def test_superuser(mocker):
|
def test_superuser(mocker):
|
||||||
user = mocker.MagicMock(spec=User, id=1, is_superuser=True)
|
user = mocker.MagicMock(spec=User, id=1, is_superuser=True)
|
||||||
access = BaseAccess(user)
|
access = BaseAccess(user)
|
||||||
|
|||||||
Reference in New Issue
Block a user