mirror of
https://github.com/ansible/awx.git
synced 2026-01-22 15:08:03 -03:30
Merge pull request #3877 from AlanCoding/related_access
[RBAC] Refactor of ForeignKey access checks
This commit is contained in:
commit
3c60e254b9
@ -192,6 +192,73 @@ class BaseAccess(object):
|
||||
def can_unattach(self, obj, sub_obj, relationship, data=None):
|
||||
return self.can_change(obj, data)
|
||||
|
||||
def check_related(self, field, Model, data, role_field='admin_role',
|
||||
obj=None, mandatory=False):
|
||||
'''
|
||||
Check permission for related field, in scenarios:
|
||||
- creating a new resource, user must have permission if
|
||||
resource is specified in `data`
|
||||
- editing an existing resource, user must have permission to resource
|
||||
in `data`, as well as existing related resource on `obj`
|
||||
|
||||
If `mandatory` is set, new resources require the field and
|
||||
existing field will always be checked
|
||||
'''
|
||||
new = None
|
||||
changed = True
|
||||
if data and 'reference_obj' in data:
|
||||
# Use reference object's related fields, if given
|
||||
new = getattr(data['reference_obj'], field)
|
||||
elif data and field in data:
|
||||
# Obtain the resource specified in `data`
|
||||
raw_value = data[field]
|
||||
if isinstance(raw_value, Model):
|
||||
new = raw_value
|
||||
elif raw_value is None:
|
||||
new = None
|
||||
else:
|
||||
try:
|
||||
new_pk = int(raw_value)
|
||||
# Avoid database query by comparing pk to model for similarity
|
||||
if obj and new_pk == getattr(obj, '%s_id' % field, None):
|
||||
changed = False
|
||||
else:
|
||||
# Get the new resource from the database
|
||||
new = get_object_or_400(Model, pk=new_pk)
|
||||
except (TypeError, ValueError):
|
||||
raise ParseError(_("Bad data found in related field %s." % field))
|
||||
elif data is None or field not in data:
|
||||
changed = False
|
||||
|
||||
# Obtain existing related resource
|
||||
current = None
|
||||
if obj and (changed or mandatory):
|
||||
current = getattr(obj, field)
|
||||
|
||||
if obj and new == current:
|
||||
# Resource not changed, like a PUT request
|
||||
changed = False
|
||||
|
||||
if (not new) and (not obj) and mandatory:
|
||||
# Restrict ability to create resource without required field
|
||||
return self.user.is_superuser
|
||||
|
||||
def user_has_resource_access(resource):
|
||||
role = getattr(resource, role_field, None)
|
||||
if role is None:
|
||||
# Handle special case where resource does not have direct roles
|
||||
access_method_type = {'admin_role': 'change', 'execute_role': 'start'}[role_field]
|
||||
return self.user.can_access(type(resource), access_method_type, resource, None)
|
||||
return self.user in role
|
||||
|
||||
if new and changed and (not user_has_resource_access(new)):
|
||||
return False # User lacks access to provided resource
|
||||
|
||||
if current and (changed or mandatory) and (not user_has_resource_access(current)):
|
||||
return False # User lacks access to existing resource
|
||||
|
||||
return True # User has access to both, permission check passed
|
||||
|
||||
def check_license(self, add_host=False, feature=None, check_expiration=True):
|
||||
validation_info = TaskEnhancer().validate_enhancements()
|
||||
if ('test' in sys.argv or 'py.test' in sys.argv[0] or 'jenkins' in sys.argv) and not os.environ.get('SKIP_LICENSE_FIXUP_FOR_TEST', ''):
|
||||
@ -461,9 +528,7 @@ class InventoryAccess(BaseAccess):
|
||||
if not data:
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
return self.user in org.admin_role
|
||||
return self.check_related('organization', Organization, data)
|
||||
|
||||
@check_superuser
|
||||
def can_change(self, obj, data):
|
||||
@ -472,14 +537,8 @@ class InventoryAccess(BaseAccess):
|
||||
@check_superuser
|
||||
def can_admin(self, obj, data):
|
||||
# Verify that the user has access to the new organization if moving an
|
||||
# inventory to a new organization.
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
if obj and org_pk and obj.organization.pk != org_pk:
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
if self.user not in org.admin_role:
|
||||
return False
|
||||
# Otherwise, just check for admin permission.
|
||||
return self.user in obj.admin_role
|
||||
# inventory to a new organization. Otherwise, just check for admin permission.
|
||||
return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role
|
||||
|
||||
def can_delete(self, obj):
|
||||
is_can_admin = self.can_admin(obj, None)
|
||||
@ -523,9 +582,7 @@ class HostAccess(BaseAccess):
|
||||
return Inventory.accessible_objects(self.user, 'admin_role').exists()
|
||||
|
||||
# Checks for admin or change permission on inventory.
|
||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||
if self.user not in inventory.admin_role:
|
||||
if not self.check_related('inventory', Inventory, data):
|
||||
return False
|
||||
|
||||
# Check to see if we have enough licenses
|
||||
@ -574,9 +631,7 @@ class GroupAccess(BaseAccess):
|
||||
if not data or 'inventory' not in data:
|
||||
return False
|
||||
# Checks for admin or change permission on inventory.
|
||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||
return self.user in inventory.admin_role
|
||||
return self.check_related('inventory', Inventory, data)
|
||||
|
||||
def can_change(self, obj, data):
|
||||
# Prevent moving a group to a different inventory.
|
||||
@ -795,11 +850,7 @@ class TeamAccess(BaseAccess):
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
if self.user in org.admin_role:
|
||||
return True
|
||||
return False
|
||||
return self.check_related('organization', Organization, data)
|
||||
|
||||
def can_change(self, obj, data):
|
||||
# Prevent moving a team to a different organization.
|
||||
@ -866,17 +917,12 @@ class ProjectAccess(BaseAccess):
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
organization_pk = get_pk_from_dict(data, 'organization')
|
||||
org = get_object_or_400(Organization, pk=organization_pk)
|
||||
return self.user in org.admin_role
|
||||
return self.check_related('organization', Organization, data, mandatory=True)
|
||||
|
||||
@check_superuser
|
||||
def can_change(self, obj, data):
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
if obj and org_pk and obj.organization and obj.organization.pk != org_pk:
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
if self.user not in org.admin_role:
|
||||
return False
|
||||
if not self.check_related('organization', Organization, data):
|
||||
return False
|
||||
return self.user in obj.admin_role
|
||||
|
||||
def can_delete(self, obj):
|
||||
@ -974,7 +1020,7 @@ class JobTemplateAccess(BaseAccess):
|
||||
Inventory.accessible_objects(self.user, 'use_role').exists())
|
||||
|
||||
# if reference_obj is provided, determine if it can be coppied
|
||||
reference_obj = data.pop('reference_obj', None)
|
||||
reference_obj = data.get('reference_obj', None)
|
||||
|
||||
if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN:
|
||||
self.check_license(feature='system_tracking')
|
||||
@ -996,22 +1042,16 @@ class JobTemplateAccess(BaseAccess):
|
||||
return None
|
||||
|
||||
# If a credential is provided, the user should have use access to it.
|
||||
credential = get_value(Credential, 'credential')
|
||||
if credential:
|
||||
if self.user not in credential.use_role:
|
||||
return False
|
||||
if not self.check_related('credential', Credential, data, role_field='use_role'):
|
||||
return False
|
||||
|
||||
# If a cloud credential is provided, the user should have use access.
|
||||
cloud_credential = get_value(Credential, 'cloud_credential')
|
||||
if cloud_credential:
|
||||
if self.user not in cloud_credential.use_role:
|
||||
return False
|
||||
if not self.check_related('cloud_credential', Credential, data, role_field='use_role'):
|
||||
return False
|
||||
|
||||
# If a network credential is provided, the user should have use access.
|
||||
network_credential = get_value(Credential, 'network_credential')
|
||||
if network_credential:
|
||||
if self.user not in network_credential.use_role:
|
||||
return False
|
||||
if not self.check_related('network_credential', Credential, data, role_field='use_role'):
|
||||
return False
|
||||
|
||||
# If an inventory is provided, the user should have use access.
|
||||
inventory = get_value(Inventory, 'inventory')
|
||||
@ -1327,28 +1367,17 @@ class WorkflowJobTemplateNodeAccess(BaseAccess):
|
||||
return qs
|
||||
|
||||
def can_use_prompted_resources(self, data):
|
||||
cred_pk = data.get('credential', None)
|
||||
inv_pk = data.get('inventory', None)
|
||||
if cred_pk:
|
||||
credential = get_object_or_400(Credential, pk=cred_pk)
|
||||
if self.user not in credential.use_role:
|
||||
return False
|
||||
if inv_pk:
|
||||
inventory = get_object_or_400(Inventory, pk=inv_pk)
|
||||
if self.user not in inventory.use_role:
|
||||
return False
|
||||
if not self.check_related('credential', Credential, data):
|
||||
return False
|
||||
if not self.check_related('inventory', Inventory, data):
|
||||
return False
|
||||
return True
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return True
|
||||
wfjt_pk = data.get('workflow_job_template', None)
|
||||
if wfjt_pk:
|
||||
wfjt = get_object_or_400(WorkflowJobTemplate, pk=wfjt_pk)
|
||||
if self.user not in wfjt.admin_role:
|
||||
return False
|
||||
else:
|
||||
if not self.check_related('workflow_job_template', WorkflowJobTemplate, data, mandatory=True):
|
||||
return False
|
||||
if not self.can_use_prompted_resources(data):
|
||||
return False
|
||||
@ -1451,6 +1480,7 @@ class WorkflowJobTemplateAccess(BaseAccess):
|
||||
def can_read(self, obj):
|
||||
return self.user in obj.read_role
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
'''
|
||||
a user can create a job template if they are a superuser, an org admin
|
||||
@ -1491,13 +1521,7 @@ class WorkflowJobTemplateAccess(BaseAccess):
|
||||
# if 'survey_enabled' in data and data['survey_enabled']:
|
||||
# self.check_license(feature='surveys')
|
||||
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
if not org_pk:
|
||||
# only superusers can create or manage orphan WFJTs
|
||||
return self.user.is_superuser
|
||||
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
return self.user in org.admin_role
|
||||
return self.check_related('organization', Organization, data)
|
||||
|
||||
def can_start(self, obj, validate_license=True):
|
||||
if validate_license:
|
||||
@ -1520,21 +1544,8 @@ class WorkflowJobTemplateAccess(BaseAccess):
|
||||
|
||||
if self.user.is_superuser:
|
||||
return True
|
||||
if data is None:
|
||||
return self.user in obj.admin_role
|
||||
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
if ('organization' not in data or
|
||||
(org_pk is None and obj.organization is None) or
|
||||
(obj.organization and obj.organization.pk == org_pk)):
|
||||
# No organization changes
|
||||
return self.user in obj.admin_role
|
||||
|
||||
# If it already has an organization set, must be admin of the org to change it
|
||||
if obj.organization and self.user not in obj.organization.admin_role:
|
||||
return False
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
return self.user in org.admin_role
|
||||
return self.check_related('organization', Organization, data, obj=obj) and self.user in obj.admin_role
|
||||
|
||||
def can_delete(self, obj):
|
||||
is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role
|
||||
@ -1616,19 +1627,13 @@ class AdHocCommandAccess(BaseAccess):
|
||||
self.check_license()
|
||||
|
||||
# If a credential is provided, the user should have use access to it.
|
||||
credential_pk = get_pk_from_dict(data, 'credential')
|
||||
if credential_pk:
|
||||
credential = get_object_or_400(Credential, pk=credential_pk)
|
||||
if self.user not in credential.use_role:
|
||||
return False
|
||||
if not self.check_related('credential', Credential, data, role_field='use_role'):
|
||||
return False
|
||||
|
||||
# Check that the user has the run ad hoc command permission on the
|
||||
# given inventory.
|
||||
inventory_pk = get_pk_from_dict(data, 'inventory')
|
||||
if inventory_pk:
|
||||
inventory = get_object_or_400(Inventory, pk=inventory_pk)
|
||||
if self.user not in inventory.adhoc_role:
|
||||
return False
|
||||
if not self.check_related('inventory', Inventory, data, role_field='adhoc_role'):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
@ -1861,28 +1866,14 @@ class ScheduleAccess(BaseAccess):
|
||||
|
||||
@check_superuser
|
||||
def can_add(self, data):
|
||||
pk = get_pk_from_dict(data, 'unified_job_template')
|
||||
obj = get_object_or_400(UnifiedJobTemplate, pk=pk)
|
||||
if obj:
|
||||
return self.user.can_access(type(obj), 'change', obj, None)
|
||||
else:
|
||||
return False
|
||||
return self.check_related('unified_job_template', UnifiedJobTemplate, data, mandatory=True)
|
||||
|
||||
@check_superuser
|
||||
def can_change(self, obj, data):
|
||||
if obj and obj.unified_job_template:
|
||||
job_class = obj.unified_job_template
|
||||
return self.user.can_access(type(job_class), 'change', job_class, None)
|
||||
else:
|
||||
return False
|
||||
return self.check_related('unified_job_template', UnifiedJobTemplate, data, obj=obj, mandatory=True)
|
||||
|
||||
@check_superuser
|
||||
def can_delete(self, obj):
|
||||
if obj and obj.unified_job_template:
|
||||
job_class = obj.unified_job_template
|
||||
return self.user.can_access(type(job_class), 'change', job_class, None)
|
||||
else:
|
||||
return False
|
||||
return self.can_change(obj, {})
|
||||
|
||||
class NotificationTemplateAccess(BaseAccess):
|
||||
'''
|
||||
@ -1911,21 +1902,14 @@ class NotificationTemplateAccess(BaseAccess):
|
||||
def can_add(self, data):
|
||||
if not data:
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
return self.user in org.admin_role
|
||||
return self.check_related('organization', Organization, data, mandatory=True)
|
||||
|
||||
@check_superuser
|
||||
def can_change(self, obj, data):
|
||||
if obj.organization is None:
|
||||
# only superusers are allowed to edit orphan notification templates
|
||||
return False
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
if obj and org_pk and obj.organization.pk != org_pk:
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
if self.user not in org.admin_role:
|
||||
return False
|
||||
return self.user in obj.organization.admin_role
|
||||
return self.check_related('organization', Organization, data, obj=obj, mandatory=True)
|
||||
|
||||
def can_admin(self, obj, data):
|
||||
return self.can_change(obj, data)
|
||||
@ -1981,17 +1965,13 @@ class LabelAccess(BaseAccess):
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return True
|
||||
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
return self.user in org.member_role
|
||||
return self.check_related('organization', Organization, data, role_field='member_role', mandatory=True)
|
||||
|
||||
@check_superuser
|
||||
def can_change(self, obj, data):
|
||||
if self.can_add(data) is False:
|
||||
return False
|
||||
|
||||
return self.user in obj.organization.admin_role
|
||||
return self.check_related('organization', Organization, data, obj=obj, mandatory=True)
|
||||
|
||||
def can_delete(self, obj):
|
||||
return self.can_change(obj, None)
|
||||
@ -2087,18 +2067,11 @@ class CustomInventoryScriptAccess(BaseAccess):
|
||||
def can_add(self, data):
|
||||
if not data: # So the browseable API will work
|
||||
return Organization.accessible_objects(self.user, 'admin_role').exists()
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
return self.user in org.admin_role
|
||||
return self.check_related('organization', Organization, data)
|
||||
|
||||
@check_superuser
|
||||
def can_admin(self, obj, data=None):
|
||||
org_pk = get_pk_from_dict(data, 'organization')
|
||||
if obj and org_pk and obj.organization and obj.organization.pk != org_pk:
|
||||
org = get_object_or_400(Organization, pk=org_pk)
|
||||
if self.user not in org.admin_role:
|
||||
return False
|
||||
return self.user in obj.admin_role
|
||||
return self.check_related('organization', Organization, data, obj=obj)
|
||||
|
||||
@check_superuser
|
||||
def can_change(self, obj, data):
|
||||
|
||||
@ -121,7 +121,7 @@ def test_create_project_null_organization(post, organization, admin):
|
||||
|
||||
@pytest.mark.django_db()
|
||||
def test_create_project_null_organization_xfail(post, organization, org_admin):
|
||||
post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=400)
|
||||
post(reverse('api:project_list'), { 'name': 't', 'organization': None}, org_admin, expect=403)
|
||||
|
||||
@pytest.mark.django_db()
|
||||
def test_patch_project_null_organization(patch, organization, project, admin):
|
||||
|
||||
@ -4,10 +4,12 @@ import pytest
|
||||
from awx.main.access import (
|
||||
BaseAccess,
|
||||
JobTemplateAccess,
|
||||
ScheduleAccess
|
||||
)
|
||||
from awx.main.migrations import _rbac as rbac
|
||||
from awx.main.models import Permission
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
from awx.main.models.schedules import Schedule
|
||||
from django.apps import apps
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
@ -247,3 +249,23 @@ def test_associate_label(label, user, job_template):
|
||||
job_template.admin_role.members.add(user('joe', False))
|
||||
label.organization.read_role.members.add(user('joe', False))
|
||||
assert access.can_attach(job_template, label, 'labels', None)
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_move_schedule_to_JT_no_access(job_template, rando):
|
||||
schedule = Schedule.objects.create(
|
||||
unified_job_template=job_template,
|
||||
rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1')
|
||||
job_template.admin_role.members.add(rando)
|
||||
jt2 = JobTemplate.objects.create(name="other-jt")
|
||||
access = ScheduleAccess(rando)
|
||||
assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk))
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_move_schedule_from_JT_no_access(job_template, rando):
|
||||
schedule = Schedule.objects.create(
|
||||
unified_job_template=job_template,
|
||||
rrule='DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1')
|
||||
jt2 = JobTemplate.objects.create(name="other-jt")
|
||||
jt2.admin_role.members.add(rando)
|
||||
access = ScheduleAccess(rando)
|
||||
assert not access.can_change(schedule, data=dict(unified_job_template=jt2.pk))
|
||||
|
||||
@ -4,8 +4,6 @@ from awx.main.access import (
|
||||
LabelAccess,
|
||||
)
|
||||
|
||||
from rest_framework.exceptions import ParseError
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_label_get_queryset_user(label, user):
|
||||
access = LabelAccess(user('user', False))
|
||||
@ -54,8 +52,7 @@ def test_label_access_user(label, user):
|
||||
access = LabelAccess(user('user', False))
|
||||
label.organization.member_role.members.add(user('user', False))
|
||||
|
||||
with pytest.raises(ParseError):
|
||||
access.can_add({'organization': None})
|
||||
assert not access.can_add({'organization': None})
|
||||
assert not access.can_change(label, None)
|
||||
assert not access.can_delete(label)
|
||||
|
||||
|
||||
@ -3,6 +3,7 @@ import mock
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.forms.models import model_to_dict
|
||||
from rest_framework.exceptions import ParseError
|
||||
|
||||
from awx.main.access import (
|
||||
BaseAccess,
|
||||
@ -16,6 +17,96 @@ from awx.conf.license import LicenseForbids
|
||||
from awx.main.models import Credential, Inventory, Project, Role, Organization, Instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user_unit():
|
||||
return User(username='rando', password='raginrando', email='rando@redhat.com')
|
||||
|
||||
class TestRelatedFieldAccess:
|
||||
|
||||
@pytest.fixture
|
||||
def resource_good(self, mocker):
|
||||
good_role = mocker.MagicMock(__contains__=lambda self, user: True)
|
||||
return mocker.MagicMock(related=mocker.MagicMock(admin_role=good_role),
|
||||
admin_role=good_role)
|
||||
|
||||
@pytest.fixture
|
||||
def resource_bad(self, mocker):
|
||||
bad_role = mocker.MagicMock(__contains__=lambda self, user: False)
|
||||
return mocker.MagicMock(related=mocker.MagicMock(admin_role=bad_role),
|
||||
admin_role=bad_role)
|
||||
|
||||
@pytest.fixture
|
||||
def access(self, user_unit):
|
||||
return BaseAccess(user_unit)
|
||||
|
||||
def test_new_optional_fail(self, access, resource_bad, mocker):
|
||||
"""
|
||||
User tries to create a new resource, but lacks permission
|
||||
to the related resource they provided
|
||||
"""
|
||||
data = {'related': resource_bad}
|
||||
assert not access.check_related('related', mocker.MagicMock, data)
|
||||
|
||||
def test_new_with_bad_data(self, access, mocker):
|
||||
data = {'related': 3.1415}
|
||||
with pytest.raises(ParseError):
|
||||
access.check_related('related', mocker.MagicMock, data)
|
||||
|
||||
def test_new_mandatory_fail(self, access, mocker):
|
||||
access.user.is_superuser = False
|
||||
assert not access.check_related(
|
||||
'related', mocker.MagicMock, {}, mandatory=True)
|
||||
assert not access.check_related(
|
||||
'related', mocker.MagicMock, {'resource': None}, mandatory=True)
|
||||
|
||||
def test_existing_no_op(self, access, resource_bad, mocker):
|
||||
"""
|
||||
User edits a resource, but does not change related field
|
||||
lack of access to related field does not block action
|
||||
"""
|
||||
data = {'related': resource_bad.related}
|
||||
assert access.check_related(
|
||||
'related', mocker.MagicMock, data, obj=resource_bad)
|
||||
assert access.check_related(
|
||||
'related', mocker.MagicMock, {}, obj=resource_bad)
|
||||
|
||||
def test_existing_required_access(self, access, resource_bad, mocker):
|
||||
# no-op actions, but mandatory kwarg requires check to pass
|
||||
assert not access.check_related(
|
||||
'related', mocker.MagicMock, {}, obj=resource_bad, mandatory=True)
|
||||
assert not access.check_related(
|
||||
'related', mocker.MagicMock, {'related': resource_bad.related},
|
||||
obj=resource_bad, mandatory=True)
|
||||
|
||||
def test_existing_no_access_to_current(
|
||||
self, access, resource_good, resource_bad, mocker):
|
||||
"""
|
||||
User gives a valid related resource (like organization), but does
|
||||
not have access to _existing_ related resource, so deny action
|
||||
"""
|
||||
data = {'related': resource_good}
|
||||
assert not access.check_related(
|
||||
'related', mocker.MagicMock, data, obj=resource_bad)
|
||||
|
||||
def test_existing_no_access_to_new(
|
||||
self, access, resource_good, resource_bad, mocker):
|
||||
data = {'related': resource_bad}
|
||||
assert not access.check_related(
|
||||
'related', mocker.MagicMock, data, obj=resource_good)
|
||||
|
||||
def test_existing_not_allowed_to_remove(self, access, resource_bad, mocker):
|
||||
data = {'related': None}
|
||||
assert not access.check_related(
|
||||
'related', mocker.MagicMock, data, obj=resource_bad)
|
||||
|
||||
def test_existing_not_null_null(self, access, mocker):
|
||||
resource = mocker.MagicMock(related=None)
|
||||
data = {'related': None}
|
||||
# Not changing anything by giving null when it is already-null
|
||||
# important for PUT requests
|
||||
assert access.check_related(
|
||||
'related', mocker.MagicMock, data, obj=resource, mandatory=True)
|
||||
|
||||
@pytest.fixture
|
||||
def job_template_with_ids(job_template_factory):
|
||||
# Create non-persisted objects with IDs to send to job_template_factory
|
||||
@ -31,10 +122,6 @@ def job_template_with_ids(job_template_factory):
|
||||
persisted=False)
|
||||
return jt_objects.job_template
|
||||
|
||||
@pytest.fixture
|
||||
def user_unit():
|
||||
return User(username='rando', password='raginrando', email='rando@redhat.com')
|
||||
|
||||
def test_superuser(mocker):
|
||||
user = mocker.MagicMock(spec=User, id=1, is_superuser=True)
|
||||
access = BaseAccess(user)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user