add more functional tests for prompted fields and fix the lint test

check label permission and fix lint (#13)

* set created by and launch type correctly

This makes "launched_by" get computed right in the tests.

Mysteriously this seemed to work from API browser, but
this seems more correct to have it work this way, and makes
tests actually work.

For "manual" launch types the attribute used to populate "launched_by"
is "created_by". And we already have "is_bulk_job" to indicate that the
job is a bulk job. So lets just use this.

* check label is in an organization you can read
This commit is contained in:
jainnikhil30 2023-02-27 21:16:41 +05:30 committed by Elijah DeLee
parent 2becc5dda9
commit ede1b9af92
2 changed files with 33 additions and 7 deletions

View File

@ -4673,13 +4673,15 @@ class BulkJobLaunchSerializer(BaseSerializer):
return attrs
def create(self, validated_data):
request = self.context.get('request', None)
launch_user = request.user if request else None
job_node_data = validated_data.pop('jobs')
wfj_deferred_attr_names = ('skip_tags', 'limit', 'job_tags')
wfj_deferred_vals = {}
for item in wfj_deferred_attr_names:
wfj_deferred_vals[item] = validated_data.pop(item, None)
wfj = WorkflowJob.objects.create(**validated_data, is_bulk_job=True)
wfj = WorkflowJob.objects.create(**validated_data, is_bulk_job=True, launch_type='manual', created_by=launch_user)
for key, val in wfj_deferred_vals.items():
if val:
setattr(wfj, key, val)
@ -4802,7 +4804,7 @@ class BulkJobLaunchSerializer(BaseSerializer):
raise serializers.ValidationError(_(f"Credentials {not_allowed} not found or you don't have permissions to access it"))
def check_label_permission(self, requested_use_labels):
accessible_use_labels = {tup.id for tup in Label.objects.all()}
accessible_use_labels = {tup.id for tup in Label.objects.all(organization__in=Organizations.accessible_pk_qs(request.user, 'read_role'))}
if requested_use_labels - accessible_use_labels:
not_allowed = requested_use_labels - accessible_use_labels
raise serializers.ValidationError(_(f"Labels {not_allowed} not found or you don't have permissions to access it"))

View File

@ -4,12 +4,9 @@ from uuid import uuid4
from awx.api.versioning import reverse
import json
from contextlib import contextmanager
from django.db import connections
from awx.main.models.jobs import JobTemplate
from awx.main.models import Organization, Inventory, WorkflowJob
from awx.main.scheduler import TaskManager
@pytest.mark.django_db
@ -104,13 +101,17 @@ def test_bulk_job_launch_queries(job_template, organization, inventory, project,
with django_assert_max_num_queries(num_queries):
bulk_job_launch_response = post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': jobs}, normal_user, expect=201).data
for u in (org_admin,): # TODO normal_user not working because launched_by not getting set in the tests...does happen in real request
# Run task manager so the workflow job nodes actually spawn
TaskManager().schedule()
for u in (org_admin, normal_user):
bulk_job = get(bulk_job_launch_response['url'], u, expect=200).data
assert organization.id == bulk_job['summary_fields']['organization']['id']
resp = get(bulk_job_launch_response['related']['workflow_nodes'], u)
assert resp.data['count'] == num_jobs
for item in resp.data['results']:
assert item["unified_job_template"] == jt.id
assert item["inventory"] == inventory.id
@pytest.mark.django_db
@ -196,3 +197,26 @@ def test_bulk_job_launch_inventory_no_access(job_template, organization, invento
inv = Inventory.objects.create(name='inv1', organization=org2)
jt.execute_role.members.add(normal_user)
post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id, 'inventory': inv.id}]}, normal_user, expect=400)
@pytest.mark.django_db
def test_bulk_job_inventory_prompt(job_template, organization, inventory, project, credential, post, get, user):
'''
Job template has an inventory set as prompt_on_launch
and if I provide the inventory as a parameter in bulk job
... job uses that inventory
'''
normal_user = user('normal_user', False)
org1 = Organization.objects.create(name='foo1')
jt = JobTemplate.objects.create(name='my-jt', ask_inventory_on_launch=True, project=project, playbook='helloworld.yml')
jt.save()
org1.member_role.members.add(normal_user)
inv = Inventory.objects.create(name='inv1', organization=org1)
jt.execute_role.members.add(normal_user)
inv.use_role.members.add(normal_user)
bulk_job_launch_response = post(
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id, 'inventory': inv.id}]}, normal_user, expect=201
).data
bulk_job_id = bulk_job_launch_response['id']
node = WorkflowJob.objects.get(id=bulk_job_id).workflow_job_nodes.all().order_by('created')
assert inv.id == node[0].inventory.id