add more functional test for related fields on bulk job and some other minor fixes

fix the functional test

lint fix

functional test fixes
This commit is contained in:
jainnikhil30 2023-02-28 18:38:17 +05:30 committed by Elijah DeLee
parent ede1b9af92
commit f622d3a1e6
2 changed files with 97 additions and 9 deletions

View File

@ -4559,7 +4559,7 @@ class BulkJobNodeSerializer(serializers.Serializer):
scm_branch = serializers.CharField(required=False, write_only=True, allow_blank=False)
verbosity = serializers.IntegerField(required=False, min_value=1)
forks = serializers.IntegerField(required=False, min_value=1)
diff_mode = serializers.CharField(required=False, write_only=True, allow_blank=False)
diff_mode = serializers.BooleanField(required=False, allow_null=True, default=None)
job_tags = serializers.CharField(required=False, write_only=True, allow_blank=False)
job_type = serializers.CharField(required=False, write_only=True, allow_blank=False)
skip_tags = serializers.CharField(required=False, write_only=True, allow_blank=False)
@ -4646,7 +4646,7 @@ class BulkJobLaunchSerializer(BaseSerializer):
self.check_inventory_permission(attrs, request, requested_use_inventories)
if requested_use_labels:
self.check_label_permission(requested_use_labels)
self.check_label_permission(request, requested_use_labels)
if requested_use_instance_groups:
self.check_instance_group_permission(request, requested_use_instance_groups)
@ -4803,8 +4803,8 @@ class BulkJobLaunchSerializer(BaseSerializer):
not_allowed = requested_use_credentials - accessible_use_credentials
raise serializers.ValidationError(_(f"Credentials {not_allowed} not found or you don't have permissions to access it"))
def check_label_permission(self, requested_use_labels):
accessible_use_labels = {tup.id for tup in Label.objects.all(organization__in=Organizations.accessible_pk_qs(request.user, 'read_role'))}
def check_label_permission(self, request, requested_use_labels):
accessible_use_labels = {tup.id for tup in Label.objects.filter(organization__in=Organization.accessible_pk_qs(request.user, 'read_role'))}
if requested_use_labels - accessible_use_labels:
not_allowed = requested_use_labels - accessible_use_labels
raise serializers.ValidationError(_(f"Labels {not_allowed} not found or you don't have permissions to access it"))

View File

@ -5,7 +5,7 @@ from uuid import uuid4
from awx.api.versioning import reverse
from awx.main.models.jobs import JobTemplate
from awx.main.models import Organization, Inventory, WorkflowJob
from awx.main.models import Organization, Inventory, WorkflowJob, ExecutionEnvironment
from awx.main.scheduler import TaskManager
@ -75,6 +75,7 @@ def test_bulk_host_create_rbac(organization, inventory, post, get, user):
bulk_host_create_response = post(
reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': [{'name': f'foobar2-{indx}'}]}, u, expect=400
).data
assert bulk_host_create_response['__all__'][0] == f'Inventory with id {inventory.id} not found or lack permissions to add hosts.'
@pytest.mark.django_db
@ -124,7 +125,10 @@ def test_bulk_job_launch_no_access_to_job_template(job_template, organization, i
jt = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
jt.save()
organization.member_role.members.add(normal_user)
post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400)
bulk_job_launch_response = post(
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400
).data
assert bulk_job_launch_response['__all__'][0] == f'Unified Job Templates {{{jt.id}}} not found or you don\'t have permissions to access it'
@pytest.mark.django_db
@ -137,7 +141,10 @@ def test_bulk_job_launch_no_org_assigned(job_template, organization, inventory,
jt = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
jt.save()
jt.execute_role.members.add(normal_user)
post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400)
bulk_job_launch_response = post(
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400
).data
assert bulk_job_launch_response['__all__'][0] == 'User not part of any organization, please assign an organization to assign to the bulk job'
@pytest.mark.django_db
@ -155,7 +162,10 @@ def test_bulk_job_launch_multiple_org_assigned(job_template, organization, inven
jt = JobTemplate.objects.create(name='my-jt', inventory=inventory, project=project, playbook='helloworld.yml')
jt.save()
jt.execute_role.members.add(normal_user)
post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400)
bulk_job_launch_response = post(
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id}]}, normal_user, expect=400
).data
assert bulk_job_launch_response['__all__'][0] == 'User has permission to multiple Organizations, please set one of them in the request'
@pytest.mark.django_db
@ -196,7 +206,10 @@ def test_bulk_job_launch_inventory_no_access(job_template, organization, invento
org1.member_role.members.add(normal_user)
inv = Inventory.objects.create(name='inv1', organization=org2)
jt.execute_role.members.add(normal_user)
post(reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id, 'inventory': inv.id}]}, normal_user, expect=400)
bulk_job_launch_response = post(
reverse('api:bulk_job_launch'), {'name': 'Bulk Job Launch', 'jobs': [{'unified_job_template': jt.id, 'inventory': inv.id}]}, normal_user, expect=400
).data
assert bulk_job_launch_response['__all__'][0] == f'Inventories {{{inv.id}}} not found or you don\'t have permissions to access it'
@pytest.mark.django_db
@ -220,3 +233,78 @@ def test_bulk_job_inventory_prompt(job_template, organization, inventory, projec
bulk_job_id = bulk_job_launch_response['id']
node = WorkflowJob.objects.get(id=bulk_job_id).workflow_job_nodes.all().order_by('created')
assert inv.id == node[0].inventory.id
@pytest.mark.django_db
def test_bulk_job_set_all_prompt(job_template, organization, inventory, project, credentialtype_ssh, post, get, user):
'''
Job template has many fields set as prompt_on_launch
and if I provide all those fields as a parameter in bulk job
... job uses them
'''
normal_user = user('normal_user', False)
jt = JobTemplate.objects.create(
name='my-jt',
ask_inventory_on_launch=True,
ask_diff_mode_on_launch=True,
ask_job_type_on_launch=True,
ask_verbosity_on_launch=True,
ask_execution_environment_on_launch=True,
ask_forks_on_launch=True,
ask_job_slice_count_on_launch=True,
ask_timeout_on_launch=True,
ask_variables_on_launch=True,
ask_scm_branch_on_launch=True,
ask_limit_on_launch=True,
ask_skip_tags_on_launch=True,
ask_tags_on_launch=True,
project=project,
playbook='helloworld.yml',
)
jt.save()
organization.member_role.members.add(normal_user)
inv = Inventory.objects.create(name='inv1', organization=organization)
ee = ExecutionEnvironment.objects.create(name='test-ee', image='quay.io/foo/bar')
jt.execute_role.members.add(normal_user)
inv.use_role.members.add(normal_user)
bulk_job_launch_response = post(
reverse('api:bulk_job_launch'),
{
'name': 'Bulk Job Launch',
'jobs': [
{
'unified_job_template': jt.id,
'inventory': inv.id,
'diff_mode': True,
'job_type': 'check',
'verbosity': 3,
'execution_environment': ee.id,
'forks': 1,
'job_slice_count': 1,
'timeout': 200,
'extra_data': {'prompted_key': 'prompted_val'},
'scm_branch': 'non_dev',
'limit': 'kansas',
'skip_tags': 'foobar',
'job_tags': 'untagged',
}
],
},
normal_user,
expect=201,
).data
bulk_job_id = bulk_job_launch_response['id']
node = WorkflowJob.objects.get(id=bulk_job_id).workflow_job_nodes.all().order_by('created')
assert node[0].inventory.id == inv.id
assert node[0].diff_mode == True
assert node[0].job_type == 'check'
assert node[0].verbosity == 3
assert node[0].execution_environment.id == ee.id
assert node[0].forks == 1
assert node[0].job_slice_count == 1
assert node[0].timeout == 200
assert node[0].extra_data == {'prompted_key': 'prompted_val'}
assert node[0].scm_branch == 'non_dev'
assert node[0].limit == 'kansas'
assert node[0].skip_tags == 'foobar'
assert node[0].job_tags == 'untagged'