diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 9613d2e2d2..48f2203521 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2295,10 +2295,10 @@ class JobLaunchSerializer(BaseSerializer): # Special prohibited cases for scan jobs if 'job_type' in data and obj.ask_job_type_on_launch: - if ((obj.job_type==PERM_INVENTORY_SCAN and not data['job_type']==PERM_INVENTORY_SCAN) or - (data['job_type']==PERM_INVENTORY_SCAN and not obj['job_type']==PERM_INVENTORY_SCAN)): + if ((obj.job_type == PERM_INVENTORY_SCAN and not data['job_type'] == PERM_INVENTORY_SCAN) or + (data['job_type'] == PERM_INVENTORY_SCAN and not obj.job_type == PERM_INVENTORY_SCAN)): errors['job_type'] = 'Can not override job_type to or from a scan job.' - if (obj.job_type==PERM_INVENTORY_SCAN and ('inventory' in data) and obj.ask_inventory_on_launch and + if (obj.job_type == PERM_INVENTORY_SCAN and ('inventory' in data) and obj.ask_inventory_on_launch and obj.inventory != data['inventory']): errors['inventory'] = 'Inventory can not be changed at runtime for scan jobs.' diff --git a/awx/main/tests/functional/api/test_job_runtime_params.py b/awx/main/tests/functional/api/test_job_runtime_params.py index 37e7b68f9e..cfda95213a 100644 --- a/awx/main/tests/functional/api/test_job_runtime_params.py +++ b/awx/main/tests/functional/api/test_job_runtime_params.py @@ -60,6 +60,14 @@ def job_template_prompts_null(project): ask_credential_on_launch=True, ) +@pytest.fixture +def bad_scan_JT(job_template_prompts): + job_template = job_template_prompts(True) + job_template.job_type = 'scan' + job_template.save() + return job_template + +# End of setup, tests start here @pytest.mark.django_db @pytest.mark.job_runtime_vars def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, user, mocker): @@ -109,25 +117,24 @@ def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, user @pytest.mark.django_db @pytest.mark.job_runtime_vars -def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null, post, user, mocker): +def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null, post, rando, mocker): job_template = job_template_prompts_null - common_user = user('not-admin', False) # Give user permission to execute the job template - job_template.execute_role.members.add(common_user) + job_template.execute_role.members.add(rando) # Give user permission to use inventory and credential at runtime credential = Credential.objects.get(pk=runtime_data['credential']) - credential.use_role.members.add(common_user) + credential.use_role.members.add(rando) inventory = Inventory.objects.get(pk=runtime_data['inventory']) - inventory.use_role.members.add(common_user) + inventory.use_role.members.add(rando) mock_job = mocker.MagicMock(spec=Job, id=968, **runtime_data) with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.create_unified_job', return_value=mock_job): with mocker.patch('awx.api.serializers.JobSerializer.to_representation'): response = post(reverse('api:job_template_launch', args=[job_template.pk]), - runtime_data, common_user) + runtime_data, rando) assert response.status_code == 201 job_id = response.data['job'] @@ -175,34 +182,53 @@ def test_job_launch_fails_without_inventory(deploy_jobtemplate, post, user): @pytest.mark.django_db @pytest.mark.job_runtime_vars -def test_job_launch_fails_without_inventory_access(job_template_prompts, runtime_data, post, user): +def test_job_launch_fails_without_inventory_access(job_template_prompts, runtime_data, post, rando): job_template = job_template_prompts(True) - common_user = user('test-user', False) - job_template.execute_role.members.add(common_user) + job_template.execute_role.members.add(rando) # Assure that giving an inventory without access to the inventory blocks the launch - runtime_inventory = Inventory.objects.get(pk=runtime_data['inventory']) response = post(reverse('api:job_template_launch', args=[job_template.pk]), - dict(inventory=runtime_inventory.pk), common_user) + dict(inventory=runtime_data['inventory']), rando) assert response.status_code == 403 assert response.data['detail'] == u'You do not have permission to perform this action.' @pytest.mark.django_db @pytest.mark.job_runtime_vars -def test_job_launch_fails_without_credential_access(job_template_prompts, runtime_data, post, user): +def test_job_launch_fails_without_credential_access(job_template_prompts, runtime_data, post, rando): job_template = job_template_prompts(True) - common_user = user('test-user', False) - job_template.execute_role.members.add(common_user) + job_template.execute_role.members.add(rando) # Assure that giving a credential without access blocks the launch - runtime_credential = Credential.objects.get(pk=runtime_data['credential']) response = post(reverse('api:job_template_launch', args=[job_template.pk]), - dict(credential=runtime_credential.pk), common_user) + dict(credential=runtime_data['credential']), rando) assert response.status_code == 403 assert response.data['detail'] == u'You do not have permission to perform this action.' +@pytest.mark.django_db +@pytest.mark.job_runtime_vars +def test_job_block_scan_job_type_change(job_template_prompts, post, admin_user): + job_template = job_template_prompts(True) + + # Assure that changing the type of a scan job blocks the launch + response = post(reverse('api:job_template_launch', args=[job_template.pk]), + dict(job_type='scan'), admin_user) + + assert response.status_code == 400 + assert 'job_type' in response.data + +@pytest.mark.django_db +@pytest.mark.job_runtime_vars +def test_job_block_scan_job_inv_change(mocker, bad_scan_JT, runtime_data, post, admin_user): + # Assure that giving a new inventory for a scan job blocks the launch + with mocker.patch('awx.main.access.BaseAccess.check_license', return_value=True): + response = post(reverse('api:job_template_launch', args=[bad_scan_JT.pk]), + dict(inventory=runtime_data['inventory']), admin_user) + + assert response.status_code == 400 + assert 'inventory' in response.data + @pytest.mark.django_db @pytest.mark.job_runtime_vars def test_job_relaunch_copy_vars(job_with_links, machine_credential, inventory,