Merge pull request #3505 from AlanCoding/can_CRUD

Copy/edit rework
This commit is contained in:
Alan Rominger
2016-09-22 11:30:10 -04:00
committed by GitHub
77 changed files with 1579 additions and 451 deletions

View File

@@ -116,6 +116,18 @@ def check_user_access(user, model_class, action, *args, **kwargs):
return result
return False
def get_user_capabilities(user, instance, **kwargs):
'''
Returns a dictionary of capabilities the user has on the particular
instance. *NOTE* This is not a direct mapping of can_* methods into this
dictionary, it is intended to munge some queries in a way that is
convenient for the user interface to consume and hide or show various
actions in the interface.
'''
for access_class in access_registry.get(type(instance), []):
return access_class(user).get_user_capabilities(instance, **kwargs)
return None
def check_superuser(func):
'''
check_superuser is a decorator that provides a simple short circuit
@@ -207,6 +219,78 @@ class BaseAccess(object):
elif "features" not in validation_info:
raise LicenseForbids("Features not found in active license.")
def get_user_capabilities(self, obj, method_list=[], parent_obj=None):
if obj is None:
return {}
user_capabilities = {}
# Custom ordering to loop through methods so we can reuse earlier calcs
for display_method in ['edit', 'delete', 'start', 'schedule', 'copy', 'adhoc', 'unattach']:
if display_method not in method_list:
continue
# Validation consistency checks
if display_method == 'copy' and isinstance(obj, JobTemplate):
validation_errors, resources_needed_to_start = obj.resource_validation_data()
if validation_errors:
user_capabilities[display_method] = False
continue
elif display_method == 'start' and isinstance(obj, Group):
if obj.inventory_source and not obj.inventory_source._can_update():
user_capabilities[display_method] = False
continue
# Grab the answer from the cache, if available
if hasattr(obj, 'capabilities_cache') and display_method in obj.capabilities_cache:
user_capabilities[display_method] = obj.capabilities_cache[display_method]
continue
# Aliases for going form UI language to API language
if display_method == 'edit':
method = 'change'
elif display_method == 'copy':
method = 'add'
elif display_method == 'adhoc':
method = 'run_ad_hoc_commands'
else:
method = display_method
# Shortcuts in certain cases by deferring to earlier property
if display_method == 'schedule':
user_capabilities['schedule'] = user_capabilities['edit']
continue
elif display_method == 'delete' and not isinstance(obj, (User, UnifiedJob)):
user_capabilities['delete'] = user_capabilities['edit']
continue
elif display_method == 'copy' and isinstance(obj, (Group, Host)):
user_capabilities['copy'] = user_capabilities['edit']
continue
# Preprocessing before the access method is called
data = {}
if method == 'add':
if isinstance(obj, JobTemplate):
data['reference_obj'] = obj
# Compute permission
access_method = getattr(self, "can_%s" % method)
if method in ['change']: # 3 args
user_capabilities[display_method] = access_method(obj, data)
elif method in ['delete', 'start', 'run_ad_hoc_commands']: # 2 args
user_capabilities[display_method] = access_method(obj)
elif method in ['add']: # 2 args with data
user_capabilities[display_method] = access_method(data)
elif method in ['attach', 'unattach']: # parent/sub-object call
if type(parent_obj) == Team:
relationship = 'parents'
parent_obj = parent_obj.member_role
else:
relationship = 'members'
user_capabilities[display_method] = access_method(
obj, parent_obj, relationship, skip_sub_obj_read_check=True, data=data)
return user_capabilities
class UserAccess(BaseAccess):
'''
@@ -526,6 +610,12 @@ class GroupAccess(BaseAccess):
"active_jobs": active_jobs})
return True
def can_start(self, obj):
# Used as another alias to inventory_source start access for user_capabilities
if obj and obj.inventory_source:
return self.user.can_access(InventorySource, 'start', obj.inventory_source)
return False
class InventorySourceAccess(BaseAccess):
'''
I can see inventory sources whenever I can see their group or inventory.
@@ -594,6 +684,13 @@ class InventoryUpdateAccess(BaseAccess):
# Inventory cascade deletes to inventory update, descends from org admin
return self.user in obj.inventory_source.inventory.admin_role
def can_start(self, obj):
# For relaunching
if obj and obj.inventory_source:
access = InventorySourceAccess(self.user)
return access.can_start(obj.inventory_source)
return False
@check_superuser
def can_delete(self, obj):
return self.user in obj.inventory_source.inventory.admin_role
@@ -815,6 +912,12 @@ class ProjectUpdateAccess(BaseAccess):
# Project updates cascade delete with project, admin role descends from org admin
return self.user in obj.project.admin_role
def can_start(self, obj):
# for relaunching
if obj and obj.project:
return self.user in obj.project.update_role
return False
@check_superuser
def can_delete(self, obj):
return obj and self.user in obj.project.admin_role
@@ -855,7 +958,9 @@ class JobTemplateAccess(BaseAccess):
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
'''
if not data: # So the browseable API will work
return True
return (
Project.accessible_objects(self.user, 'use_role').exists() or
Inventory.accessible_objects(self.user, 'use_role').exists())
# if reference_obj is provided, determine if it can be coppied
reference_obj = data.pop('reference_obj', None)
@@ -1132,6 +1237,9 @@ class SystemJobAccess(BaseAccess):
'''
model = SystemJob
def can_start(self, obj):
return False # no relaunching of system jobs
# TODO:
class WorkflowJobTemplateNodeAccess(BaseAccess):
'''
@@ -1855,8 +1963,13 @@ class RoleAccess(BaseAccess):
@check_superuser
def can_unattach(self, obj, sub_obj, relationship, data=None, skip_sub_obj_read_check=False):
if not skip_sub_obj_read_check and relationship in ['members', 'member_role.parents']:
if not check_user_access(self.user, sub_obj.__class__, 'read', sub_obj):
if not skip_sub_obj_read_check and relationship in ['members', 'member_role.parents', 'parents']:
# If we are unattaching a team Role, check the Team read access
if relationship == 'parents':
sub_obj_resource = sub_obj.content_object
else:
sub_obj_resource = sub_obj
if not check_user_access(self.user, sub_obj_resource.__class__, 'read', sub_obj_resource):
return False
if isinstance(obj.content_object, ResourceMixin) and \

View File

@@ -3,12 +3,11 @@ import mock
# AWX
from awx.api.serializers import JobTemplateSerializer, JobLaunchSerializer
from awx.main.models.jobs import JobTemplate, Job
from awx.main.models.jobs import Job
from awx.main.models.projects import ProjectOptions
from awx.main.migrations import _save_password_keys as save_password_keys
# Django
from django.test.client import RequestFactory
from django.core.urlresolvers import reverse
from django.apps import apps
@@ -141,131 +140,7 @@ def test_job_template_role_user(post, organization_factory, job_template_factory
response = post(url, dict(id=jt_objects.job_template.execute_role.pk), objects.superusers.admin)
assert response.status_code == 204
# Test protection against limited set of validation problems
@pytest.mark.django_db
def test_bad_data_copy_edit(admin_user, project):
"""
If a required resource (inventory here) was deleted, copying not allowed
because doing so would caues a validation error
"""
jt_res = JobTemplate.objects.create(
job_type='run',
project=project,
inventory=None, ask_inventory_on_launch=False, # not allowed
credential=None, ask_credential_on_launch=True,
name='deploy-job-template'
)
serializer = JobTemplateSerializer(jt_res)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = admin_user
serializer.context['request'] = request
response = serializer.to_representation(jt_res)
assert not response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
# Tests for correspondence between view info and actual access
@pytest.mark.django_db
def test_admin_copy_edit(jt_copy_edit, admin_user):
"Absent a validation error, system admins can do everything"
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = admin_user
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_org_admin_copy_edit(jt_copy_edit, org_admin):
"Organization admins SHOULD be able to copy a JT firmly in their org"
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = org_admin
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_org_admin_foreign_cred_no_copy_edit(jt_copy_edit, org_admin, machine_credential):
"""
Organization admins without access to the 3 related resources:
SHOULD NOT be able to copy JT
SHOULD be able to edit that job template, for nonsensitive changes
"""
# Attach credential to JT that org admin can not use
jt_copy_edit.credential = machine_credential
jt_copy_edit.save()
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = org_admin
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_jt_admin_copy_edit(jt_copy_edit, rando):
"""
JT admins wihout access to associated resources SHOULD NOT be able to copy
SHOULD be able to make nonsensitive changes"""
# random user given JT admin access only
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = rando
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_proj_jt_admin_copy_edit(jt_copy_edit, rando):
"JT admins with access to associated resources SHOULD be able to copy"
# random user given JT and project admin abilities
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
jt_copy_edit.project.admin_role.members.add(rando)
jt_copy_edit.project.save()
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = rando
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
# Functional tests - create new JT with all returned fields, as the UI does
@pytest.mark.django_db
@mock.patch.object(ProjectOptions, "playbooks", project_playbooks)
def test_org_admin_copy_edit_functional(jt_copy_edit, org_admin, get, post):
get_response = get(reverse('api:job_template_detail', args=[jt_copy_edit.pk]), user=org_admin)
assert get_response.status_code == 200
assert get_response.data['summary_fields']['can_copy']
post_data = get_response.data
post_data['name'] = '%s @ 12:19:47 pm' % post_data['name']
post_response = post(reverse('api:job_template_list', args=[]), user=org_admin, data=post_data)
assert post_response.status_code == 201
assert post_response.data['name'] == 'copy-edit-job-template @ 12:19:47 pm'
@pytest.mark.django_db
@mock.patch.object(ProjectOptions, "playbooks", project_playbooks)
@@ -277,7 +152,6 @@ def test_jt_admin_copy_edit_functional(jt_copy_edit, rando, get, post):
get_response = get(reverse('api:job_template_detail', args=[jt_copy_edit.pk]), user=rando)
assert get_response.status_code == 200
assert not get_response.data['summary_fields']['can_copy']
post_data = get_response.data
post_data['name'] = '%s @ 12:19:47 pm' % post_data['name']

View File

@@ -0,0 +1,323 @@
import pytest
from django.core.urlresolvers import reverse
from django.test.client import RequestFactory
from awx.main.models.jobs import JobTemplate
from awx.main.models import Role, Group
from awx.main.access import (
access_registry,
get_user_capabilities
)
from awx.main.utils import cache_list_capabilities
from awx.api.serializers import JobTemplateSerializer
# This file covers special-cases of displays of user_capabilities
# general functionality should be covered fully by unit tests, see:
# awx/main/tests/unit/api/test_serializers.py ::
# TestJobTemplateSerializerGetSummaryFields.test_copy_edit_standard
# awx/main/tests/unit/test_access.py ::
# test_user_capabilities_method
@pytest.mark.django_db
class TestOptionsRBAC:
"""
Several endpoints are relied-upon by the UI to list POST as an
allowed action or not depending on whether the user has permission
to create a resource.
"""
def test_inventory_group_host_can_add(self, inventory, alice, options):
inventory.admin_role.members.add(alice)
response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), alice)
assert 'POST' in response.data['actions']
response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), alice)
assert 'POST' in response.data['actions']
def test_inventory_group_host_can_not_add(self, inventory, bob, options):
inventory.read_role.members.add(bob)
response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), bob)
assert 'POST' not in response.data['actions']
response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), bob)
assert 'POST' not in response.data['actions']
def test_user_list_can_add(self, org_member, org_admin, options):
response = options(reverse('api:user_list'), org_admin)
assert 'POST' in response.data['actions']
def test_user_list_can_not_add(self, org_member, org_admin, options):
response = options(reverse('api:user_list'), org_member)
assert 'POST' not in response.data['actions']
@pytest.mark.django_db
class TestJobTemplateCopyEdit:
"""
Tests contain scenarios that were raised as issues in the past,
which resulted from failed copy/edit actions even though the buttons
to do these actions were displayed.
"""
@pytest.fixture
def jt_copy_edit(self, job_template_factory, project):
objects = job_template_factory(
'copy-edit-job-template',
project=project)
return objects.job_template
def fake_context(self, user):
request = RequestFactory().get('/api/v1/resource/42/')
request.user = user
class FakeView(object):
pass
fake_view = FakeView()
fake_view.request = request
context = {}
context['view'] = fake_view
context['request'] = request
return context
def test_validation_bad_data_copy_edit(self, admin_user, project):
"""
If a required resource (inventory here) was deleted, copying not allowed
because doing so would caues a validation error
"""
jt_res = JobTemplate.objects.create(
job_type='run',
project=project,
inventory=None, ask_inventory_on_launch=False, # not allowed
credential=None, ask_credential_on_launch=True,
name='deploy-job-template'
)
serializer = JobTemplateSerializer(jt_res)
serializer.context = self.fake_context(admin_user)
response = serializer.to_representation(jt_res)
assert not response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
def test_sys_admin_copy_edit(self, jt_copy_edit, admin_user):
"Absent a validation error, system admins can do everything"
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = self.fake_context(admin_user)
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
def test_org_admin_copy_edit(self, jt_copy_edit, org_admin):
"Organization admins SHOULD be able to copy a JT firmly in their org"
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = self.fake_context(org_admin)
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
def test_org_admin_foreign_cred_no_copy_edit(self, jt_copy_edit, org_admin, machine_credential):
"""
Organization admins without access to the 3 related resources:
SHOULD NOT be able to copy JT
SHOULD be able to edit that job template, for nonsensitive changes
"""
# Attach credential to JT that org admin can not use
jt_copy_edit.credential = machine_credential
jt_copy_edit.save()
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = self.fake_context(org_admin)
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
def test_jt_admin_copy_edit(self, jt_copy_edit, rando):
"""
JT admins wihout access to associated resources SHOULD NOT be able to copy
SHOULD be able to make nonsensitive changes"""
# random user given JT admin access only
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = self.fake_context(rando)
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
def test_proj_jt_admin_copy_edit(self, jt_copy_edit, rando):
"JT admins with access to associated resources SHOULD be able to copy"
# random user given JT and project admin abilities
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
jt_copy_edit.project.admin_role.members.add(rando)
jt_copy_edit.project.save()
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = self.fake_context(rando)
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
@pytest.fixture
def mock_access_method(mocker):
mock_method = mocker.MagicMock()
mock_method.return_value = 'foobar'
mock_method.__name__ = 'bars' # Required for a logging statement
return mock_method
@pytest.mark.django_db
class TestAccessListCapabilities:
"""
Test that the access_list serializer shows the exact output of the RoleAccess.can_attach
- looks at /api/v1/inventories/N/access_list/
- test for types: direct, indirect, and team access
"""
extra_kwargs = dict(skip_sub_obj_read_check=False, data={})
def _assert_one_in_list(self, data, sublist='direct_access'):
"Establish that exactly 1 type of access exists so we know the entry is the right one"
assert len(data['results']) == 1
assert len(data['results'][0]['summary_fields'][sublist]) == 1
def test_access_list_direct_access_capability(
self, inventory, rando, get, mocker, mock_access_method):
inventory.admin_role.members.add(rando)
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:inventory_access_list', args=(inventory.id,)), rando)
mock_access_method.assert_called_once_with(inventory.admin_role, rando, 'members', **self.extra_kwargs)
self._assert_one_in_list(response.data)
direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
def test_access_list_indirect_access_capability(
self, inventory, organization, org_admin, get, mocker, mock_access_method):
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:inventory_access_list', args=(inventory.id,)), org_admin)
mock_access_method.assert_called_once_with(organization.admin_role, org_admin, 'members', **self.extra_kwargs)
self._assert_one_in_list(response.data, sublist='indirect_access')
indirect_access_list = response.data['results'][0]['summary_fields']['indirect_access']
assert indirect_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
def test_access_list_team_direct_access_capability(
self, inventory, team, team_member, get, mocker, mock_access_method):
team.member_role.children.add(inventory.admin_role)
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:inventory_access_list', args=(inventory.id,)), team_member)
mock_access_method.assert_called_once_with(inventory.admin_role, team.member_role, 'parents', **self.extra_kwargs)
self._assert_one_in_list(response.data)
direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
@pytest.mark.django_db
def test_team_roles_unattach(mocker, team, team_member, inventory, mock_access_method, get):
team.member_role.children.add(inventory.admin_role)
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:team_roles_list', args=(team.id,)), team_member)
# Did we assess whether team_member can remove team's permission to the inventory?
mock_access_method.assert_called_once_with(
inventory.admin_role, team.member_role, 'parents', skip_sub_obj_read_check=True, data={})
assert response.data['results'][0]['summary_fields']['user_capabilities']['unattach'] == 'foobar'
@pytest.mark.django_db
def test_user_roles_unattach(mocker, organization, alice, bob, mock_access_method, get):
# Add to same organization so that alice and bob can see each other
organization.member_role.members.add(alice)
organization.member_role.members.add(bob)
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:user_roles_list', args=(alice.id,)), bob)
# Did we assess whether bob can remove alice's permission to the inventory?
mock_access_method.assert_called_once_with(
organization.member_role, alice, 'members', skip_sub_obj_read_check=True, data={})
assert response.data['results'][0]['summary_fields']['user_capabilities']['unattach'] == 'foobar'
@pytest.mark.django_db
def test_team_roles_unattach_functional(team, team_member, inventory, get):
team.member_role.children.add(inventory.admin_role)
response = get(reverse('api:team_roles_list', args=(team.id,)), team_member)
# Team member should be able to remove access to inventory, becauase
# the inventory admin_role grants that ability
assert response.data['results'][0]['summary_fields']['user_capabilities']['unattach']
@pytest.mark.django_db
def test_user_roles_unattach_functional(organization, alice, bob, get):
organization.member_role.members.add(alice)
organization.member_role.members.add(bob)
response = get(reverse('api:user_roles_list', args=(alice.id,)), bob)
# Org members can not revoke the membership of other members
assert not response.data['results'][0]['summary_fields']['user_capabilities']['unattach']
@pytest.mark.django_db
def test_prefetch_jt_capabilities(job_template, rando):
job_template.execute_role.members.add(rando)
qs = JobTemplate.objects.all()
cache_list_capabilities(qs, ['admin', 'execute'], JobTemplate, rando)
assert qs[0].capabilities_cache == {'edit': False, 'start': True}
@pytest.mark.django_db
def test_prefetch_group_capabilities(group, rando):
group.inventory.adhoc_role.members.add(rando)
qs = Group.objects.all()
cache_list_capabilities(qs, ['inventory.admin', 'inventory.adhoc'], Group, rando)
assert qs[0].capabilities_cache == {'edit': False, 'adhoc': True}
@pytest.mark.django_db
def test_prefetch_jt_copy_capability(job_template, project, inventory, machine_credential, rando):
job_template.project = project
job_template.inventory = inventory
job_template.credential = machine_credential
job_template.save()
qs = JobTemplate.objects.all()
cache_list_capabilities(qs, [{'copy': [
'project.use', 'inventory.use', 'credential.use',
'cloud_credential.use', 'network_credential.use'
]}], JobTemplate, rando)
assert qs[0].capabilities_cache == {'copy': False}
project.use_role.members.add(rando)
inventory.use_role.members.add(rando)
machine_credential.use_role.members.add(rando)
cache_list_capabilities(qs, [{'copy': [
'project.use', 'inventory.use', 'credential.use',
'cloud_credential.use', 'network_credential.use'
]}], JobTemplate, rando)
assert qs[0].capabilities_cache == {'copy': True}
@pytest.mark.django_db
def test_group_update_capabilities_possible(group, inventory_source, admin_user):
group.inventory_source = inventory_source
group.save()
capabilities = get_user_capabilities(admin_user, group, method_list=['start'])
assert capabilities['start']
@pytest.mark.django_db
def test_group_update_capabilities_impossible(group, inventory_source, admin_user):
inventory_source.source = ""
inventory_source.save()
group.inventory_source = inventory_source
group.save()
capabilities = get_user_capabilities(admin_user, group, method_list=['start'])
assert not capabilities['start']

View File

@@ -6,9 +6,13 @@ import mock
from awx.api.serializers import (
JobTemplateSerializer,
)
from awx.api.views import JobTemplateDetail
from awx.main.models import (
Role,
User,
Job,
)
from rest_framework.test import APIRequestFactory
#DRF
from rest_framework import serializers
@@ -77,21 +81,33 @@ class TestJobTemplateSerializerGetSummaryFields():
summary = get_summary_fields_mock_and_run(JobTemplateSerializer, job_template)
assert 'survey' not in summary
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_copy_true(self, mocker, job_template):
pass
def test_copy_edit_standard(self, mocker, job_template_factory):
"""Verify that the exact output of the access.py methods
are put into the serializer user_capabilities"""
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_copy_false(self, mocker, job_template):
pass
jt_obj = job_template_factory('testJT', project='proj1', persisted=False).job_template
jt_obj.id = 5
jt_obj.admin_role = Role(id=9, role_field='admin_role')
jt_obj.execute_role = Role(id=8, role_field='execute_role')
jt_obj.read_role = Role(id=7, role_field='execute_role')
user = User(username="auser")
serializer = JobTemplateSerializer(job_template)
serializer.show_capabilities = ['copy', 'edit']
serializer._summary_field_labels = lambda self: []
serializer._recent_jobs = lambda self: []
request = APIRequestFactory().get('/api/v1/job_templates/42/')
request.user = user
view = JobTemplateDetail()
view.request = request
serializer.context['view'] = view
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_edit_true(self, mocker, job_template):
pass
with mocker.patch("awx.main.models.rbac.Role.get_description", return_value='Can eat pie'):
with mocker.patch("awx.main.access.JobTemplateAccess.can_change", return_value='foobar'):
with mocker.patch("awx.main.access.JobTemplateAccess.can_add", return_value='foo'):
response = serializer.get_summary_fields(jt_obj)
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_edit_false(self, mocker, job_template):
pass
assert response['user_capabilities']['copy'] == 'foo'
assert response['user_capabilities']['edit'] == 'foobar'
class TestJobTemplateSerializerValidation(object):

View File

@@ -110,3 +110,27 @@ def test_jt_can_add_bad_data(user_unit):
access = JobTemplateAccess(user_unit)
assert not access.can_add({'asdf': 'asdf'})
@pytest.mark.django_db
def test_user_capabilities_method():
"""Unit test to verify that the user_capabilities method will defer
to the appropriate sub-class methods of the access classes.
Note that normal output is True/False, but a string is returned
in these tests to establish uniqueness.
"""
class FooAccess(BaseAccess):
def can_change(self, obj, data):
return 'bar'
def can_add(self, data):
return 'foobar'
user = User(username='auser')
foo_access = FooAccess(user)
foo = object()
foo_capabilities = foo_access.get_user_capabilities(foo, ['edit', 'copy'])
assert foo_capabilities == {
'edit': 'bar',
'copy': 'foobar'
}

View File

@@ -33,7 +33,7 @@ logger = logging.getLogger('awx.main.utils')
__all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore', 'memoize',
'get_ansible_version', 'get_ssh_version', 'get_awx_version', 'update_scm_url',
'get_type_for_model', 'get_model_for_type', 'to_python_boolean',
'get_type_for_model', 'get_model_for_type', 'cache_list_capabilities', 'to_python_boolean',
'ignore_inventory_computed_fields', 'ignore_inventory_group_removal',
'_inventory_updates', 'get_pk_from_dict', 'getattrd', 'NoDefaultProvided',
'get_current_apps', 'set_current_apps']
@@ -409,6 +409,72 @@ def get_model_for_type(type):
return ct_model
def cache_list_capabilities(page, prefetch_list, model, user):
'''
Given a `page` list of objects, the specified roles for the specified user
are save on each object in the list, using 1 query for each role type
Examples:
capabilities_prefetch = ['admin', 'execute']
--> prefetch the admin (edit) and execute (start) permissions for
items in list for current user
capabilities_prefetch = ['inventory.admin']
--> prefetch the related inventory FK permissions for current user,
and put it into the object's cache
capabilities_prefetch = [{'copy': ['inventory.admin', 'project.admin']}]
--> prefetch logical combination of admin permission to inventory AND
project, put into cache dictionary as "copy"
'''
from django.db.models import Q
page_ids = [obj.id for obj in page]
for obj in page:
obj.capabilities_cache = {}
for prefetch_entry in prefetch_list:
display_method = None
if type(prefetch_entry) is dict:
display_method = prefetch_entry.keys()[0]
paths = prefetch_entry[display_method]
else:
paths = prefetch_entry
if type(paths) is not list:
paths = [paths]
# Build the query for accessible_objects according the user & role(s)
qs_obj = None
for role_path in paths:
if '.' in role_path:
res_path = '__'.join(role_path.split('.')[:-1])
role_type = role_path.split('.')[-1]
if qs_obj is None:
qs_obj = model.objects
parent_model = model._meta.get_field(res_path).related_model
kwargs = {'%s__in' % res_path: parent_model.accessible_objects(user, '%s_role' % role_type)}
qs_obj = qs_obj.filter(Q(**kwargs) | Q(**{'%s__isnull' % res_path: True}))
else:
role_type = role_path
qs_obj = model.accessible_objects(user, '%s_role' % role_type)
if display_method is None:
# Role name translation to UI names for methods
display_method = role_type
if role_type == 'admin':
display_method = 'edit'
elif role_type in ['execute', 'update']:
display_method = 'start'
# Union that query with the list of items on page
ids_with_role = set(qs_obj.filter(pk__in=page_ids).values_list('pk', flat=True))
# Save data item-by-item
for obj in page:
obj.capabilities_cache[display_method] = False
if obj.pk in ids_with_role:
obj.capabilities_cache[display_method] = True
def get_system_task_capacity():
'''
Measure system memory and use it as a baseline for determining the system's capacity