diff --git a/awx/api/views.py b/awx/api/views.py index 300ae3e209..1d0c3895c5 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2205,7 +2205,10 @@ class JobTemplateList(ListCreateAPIView): model = JobTemplate serializer_class = JobTemplateSerializer always_allow_superuser = False - capabilities_prefetch = ['admin', 'execute'] + capabilities_prefetch = [ + 'admin', 'execute', + {'copy': ['project.use', 'inventory.use', 'credential.use', 'cloud_credential.use', 'network_credential.use']} + ] def post(self, request, *args, **kwargs): ret = super(JobTemplateList, self).post(request, *args, **kwargs) diff --git a/awx/main/tests/functional/api/test_rbac_displays.py b/awx/main/tests/functional/api/test_rbac_displays.py index 5424d06616..8ed9e7ade9 100644 --- a/awx/main/tests/functional/api/test_rbac_displays.py +++ b/awx/main/tests/functional/api/test_rbac_displays.py @@ -16,147 +16,148 @@ from awx.api.serializers import JobTemplateSerializer # awx/main/tests/unit/test_access.py :: # test_user_capabilities_method -class FakeView(object): - pass - -@pytest.fixture -def jt_copy_edit(job_template_factory, project): - objects = job_template_factory( - 'copy-edit-job-template', - project=project) - return objects.job_template - @pytest.mark.django_db -def test_inventory_group_host_can_add(inventory, alice, options): - inventory.admin_role.members.add(alice) - - response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), alice) - assert 'POST' in response.data['actions'] - response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), alice) - assert 'POST' in response.data['actions'] - -@pytest.mark.django_db -def test_inventory_group_host_can_not_add(inventory, bob, options): - inventory.read_role.members.add(bob) - - response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), bob) - assert 'POST' not in response.data['actions'] - response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), bob) - assert 'POST' not in response.data['actions'] - -@pytest.mark.django_db -def test_user_list_can_add(org_member, org_admin, options): - response = options(reverse('api:user_list'), org_admin) - assert 'POST' in response.data['actions'] - -@pytest.mark.django_db -def test_user_list_can_not_add(org_member, org_admin, options): - response = options(reverse('api:user_list'), org_member) - assert 'POST' not in response.data['actions'] - - -def fake_context(user): - request = RequestFactory().get('/api/v1/resource/42/') - request.user = user - fake_view = FakeView() - fake_view.request = request - context = {} - context['view'] = fake_view - context['request'] = request - return context - -# Test protection against limited set of validation problems - -@pytest.mark.django_db -def test_bad_data_copy_edit(admin_user, project): +class TestOptionsRBAC: """ - If a required resource (inventory here) was deleted, copying not allowed - because doing so would caues a validation error + Several endpoints are relied-upon by the UI to list POST as an + allowed action or not depending on whether the user has permission + to create a resource. """ - jt_res = JobTemplate.objects.create( - job_type='run', - project=project, - inventory=None, ask_inventory_on_launch=False, # not allowed - credential=None, ask_credential_on_launch=True, - name='deploy-job-template' - ) - serializer = JobTemplateSerializer(jt_res) - serializer.context = fake_context(admin_user) - response = serializer.to_representation(jt_res) - assert not response['summary_fields']['user_capabilities']['copy'] - assert response['summary_fields']['user_capabilities']['edit'] + def test_inventory_group_host_can_add(self, inventory, alice, options): + inventory.admin_role.members.add(alice) + + response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), alice) + assert 'POST' in response.data['actions'] + response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), alice) + assert 'POST' in response.data['actions'] + + def test_inventory_group_host_can_not_add(self, inventory, bob, options): + inventory.read_role.members.add(bob) + + response = options(reverse('api:inventory_hosts_list', args=[inventory.pk]), bob) + assert 'POST' not in response.data['actions'] + response = options(reverse('api:inventory_groups_list', args=[inventory.pk]), bob) + assert 'POST' not in response.data['actions'] + + def test_user_list_can_add(self, org_member, org_admin, options): + response = options(reverse('api:user_list'), org_admin) + assert 'POST' in response.data['actions'] + + def test_user_list_can_not_add(self, org_member, org_admin, options): + response = options(reverse('api:user_list'), org_member) + assert 'POST' not in response.data['actions'] -# Tests for correspondence between view info and intended access @pytest.mark.django_db -def test_sys_admin_copy_edit(jt_copy_edit, admin_user): - "Absent a validation error, system admins can do everything" - serializer = JobTemplateSerializer(jt_copy_edit) - serializer.context = fake_context(admin_user) - response = serializer.to_representation(jt_copy_edit) - assert response['summary_fields']['user_capabilities']['copy'] - assert response['summary_fields']['user_capabilities']['edit'] - -@pytest.mark.django_db -def test_org_admin_copy_edit(jt_copy_edit, org_admin): - "Organization admins SHOULD be able to copy a JT firmly in their org" - serializer = JobTemplateSerializer(jt_copy_edit) - serializer.context = fake_context(org_admin) - response = serializer.to_representation(jt_copy_edit) - assert response['summary_fields']['user_capabilities']['copy'] - assert response['summary_fields']['user_capabilities']['edit'] - -@pytest.mark.django_db -def test_org_admin_foreign_cred_no_copy_edit(jt_copy_edit, org_admin, machine_credential): +class TestJobTemplateCopyEdit: """ - Organization admins without access to the 3 related resources: - SHOULD NOT be able to copy JT - SHOULD be able to edit that job template, for nonsensitive changes + Tests contain scenarios that were raised as issues in the past, + which resulted from failed copy/edit actions even though the buttons + to do these actions were displayed. """ - # Attach credential to JT that org admin can not use - jt_copy_edit.credential = machine_credential - jt_copy_edit.save() + @pytest.fixture + def jt_copy_edit(self, job_template_factory, project): + objects = job_template_factory( + 'copy-edit-job-template', + project=project) + return objects.job_template - serializer = JobTemplateSerializer(jt_copy_edit) - serializer.context = fake_context(org_admin) - response = serializer.to_representation(jt_copy_edit) - assert not response['summary_fields']['user_capabilities']['copy'] - assert response['summary_fields']['user_capabilities']['edit'] + def fake_context(self, user): + request = RequestFactory().get('/api/v1/resource/42/') + request.user = user + class FakeView(object): + pass + fake_view = FakeView() + fake_view.request = request + context = {} + context['view'] = fake_view + context['request'] = request + return context -@pytest.mark.django_db -def test_jt_admin_copy_edit(jt_copy_edit, rando): - """ - JT admins wihout access to associated resources SHOULD NOT be able to copy - SHOULD be able to make nonsensitive changes""" + def test_validation_bad_data_copy_edit(self, admin_user, project): + """ + If a required resource (inventory here) was deleted, copying not allowed + because doing so would caues a validation error + """ - # random user given JT admin access only - jt_copy_edit.admin_role.members.add(rando) - jt_copy_edit.save() + jt_res = JobTemplate.objects.create( + job_type='run', + project=project, + inventory=None, ask_inventory_on_launch=False, # not allowed + credential=None, ask_credential_on_launch=True, + name='deploy-job-template' + ) + serializer = JobTemplateSerializer(jt_res) + serializer.context = self.fake_context(admin_user) + response = serializer.to_representation(jt_res) + assert not response['summary_fields']['user_capabilities']['copy'] + assert response['summary_fields']['user_capabilities']['edit'] - serializer = JobTemplateSerializer(jt_copy_edit) - serializer.context = fake_context(rando) - response = serializer.to_representation(jt_copy_edit) - assert not response['summary_fields']['user_capabilities']['copy'] - assert response['summary_fields']['user_capabilities']['edit'] + def test_sys_admin_copy_edit(self, jt_copy_edit, admin_user): + "Absent a validation error, system admins can do everything" + serializer = JobTemplateSerializer(jt_copy_edit) + serializer.context = self.fake_context(admin_user) + response = serializer.to_representation(jt_copy_edit) + assert response['summary_fields']['user_capabilities']['copy'] + assert response['summary_fields']['user_capabilities']['edit'] -@pytest.mark.django_db -def test_proj_jt_admin_copy_edit(jt_copy_edit, rando): - "JT admins with access to associated resources SHOULD be able to copy" + def test_org_admin_copy_edit(self, jt_copy_edit, org_admin): + "Organization admins SHOULD be able to copy a JT firmly in their org" + serializer = JobTemplateSerializer(jt_copy_edit) + serializer.context = self.fake_context(org_admin) + response = serializer.to_representation(jt_copy_edit) + assert response['summary_fields']['user_capabilities']['copy'] + assert response['summary_fields']['user_capabilities']['edit'] - # random user given JT and project admin abilities - jt_copy_edit.admin_role.members.add(rando) - jt_copy_edit.save() - jt_copy_edit.project.admin_role.members.add(rando) - jt_copy_edit.project.save() + def test_org_admin_foreign_cred_no_copy_edit(self, jt_copy_edit, org_admin, machine_credential): + """ + Organization admins without access to the 3 related resources: + SHOULD NOT be able to copy JT + SHOULD be able to edit that job template, for nonsensitive changes + """ - serializer = JobTemplateSerializer(jt_copy_edit) - serializer.context = fake_context(rando) - response = serializer.to_representation(jt_copy_edit) - assert response['summary_fields']['user_capabilities']['copy'] - assert response['summary_fields']['user_capabilities']['edit'] + # Attach credential to JT that org admin can not use + jt_copy_edit.credential = machine_credential + jt_copy_edit.save() + + serializer = JobTemplateSerializer(jt_copy_edit) + serializer.context = self.fake_context(org_admin) + response = serializer.to_representation(jt_copy_edit) + assert not response['summary_fields']['user_capabilities']['copy'] + assert response['summary_fields']['user_capabilities']['edit'] + + def test_jt_admin_copy_edit(self, jt_copy_edit, rando): + """ + JT admins wihout access to associated resources SHOULD NOT be able to copy + SHOULD be able to make nonsensitive changes""" + + # random user given JT admin access only + jt_copy_edit.admin_role.members.add(rando) + jt_copy_edit.save() + + serializer = JobTemplateSerializer(jt_copy_edit) + serializer.context = self.fake_context(rando) + response = serializer.to_representation(jt_copy_edit) + assert not response['summary_fields']['user_capabilities']['copy'] + assert response['summary_fields']['user_capabilities']['edit'] + + def test_proj_jt_admin_copy_edit(self, jt_copy_edit, rando): + "JT admins with access to associated resources SHOULD be able to copy" + + # random user given JT and project admin abilities + jt_copy_edit.admin_role.members.add(rando) + jt_copy_edit.save() + jt_copy_edit.project.admin_role.members.add(rando) + jt_copy_edit.project.save() + + serializer = JobTemplateSerializer(jt_copy_edit) + serializer.context = self.fake_context(rando) + response = serializer.to_representation(jt_copy_edit) + assert response['summary_fields']['user_capabilities']['copy'] + assert response['summary_fields']['user_capabilities']['edit'] @pytest.fixture @@ -166,7 +167,6 @@ def mock_access_method(mocker): mock_method.__name__ = 'bars' # Required for a logging statement return mock_method - @pytest.mark.django_db class TestAccessListCapabilities: """ @@ -274,3 +274,27 @@ def test_prefetch_group_capabilities(group, rando): cache_list_capabilities(qs, ['inventory.admin', 'inventory.adhoc'], Group, rando) assert qs[0].capabilities_cache == {'edit': False, 'adhoc': True} +@pytest.mark.django_db +def test_prefetch_jt_copy_capability(job_template, project, inventory, machine_credential, rando): + job_template.project = project + job_template.inventory = inventory + job_template.credential = machine_credential + job_template.save() + + qs = JobTemplate.objects.all() + cache_list_capabilities(qs, [{'copy': [ + 'project.use', 'inventory.use', 'credential.use', + 'cloud_credential.use', 'network_credential.use' + ]}], JobTemplate, rando) + assert qs[0].capabilities_cache == {'copy': False} + + project.use_role.members.add(rando) + inventory.use_role.members.add(rando) + machine_credential.use_role.members.add(rando) + + cache_list_capabilities(qs, [{'copy': [ + 'project.use', 'inventory.use', 'credential.use', + 'cloud_credential.use', 'network_credential.use' + ]}], JobTemplate, rando) + assert qs[0].capabilities_cache == {'copy': True} + diff --git a/awx/main/utils.py b/awx/main/utils.py index 84cfa41a54..e21913af51 100644 --- a/awx/main/utils.py +++ b/awx/main/utils.py @@ -409,7 +409,7 @@ def get_model_for_type(type): return ct_model -def cache_list_capabilities(page, role_types, model, user): +def cache_list_capabilities(page, prefetch_list, model, user): ''' Given a `page` list of objects, the specified roles for the specified user are save on each object in the list, using 1 query for each role type @@ -418,36 +418,54 @@ def cache_list_capabilities(page, role_types, model, user): capabilities_prefetch = ['admin', 'execute'] --> prefetch the admin (edit) and execute (start) permissions for items in list for current user - capabilities_prefetch = ['inventory.admin_role'] + capabilities_prefetch = ['inventory.admin'] --> prefetch the related inventory FK permissions for current user, and put it into the object's cache + capabilities_prefetch = [{'copy': ['inventory.admin', 'project.admin']}] + --> prefetch logical combination of admin permission to inventory AND + project, put into cache dictionary as "copy" ''' + from django.db.models import Q page_ids = [obj.id for obj in page] for obj in page: obj.capabilities_cache = {} - for role_path in role_types: - if '.' in role_path: - path = '__'.join(role_path.split('.')[:-1]) - role_type = role_path.split('.')[-1] - else: - path = None - role_type = role_path + for prefetch_entry in prefetch_list: - # Role name translation to UI names for methods - display_method = role_type - if role_type == 'admin': - display_method = 'edit' - elif role_type in ['execute', 'update']: - display_method = 'start' - - # Query for union of page objects & role accessible_objects - if path: - parent_model = model._meta.get_field(path).related_model - kwargs = {'%s__in' % path: parent_model.accessible_objects(user, '%s_role' % role_type)} - qs_obj = model.objects.filter(**kwargs) + display_method = None + if type(prefetch_entry) is dict: + display_method = prefetch_entry.keys()[0] + paths = prefetch_entry[display_method] else: - qs_obj = model.accessible_objects(user, '%s_role' % role_type) + paths = prefetch_entry + + if type(paths) is not list: + paths = [paths] + + # Build the query for accessible_objects according the user & role(s) + qs_obj = None + for role_path in paths: + if '.' in role_path: + res_path = '__'.join(role_path.split('.')[:-1]) + role_type = role_path.split('.')[-1] + if qs_obj is None: + qs_obj = model.objects + parent_model = model._meta.get_field(res_path).related_model + kwargs = {'%s__in' % res_path: parent_model.accessible_objects(user, '%s_role' % role_type)} + qs_obj = qs_obj.filter(Q(**kwargs) | Q(**{'%s__isnull' % res_path: True})) + else: + role_type = role_path + qs_obj = model.accessible_objects(user, '%s_role' % role_type) + + if display_method is None: + # Role name translation to UI names for methods + display_method = role_type + if role_type == 'admin': + display_method = 'edit' + elif role_type in ['execute', 'update']: + display_method = 'start' + + # Union that query with the list of items on page ids_with_role = set(qs_obj.filter(pk__in=page_ids).values_list('pk', flat=True)) # Save data item-by-item