From ede1b9af924b547e2d5a2a8b5568a482da3682ce Mon Sep 17 00:00:00 2001 From: jainnikhil30 Date: Mon, 27 Feb 2023 21:16:41 +0530 Subject: [PATCH] add more functional tests for prompted fields and fix the lint test check label permission and fix lint (#13) * set created by and launch type correctly This makes "launched_by" get computed right in the tests. Mysteriously this seemed to work from API browser, but this seems more correct to have it work this way, and makes tests actually work. For "manual" launch types the attribute used to populate "launched_by" is "created_by". And we already have "is_bulk_job" to indicate that the job is a bulk job. So lets just use this. * check label is in an organization you can read --- awx/api/serializers.py | 6 +++-- awx/main/tests/functional/test_bulk.py | 34 ++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 3c93f7e5d2..c42bb56f66 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4673,13 +4673,15 @@ class BulkJobLaunchSerializer(BaseSerializer): return attrs def create(self, validated_data): + request = self.context.get('request', None) + launch_user = request.user if request else None job_node_data = validated_data.pop('jobs') wfj_deferred_attr_names = ('skip_tags', 'limit', 'job_tags') wfj_deferred_vals = {} for item in wfj_deferred_attr_names: wfj_deferred_vals[item] = validated_data.pop(item, None) - wfj = WorkflowJob.objects.create(**validated_data, is_bulk_job=True) + wfj = WorkflowJob.objects.create(**validated_data, is_bulk_job=True, launch_type='manual', created_by=launch_user) for key, val in wfj_deferred_vals.items(): if val: setattr(wfj, key, val) @@ -4802,7 +4804,7 @@ class BulkJobLaunchSerializer(BaseSerializer): raise serializers.ValidationError(_(f"Credentials {not_allowed} not found or you don't have permissions to access it")) def check_label_permission(self, requested_use_labels): - accessible_use_labels = {tup.id for tup in Label.objects.all()} + accessible_use_labels = {tup.id for tup in Label.objects.all(organization__in=Organizations.accessible_pk_qs(request.user, 'read_role'))} if requested_use_labels - accessible_use_labels: not_allowed = requested_use_labels - accessible_use_labels raise serializers.ValidationError(_(f"Labels {not_allowed} not found or you don't have permissions to access it")) diff --git a/awx/main/tests/functional/test_bulk.py b/awx/main/tests/functional/test_bulk.py index a2a6413368..638bbc500f 100644 --- a/awx/main/tests/functional/test_bulk.py +++ b/awx/main/tests/functional/test_bulk.py @@ -4,12 +4,9 @@ from uuid import uuid4 from awx.api.versioning import reverse -import json -from contextlib import contextmanager -from django.db import connections - from awx.main.models.jobs import JobTemplate from awx.main.models import Organization, Inventory, WorkflowJob +from awx.main.scheduler import TaskManager @pytest.mark.django_db @@ -104,13 +101,17 @@ def test_bulk_job_launch_queries(job_template, organization, inventory, project, with django_assert_max_num_queries(num_queries): bulk_job_launch_response = post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': jobs}, normal_user, expect=201).data - for u in (org_admin,): # TODO normal_user not working because launched_by not getting set in the tests...does happen in real request + # Run task manager so the workflow job nodes actually spawn + TaskManager().schedule() + + for u in (org_admin, normal_user): bulk_job = get(bulk_job_launch_response['url'], u, expect=200).data assert organization.id == bulk_job['summary_fields']['organization']['id'] resp = get(bulk_job_launch_response['related']['workflow_nodes'], u) assert resp.data['count'] == num_jobs for item in resp.data['results']: assert item["unified_job_template"] == jt.id + assert item["inventory"] == inventory.id @pytest.mark.django_db @@ -196,3 +197,26 @@ def test_bulk_job_launch_inventory_no_access(job_template, organization, invento inv = Inventory.objects.create(name='inv1', organization=org2) jt.execute_role.members.add(normal_user) post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id, 'inventory': inv.id}]}, normal_user, expect=400) + + +@pytest.mark.django_db +def test_bulk_job_inventory_prompt(job_template, organization, inventory, project, credential, post, get, user): + ''' + Job template has an inventory set as prompt_on_launch + and if I provide the inventory as a parameter in bulk job + ... job uses that inventory + ''' + normal_user = user('normal_user', False) + org1 = Organization.objects.create(name='foo1') + jt = JobTemplate.objects.create(name='my-jt', ask_inventory_on_launch=True, project=project, playbook='helloworld.yml') + jt.save() + org1.member_role.members.add(normal_user) + inv = Inventory.objects.create(name='inv1', organization=org1) + jt.execute_role.members.add(normal_user) + inv.use_role.members.add(normal_user) + bulk_job_launch_response = post( + reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id, 'inventory': inv.id}]}, normal_user, expect=201 + ).data + bulk_job_id = bulk_job_launch_response['id'] + node = WorkflowJob.objects.get(id=bulk_job_id).workflow_job_nodes.all().order_by('created') + assert inv.id == node[0].inventory.id