mirror of
https://github.com/ansible/awx.git
synced 2026-05-21 07:47:44 -02:30
update permission logic, update job runtime tests
This commit is contained in:
@@ -2119,12 +2119,12 @@ class JobTemplateLaunch(RetrieveAPIView, GenericAPIView):
|
|||||||
|
|
||||||
if 'credential' in prompted_fields and prompted_fields['credential'] != getattrd(obj, 'credential.pk', None):
|
if 'credential' in prompted_fields and prompted_fields['credential'] != getattrd(obj, 'credential.pk', None):
|
||||||
new_credential = Credential.objects.get(pk=prompted_fields['credential'])
|
new_credential = Credential.objects.get(pk=prompted_fields['credential'])
|
||||||
if not request.user.can_access(Credential, 'read', new_credential):
|
if not request.user.can_access(Credential, 'use', new_credential):
|
||||||
raise PermissionDenied()
|
raise PermissionDenied()
|
||||||
|
|
||||||
if 'inventory' in prompted_fields and prompted_fields['inventory'] != getattrd(obj, 'inventory.pk', None):
|
if 'inventory' in prompted_fields and prompted_fields['inventory'] != getattrd(obj, 'inventory.pk', None):
|
||||||
new_inventory = Inventory.objects.get(pk=prompted_fields['inventory'])
|
new_inventory = Inventory.objects.get(pk=prompted_fields['inventory'])
|
||||||
if not request.user.can_access(Inventory, 'read', new_inventory):
|
if not request.user.can_access(Inventory, 'use', new_inventory):
|
||||||
raise PermissionDenied()
|
raise PermissionDenied()
|
||||||
|
|
||||||
kv = prompted_fields
|
kv = prompted_fields
|
||||||
|
|||||||
@@ -41,6 +41,22 @@ def job_template_prompts(project, inventory, machine_credential):
|
|||||||
)
|
)
|
||||||
return rf
|
return rf
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def job_template_prompts_null(project):
|
||||||
|
return JobTemplate.objects.create(
|
||||||
|
job_type='run',
|
||||||
|
project=project,
|
||||||
|
inventory=None,
|
||||||
|
credential=None,
|
||||||
|
name='deploy-job-template',
|
||||||
|
ask_variables_on_launch=True,
|
||||||
|
ask_tags_on_launch=True,
|
||||||
|
ask_job_type_on_launch=True,
|
||||||
|
ask_inventory_on_launch=True,
|
||||||
|
ask_limit_on_launch=True,
|
||||||
|
ask_credential_on_launch=True,
|
||||||
|
)
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, user):
|
def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, user):
|
||||||
@@ -81,7 +97,38 @@ def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, user
|
|||||||
job_template.inventory.save()
|
job_template.inventory.save()
|
||||||
|
|
||||||
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
runtime_data, user('admin', True))
|
runtime_data, admin_user)
|
||||||
|
|
||||||
|
assert response.status_code == 201
|
||||||
|
job_id = response.data['job']
|
||||||
|
job_obj = Job.objects.get(pk=job_id)
|
||||||
|
|
||||||
|
# Check that job data matches the given runtime variables
|
||||||
|
assert 'job_launch_var' in yaml.load(job_obj.extra_vars)
|
||||||
|
assert job_obj.limit == runtime_data['limit']
|
||||||
|
assert job_obj.job_type == runtime_data['job_type']
|
||||||
|
assert job_obj.inventory.pk == runtime_data['inventory']
|
||||||
|
assert job_obj.credential.pk == runtime_data['credential']
|
||||||
|
assert job_obj.job_tags == runtime_data['job_tags']
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.skip(reason="JT can_start without inventory needs to be fixed before passing")
|
||||||
|
@pytest.mark.job_runtime_vars
|
||||||
|
def test_job_accept_prompted_vars_null(runtime_data, job_template_prompts_null, post, user):
|
||||||
|
job_template = job_template_prompts_null
|
||||||
|
common_user = user('admin', False)
|
||||||
|
|
||||||
|
job_template.executor_role.members.add(common_user)
|
||||||
|
job_template.save()
|
||||||
|
job_template.project.member_role.members.add(common_user)
|
||||||
|
job_template.project.save()
|
||||||
|
|
||||||
|
credential = Credential.objects.get(pk=runtime_data['credential'])
|
||||||
|
credential.usage_role.members.add(common_user)
|
||||||
|
credential.save()
|
||||||
|
|
||||||
|
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
|
runtime_data, common_user)
|
||||||
|
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
job_id = response.data['job']
|
job_id = response.data['job']
|
||||||
@@ -140,15 +187,13 @@ def test_job_launch_fails_without_inventory_access(deploy_jobtemplate, machine_c
|
|||||||
deploy_jobtemplate.ask_inventory_on_launch = True
|
deploy_jobtemplate.ask_inventory_on_launch = True
|
||||||
deploy_jobtemplate.credential = machine_credential
|
deploy_jobtemplate.credential = machine_credential
|
||||||
common_user = user('test-user', False)
|
common_user = user('test-user', False)
|
||||||
# TODO: Change admin_role to executor_role once issue #1422 is resolved
|
deploy_jobtemplate.executor_role.members.add(common_user)
|
||||||
deploy_jobtemplate.admin_role.members.add(common_user)
|
|
||||||
deploy_jobtemplate.save()
|
deploy_jobtemplate.save()
|
||||||
deploy_jobtemplate.inventory.executor_role.members.add(common_user)
|
deploy_jobtemplate.inventory.usage_role.members.add(common_user)
|
||||||
deploy_jobtemplate.inventory.save()
|
deploy_jobtemplate.inventory.save()
|
||||||
deploy_jobtemplate.project.member_role.members.add(common_user)
|
deploy_jobtemplate.project.member_role.members.add(common_user)
|
||||||
deploy_jobtemplate.project.save()
|
deploy_jobtemplate.project.save()
|
||||||
# TODO: change owner_role to usage_role after fix
|
deploy_jobtemplate.credential.usage_role.members.add(common_user)
|
||||||
deploy_jobtemplate.credential.owner_role.members.add(common_user)
|
|
||||||
deploy_jobtemplate.credential.save()
|
deploy_jobtemplate.credential.save()
|
||||||
|
|
||||||
# Assure that the base job template can be launched to begin with
|
# Assure that the base job template can be launched to begin with
|
||||||
@@ -215,7 +260,8 @@ def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
|
|||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.job_runtime_vars
|
@pytest.mark.job_runtime_vars
|
||||||
def test_job_launch_unprompted_vars_with_survey(job_template_prompts, post, user):
|
def test_job_launch_unprompted_vars_with_survey(mocker, job_template_prompts, post, user):
|
||||||
|
with mocker.patch('awx.main.access.BaseAccess.check_license', return_value=False):
|
||||||
job_template = job_template_prompts(False)
|
job_template = job_template_prompts(False)
|
||||||
job_template.survey_enabled = True
|
job_template.survey_enabled = True
|
||||||
job_template.survey_spec = {
|
job_template.survey_spec = {
|
||||||
@@ -242,6 +288,7 @@ def test_job_launch_unprompted_vars_with_survey(job_template_prompts, post, user
|
|||||||
reverse('api:job_template_launch', args=[job_template.pk]),
|
reverse('api:job_template_launch', args=[job_template.pk]),
|
||||||
dict(extra_vars={"job_launch_var": 3, "survey_var": 4}),
|
dict(extra_vars={"job_launch_var": 3, "survey_var": 4}),
|
||||||
user('admin', True))
|
user('admin', True))
|
||||||
|
assert response.status_code == 201
|
||||||
|
|
||||||
job_id = response.data['job']
|
job_id = response.data['job']
|
||||||
job_obj = Job.objects.get(pk=job_id)
|
job_obj = Job.objects.get(pk=job_id)
|
||||||
|
|||||||
Reference in New Issue
Block a user