RBAC copy/edit displays test refactor

This commit is contained in:
AlanCoding
2016-09-07 17:08:07 -04:00
parent 1ca7ce1bd4
commit 9da00c2d38
8 changed files with 278 additions and 186 deletions

View File

@@ -1539,6 +1539,7 @@ class RoleSerializer(BaseSerializer):
class ResourceAccessListElementSerializer(UserSerializer): class ResourceAccessListElementSerializer(UserSerializer):
show_capabilities = [] # Clear fields from UserSerializer parent class
def to_representation(self, user): def to_representation(self, user):
''' '''
@@ -1564,14 +1565,12 @@ class ResourceAccessListElementSerializer(UserSerializer):
def format_role_perm(role): def format_role_perm(role):
role_dict = { 'id': role.id, 'name': role.name, 'description': role.description} role_dict = { 'id': role.id, 'name': role.name, 'description': role.description}
try: if role.content_type is not None:
role_dict['resource_name'] = role.content_object.name role_dict['resource_name'] = role.content_object.name
role_dict['resource_type'] = role.content_type.name role_dict['resource_type'] = role.content_type.name
role_dict['related'] = reverse_gfk(role.content_object) role_dict['related'] = reverse_gfk(role.content_object)
role_dict['user_capabilities'] = {'unattach': requesting_user.can_access( role_dict['user_capabilities'] = {'unattach': requesting_user.can_access(
Role, 'unattach', role, user, 'members', data={}, skip_sub_obj_read_check=False)} Role, 'unattach', role, user, 'members', data={}, skip_sub_obj_read_check=False)}
except:
pass
return { 'role': role_dict, 'descendant_roles': get_roles_on_resource(obj, role)} return { 'role': role_dict, 'descendant_roles': get_roles_on_resource(obj, role)}
def format_team_role_perm(team_role, permissive_role_ids): def format_team_role_perm(team_role, permissive_role_ids):
@@ -1584,14 +1583,12 @@ class ResourceAccessListElementSerializer(UserSerializer):
'team_id': team_role.object_id, 'team_id': team_role.object_id,
'team_name': team_role.content_object.name 'team_name': team_role.content_object.name
} }
try: if role.content_type is not None:
role_dict['resource_name'] = role.content_object.name role_dict['resource_name'] = role.content_object.name
role_dict['resource_type'] = role.content_type.name role_dict['resource_type'] = role.content_type.name
role_dict['related'] = reverse_gfk(role.content_object) role_dict['related'] = reverse_gfk(role.content_object)
role_dict['user_capabilities'] = {'unattach': requesting_user.can_access( role_dict['user_capabilities'] = {'unattach': requesting_user.can_access(
Role, 'unattach', role, team_role, 'parents', data={}, skip_sub_obj_read_check=False)} Role, 'unattach', role, team_role, 'parents', data={}, skip_sub_obj_read_check=False)}
except:
pass
ret.append({ 'role': role_dict, 'descendant_roles': get_roles_on_resource(obj, team_role)}) ret.append({ 'role': role_dict, 'descendant_roles': get_roles_on_resource(obj, team_role)})
return ret return ret
@@ -1885,6 +1882,7 @@ class JobTemplateSerializer(UnifiedJobTemplateSerializer, JobOptionsSerializer):
d['survey'] = dict(title=obj.survey_spec['name'], description=obj.survey_spec['description']) d['survey'] = dict(title=obj.survey_spec['name'], description=obj.survey_spec['description'])
request = self.context.get('request', None) request = self.context.get('request', None)
# Remove the can_copy and can_edit fields when dependencies are fully removed
# Check for conditions that would create a validation error if coppied # Check for conditions that would create a validation error if coppied
validation_errors, resources_needed_to_start = obj.resource_validation_data() validation_errors, resources_needed_to_start = obj.resource_validation_data()

View File

@@ -249,13 +249,16 @@ class BaseAccess(object):
elif display_method == 'delete' and not isinstance(obj, (User, UnifiedJob)): elif display_method == 'delete' and not isinstance(obj, (User, UnifiedJob)):
user_capabilities['delete'] = user_capabilities['edit'] user_capabilities['delete'] = user_capabilities['edit']
continue continue
if display_method == 'copy' and isinstance(obj, JobTemplate):
validation_errors, resources_needed_to_start = obj.resource_validation_data()
if validation_errors:
user_capabilities['copy'] = False
continue
# Preprocessing before the access method is called # Preprocessing before the access method is called
data = None data = {}
if isinstance(obj, JobTemplate): if method == 'add' and isinstance(obj, JobTemplate):
data = {'reference_obj': obj} data['reference_obj'] = obj
elif method == 'add':
data = {}
# Compute permission # Compute permission
access_method = getattr(self, "can_%s" % method) access_method = getattr(self, "can_%s" % method)

View File

@@ -1,31 +0,0 @@
import pytest
from django.core.urlresolvers import reverse
@pytest.mark.django_db
def test_inventory_group_host_can_add(inventory, alice, options):
inventory.admin_role.members.add(alice)
response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), alice)
assert 'POST' in response.data['actions']
response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), alice)
assert 'POST' in response.data['actions']
@pytest.mark.django_db
def test_inventory_group_host_can_not_add(inventory, bob, options):
inventory.read_role.members.add(bob)
response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), bob)
assert 'POST' not in response.data['actions']
response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), bob)
assert 'POST' not in response.data['actions']
@pytest.mark.django_db
def test_user_list_can_add(org_member, org_admin, options):
response = options(reverse('api:user_list'), org_admin)
assert 'POST' in response.data['actions']
@pytest.mark.django_db
def test_user_list_can_not_add(org_member, org_admin, options):
response = options(reverse('api:user_list'), org_member)
assert 'POST' not in response.data['actions']

View File

@@ -3,12 +3,11 @@ import mock
# AWX # AWX
from awx.api.serializers import JobTemplateSerializer, JobLaunchSerializer from awx.api.serializers import JobTemplateSerializer, JobLaunchSerializer
from awx.main.models.jobs import JobTemplate, Job from awx.main.models.jobs import Job
from awx.main.models.projects import ProjectOptions from awx.main.models.projects import ProjectOptions
from awx.main.migrations import _save_password_keys as save_password_keys from awx.main.migrations import _save_password_keys as save_password_keys
# Django # Django
from django.test.client import RequestFactory
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.apps import apps from django.apps import apps
@@ -141,131 +140,7 @@ def test_job_template_role_user(post, organization_factory, job_template_factory
response = post(url, dict(id=jt_objects.job_template.execute_role.pk), objects.superusers.admin) response = post(url, dict(id=jt_objects.job_template.execute_role.pk), objects.superusers.admin)
assert response.status_code == 204 assert response.status_code == 204
# Test protection against limited set of validation problems
@pytest.mark.django_db
def test_bad_data_copy_edit(admin_user, project):
"""
If a required resource (inventory here) was deleted, copying not allowed
because doing so would caues a validation error
"""
jt_res = JobTemplate.objects.create(
job_type='run',
project=project,
inventory=None, ask_inventory_on_launch=False, # not allowed
credential=None, ask_credential_on_launch=True,
name='deploy-job-template'
)
serializer = JobTemplateSerializer(jt_res)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = admin_user
serializer.context['request'] = request
response = serializer.to_representation(jt_res)
assert not response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
# Tests for correspondence between view info and actual access
@pytest.mark.django_db
def test_admin_copy_edit(jt_copy_edit, admin_user):
"Absent a validation error, system admins can do everything"
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = admin_user
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_org_admin_copy_edit(jt_copy_edit, org_admin):
"Organization admins SHOULD be able to copy a JT firmly in their org"
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = org_admin
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_org_admin_foreign_cred_no_copy_edit(jt_copy_edit, org_admin, machine_credential):
"""
Organization admins without access to the 3 related resources:
SHOULD NOT be able to copy JT
SHOULD be able to edit that job template, for nonsensitive changes
"""
# Attach credential to JT that org admin can not use
jt_copy_edit.credential = machine_credential
jt_copy_edit.save()
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = org_admin
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_jt_admin_copy_edit(jt_copy_edit, rando):
"""
JT admins wihout access to associated resources SHOULD NOT be able to copy
SHOULD be able to make nonsensitive changes"""
# random user given JT admin access only
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = rando
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
@pytest.mark.django_db
def test_proj_jt_admin_copy_edit(jt_copy_edit, rando):
"JT admins with access to associated resources SHOULD be able to copy"
# random user given JT and project admin abilities
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
jt_copy_edit.project.admin_role.members.add(rando)
jt_copy_edit.project.save()
# Serializer can_copy/can_edit fields
serializer = JobTemplateSerializer(jt_copy_edit)
request = RequestFactory().get('/api/v1/job_templates/12/')
request.user = rando
serializer.context['request'] = request
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['can_copy']
assert response['summary_fields']['can_edit']
# Functional tests - create new JT with all returned fields, as the UI does
@pytest.mark.django_db
@mock.patch.object(ProjectOptions, "playbooks", project_playbooks)
def test_org_admin_copy_edit_functional(jt_copy_edit, org_admin, get, post):
get_response = get(reverse('api:job_template_detail', args=[jt_copy_edit.pk]), user=org_admin)
assert get_response.status_code == 200
assert get_response.data['summary_fields']['can_copy']
post_data = get_response.data
post_data['name'] = '%s @ 12:19:47 pm' % post_data['name']
post_response = post(reverse('api:job_template_list', args=[]), user=org_admin, data=post_data)
assert post_response.status_code == 201
assert post_response.data['name'] == 'copy-edit-job-template @ 12:19:47 pm'
@pytest.mark.django_db @pytest.mark.django_db
@mock.patch.object(ProjectOptions, "playbooks", project_playbooks) @mock.patch.object(ProjectOptions, "playbooks", project_playbooks)
@@ -277,7 +152,6 @@ def test_jt_admin_copy_edit_functional(jt_copy_edit, rando, get, post):
get_response = get(reverse('api:job_template_detail', args=[jt_copy_edit.pk]), user=rando) get_response = get(reverse('api:job_template_detail', args=[jt_copy_edit.pk]), user=rando)
assert get_response.status_code == 200 assert get_response.status_code == 200
assert not get_response.data['summary_fields']['can_copy']
post_data = get_response.data post_data = get_response.data
post_data['name'] = '%s @ 12:19:47 pm' % post_data['name'] post_data['name'] = '%s @ 12:19:47 pm' % post_data['name']

View File

@@ -0,0 +1,212 @@
import pytest
from django.core.urlresolvers import reverse
from django.test.client import RequestFactory
from awx.main.models.jobs import JobTemplate
from awx.main.models import Role
from awx.api.serializers import JobTemplateSerializer
from awx.main.access import access_registry
# This file covers special-cases of displays of user_capabilities
# general functionality should be covered fully by unit tests, see:
# awx/main/tests/unit/api/test_serializers.py ::
# TestJobTemplateSerializerGetSummaryFields.test_copy_edit_standard
# awx/main/tests/unit/test_access.py ::
# test_user_capabilities_method
class FakeView(object):
pass
@pytest.fixture
def jt_copy_edit(job_template_factory, project):
objects = job_template_factory(
'copy-edit-job-template',
project=project)
return objects.job_template
@pytest.mark.django_db
def test_inventory_group_host_can_add(inventory, alice, options):
inventory.admin_role.members.add(alice)
response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), alice)
assert 'POST' in response.data['actions']
response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), alice)
assert 'POST' in response.data['actions']
@pytest.mark.django_db
def test_inventory_group_host_can_not_add(inventory, bob, options):
inventory.read_role.members.add(bob)
response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), bob)
assert 'POST' not in response.data['actions']
response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), bob)
assert 'POST' not in response.data['actions']
@pytest.mark.django_db
def test_user_list_can_add(org_member, org_admin, options):
response = options(reverse('api:user_list'), org_admin)
assert 'POST' in response.data['actions']
@pytest.mark.django_db
def test_user_list_can_not_add(org_member, org_admin, options):
response = options(reverse('api:user_list'), org_member)
assert 'POST' not in response.data['actions']
def fake_context(user):
request = RequestFactory().get('/api/v1/resource/42/')
request.user = user
fake_view = FakeView()
fake_view.request = request
context = {}
context['view'] = fake_view
context['request'] = request
return context
# Test protection against limited set of validation problems
@pytest.mark.django_db
def test_bad_data_copy_edit(admin_user, project):
"""
If a required resource (inventory here) was deleted, copying not allowed
because doing so would caues a validation error
"""
jt_res = JobTemplate.objects.create(
job_type='run',
project=project,
inventory=None, ask_inventory_on_launch=False, # not allowed
credential=None, ask_credential_on_launch=True,
name='deploy-job-template'
)
serializer = JobTemplateSerializer(jt_res)
serializer.context = fake_context(admin_user)
response = serializer.to_representation(jt_res)
assert not response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
# Tests for correspondence between view info and intended access
@pytest.mark.django_db
def test_sys_admin_copy_edit(jt_copy_edit, admin_user):
"Absent a validation error, system admins can do everything"
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = fake_context(admin_user)
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
@pytest.mark.django_db
def test_org_admin_copy_edit(jt_copy_edit, org_admin):
"Organization admins SHOULD be able to copy a JT firmly in their org"
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = fake_context(org_admin)
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
@pytest.mark.django_db
def test_org_admin_foreign_cred_no_copy_edit(jt_copy_edit, org_admin, machine_credential):
"""
Organization admins without access to the 3 related resources:
SHOULD NOT be able to copy JT
SHOULD be able to edit that job template, for nonsensitive changes
"""
# Attach credential to JT that org admin can not use
jt_copy_edit.credential = machine_credential
jt_copy_edit.save()
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = fake_context(org_admin)
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
@pytest.mark.django_db
def test_jt_admin_copy_edit(jt_copy_edit, rando):
"""
JT admins wihout access to associated resources SHOULD NOT be able to copy
SHOULD be able to make nonsensitive changes"""
# random user given JT admin access only
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = fake_context(rando)
response = serializer.to_representation(jt_copy_edit)
assert not response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
@pytest.mark.django_db
def test_proj_jt_admin_copy_edit(jt_copy_edit, rando):
"JT admins with access to associated resources SHOULD be able to copy"
# random user given JT and project admin abilities
jt_copy_edit.admin_role.members.add(rando)
jt_copy_edit.save()
jt_copy_edit.project.admin_role.members.add(rando)
jt_copy_edit.project.save()
serializer = JobTemplateSerializer(jt_copy_edit)
serializer.context = fake_context(rando)
response = serializer.to_representation(jt_copy_edit)
assert response['summary_fields']['user_capabilities']['copy']
assert response['summary_fields']['user_capabilities']['edit']
@pytest.mark.django_db
class TestAccessListCapabilities:
@pytest.fixture
def mock_access_method(self, mocker):
"Mocking this requires extra work because of the logging statement"
mock_method = mocker.MagicMock()
mock_method.return_value = 'foobar'
mock_method.__name__ = 'bars'
return mock_method
def _assert_one_in_list(self, data, sublist='direct_access'):
assert len(data['results']) == 1
assert len(data['results'][0]['summary_fields'][sublist]) == 1
def test_access_list_direct_access_capability(self, inventory, rando, get, mocker, mock_access_method):
"""Test that the access_list serializer shows the exact output of the
RoleAccess.can_attach method in the direct_access list"""
inventory.admin_role.members.add(rando)
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:inventory_access_list', args=(inventory.id,)), rando)
self._assert_one_in_list(response.data)
direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
def test_access_list_indirect_access_capability(self, inventory, admin_user, get, mocker, mock_access_method):
"""Test the display of unattach access for a singleton permission"""
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:inventory_access_list', args=(inventory.id,)), admin_user)
self._assert_one_in_list(response.data, sublist='indirect_access')
indirect_access_list = response.data['results'][0]['summary_fields']['indirect_access']
assert indirect_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
def test_access_list_team_direct_access_capability(self, inventory, team, team_member, get, mocker, mock_access_method):
"""Test the display of unattach access for team-based permissions
this happens in a difference place in the serializer code from the user permission"""
team.member_role.children.add(inventory.admin_role)
with mocker.patch.object(access_registry[Role][0], 'can_unattach', mock_access_method):
response = get(reverse('api:inventory_access_list', args=(inventory.id,)), team_member)
self._assert_one_in_list(response.data)
direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
@pytest.mark.django_db
def test_team_roles_unattach(mocker):
pass
@pytest.mark.django_db
def test_user_roles_unattach(mocker):
pass

View File

@@ -11,7 +11,9 @@ from awx.api.serializers import (
JobOptionsSerializer, JobOptionsSerializer,
CustomInventoryScriptSerializer, CustomInventoryScriptSerializer,
) )
from awx.api.views import JobTemplateDetail
from awx.main.models import ( from awx.main.models import (
Role,
Label, Label,
Job, Job,
CustomInventoryScript, CustomInventoryScript,
@@ -123,21 +125,32 @@ class TestJobTemplateSerializerGetSummaryFields(GetSummaryFieldsMixin):
summary = self._mock_and_run(JobTemplateSerializer, job_template) summary = self._mock_and_run(JobTemplateSerializer, job_template)
assert 'survey' not in summary assert 'survey' not in summary
@pytest.mark.skip(reason="RBAC needs to land") def test_copy_edit_standard(self, mocker, job_template_factory):
def test_can_copy_true(self, mocker, job_template): """Verify that the exact output of the access.py methods
pass are put into the serializer user_capabilities"""
@pytest.mark.skip(reason="RBAC needs to land") jt_obj = job_template_factory('testJT', project='proj1', persisted=False).job_template
def test_can_copy_false(self, mocker, job_template): jt_obj.id = 5
pass jt_obj.admin_role = Role(id=9, role_field='admin_role')
jt_obj.execute_role = Role(id=8, role_field='execute_role')
jt_obj.read_role = Role(id=7, role_field='execute_role')
user = User(username="auser")
serializer = JobTemplateSerializer(job_template)
serializer.show_capabilities = ['copy', 'edit']
serializer._summary_field_labels = lambda self: []
serializer._recent_jobs = lambda self: []
request = APIRequestFactory().get('/api/v1/job_templates/42/')
request.user = user
view = JobTemplateDetail()
view.request = request
serializer.context['view'] = view
@pytest.mark.skip(reason="RBAC needs to land") with mocker.patch("awx.main.access.JobTemplateAccess.can_change", return_value='foobar'):
def test_can_edit_true(self, mocker, job_template): with mocker.patch("awx.main.access.JobTemplateAccess.can_add", return_value='foo'):
pass response = serializer.get_summary_fields(jt_obj)
@pytest.mark.skip(reason="RBAC needs to land") assert response['user_capabilities']['copy'] == 'foo'
def test_can_edit_false(self, mocker, job_template): assert response['user_capabilities']['edit'] == 'foobar'
pass
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {}) @mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
@mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {}) @mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {})

View File

@@ -110,3 +110,27 @@ def test_jt_can_add_bad_data(user_unit):
access = JobTemplateAccess(user_unit) access = JobTemplateAccess(user_unit)
assert not access.can_add({'asdf': 'asdf'}) assert not access.can_add({'asdf': 'asdf'})
@pytest.mark.django_db
def test_user_capabilities_method():
"""Unit test to verify that the user_capabilities method will defer
to the appropriate sub-class methods of the access classes.
Note that normal output is True/False, but a string is returned
in these tests to establish uniqueness.
"""
class FooAccess(BaseAccess):
def can_change(self, obj, data):
return 'bar'
def can_add(self, data):
return 'foobar'
user = User(username='auser')
foo_access = FooAccess(user)
foo = object()
foo_capabilities = foo_access.get_user_capabilities(foo, ['edit', 'copy'])
assert foo_capabilities == {
'edit': 'bar',
'copy': 'foobar'
}

View File

@@ -415,7 +415,6 @@ def cache_list_capabilities(page, role_types, model, user):
are save on each object in the list, using 1 query for each role type are save on each object in the list, using 1 query for each role type
''' '''
page_ids = [obj.id for obj in page] page_ids = [obj.id for obj in page]
id_lists = {}
for obj in page: for obj in page:
obj.capabilities_cache = {} obj.capabilities_cache = {}