mirror of
https://github.com/ansible/awx.git
synced 2026-07-02 11:58:03 -02:30
ask_for_inventory permissions and runtime tests finished
cleanup prompt-for additions update migration after rebase
This commit is contained in:
@@ -8,7 +8,7 @@ from django.conf import settings
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0012_v300_create_labels'),
|
||||
('main', '0013_v300_label_changes'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
@@ -378,10 +378,11 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
|
||||
kwargs['extra_vars'] = json.dumps(extra_vars)
|
||||
return kwargs
|
||||
|
||||
def _accept_or_ignore_job_kwargs(self, user, **kwargs):
|
||||
def _accept_or_ignore_job_kwargs(self, **kwargs):
|
||||
# Sort the runtime fields allowed and disallowed by job template
|
||||
ignored_fields = {}
|
||||
prompted_fields = {}
|
||||
|
||||
if 'extra_vars' in kwargs:
|
||||
prompted_fields['extra_vars'] = {}
|
||||
ignored_fields['extra_vars'] = {}
|
||||
@@ -401,40 +402,26 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
|
||||
# No survey & prompt flag is false - ignore all
|
||||
ignored_fields['extra_vars'] = kwargs['extra_vars']
|
||||
|
||||
if 'limit' in kwargs:
|
||||
if self.ask_limit_on_launch:
|
||||
prompted_fields['limit'] = kwargs['limit']
|
||||
else:
|
||||
ignored_fields['limit'] = kwargs['limit']
|
||||
# Fields which all follow the same pattern
|
||||
ask_for_field_dict = dict(
|
||||
limit=self.ask_limit_on_launch,
|
||||
job_tags=self.ask_tags_on_launch,
|
||||
skip_tags=self.ask_tags_on_launch,
|
||||
job_type=self.ask_job_type_on_launch,
|
||||
inventory=self.ask_inventory_on_launch
|
||||
)
|
||||
|
||||
if 'job_tags' or 'skip_tags' in kwargs:
|
||||
if self.ask_tags_on_launch:
|
||||
if 'job_tags' in kwargs:
|
||||
prompted_fields['job_tags'] = kwargs['job_tags']
|
||||
if 'skip_tags' in kwargs:
|
||||
prompted_fields['skip_tags'] = kwargs['skip_tags']
|
||||
else:
|
||||
if 'job_tags' in kwargs:
|
||||
ignored_fields['job_tags'] = kwargs['job_tags']
|
||||
if 'skip_tags' in kwargs:
|
||||
ignored_fields['skip_tags'] = kwargs['skip_tags']
|
||||
|
||||
if 'job_type' in kwargs:
|
||||
if self.ask_job_type_on_launch:
|
||||
prompted_fields['job_type'] = kwargs['job_type']
|
||||
else:
|
||||
ignored_fields['job_type'] = kwargs['job_type']
|
||||
|
||||
if 'inventory' in kwargs:
|
||||
inv_id = kwargs['inventory']
|
||||
if self.ask_inventory_on_launch:
|
||||
from awx.main.models.inventory import Inventory
|
||||
if Inventory.objects.get(pk=inv_id).accessible_by(user, {'write': True}):
|
||||
prompted_fields['inventory'] = inv_id
|
||||
for field in ask_for_field_dict:
|
||||
if field in kwargs:
|
||||
if ask_for_field_dict[field]:
|
||||
prompted_fields[field] = kwargs[field]
|
||||
else:
|
||||
ignored_fields['inventory'] = inv_id
|
||||
else:
|
||||
ignored_fields['inventory'] = inv_id
|
||||
ignored_fields[field] = kwargs[field]
|
||||
|
||||
if prompted_fields.get('job_type', None) == 'scan' or self.job_type == 'scan':
|
||||
if 'inventory' in prompted_fields:
|
||||
ignored_fields['inventory'] = prompted_fields.pop('inventory')
|
||||
|
||||
return prompted_fields, ignored_fields
|
||||
|
||||
@property
|
||||
|
||||
@@ -3,29 +3,35 @@ import yaml
|
||||
|
||||
from awx.api.serializers import JobLaunchSerializer
|
||||
from awx.main.models.credential import Credential
|
||||
from awx.main.models.inventory import Inventory
|
||||
from awx.main.models.jobs import Job, JobTemplate
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
from copy import copy
|
||||
|
||||
@pytest.fixture
|
||||
def runtime_data():
|
||||
def runtime_data(organization):
|
||||
cred_obj = Credential.objects.create(name='runtime-cred', kind='ssh', username='test_user2', password='pas4word2')
|
||||
inv_obj = organization.inventories.create(name="runtime-inv")
|
||||
return dict(
|
||||
extra_vars='{"job_launch_var": 4}',
|
||||
limit='test-servers',
|
||||
job_type='check',
|
||||
inventory=cred_obj.pk,
|
||||
job_tags='["provision"]',
|
||||
skip_tags='["restart"]',
|
||||
extra_vars='{"job_launch_var": 4}'
|
||||
job_tags='provision',
|
||||
skip_tags='restart',
|
||||
inventory=inv_obj.pk,
|
||||
credential=cred_obj.pk,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
def job_template_prompts(project, inventory, machine_credential):
|
||||
def rf(on_off):
|
||||
return JobTemplate.objects.create(
|
||||
job_type='run', project=project, inventory=inventory,
|
||||
credential=machine_credential, name='deploy-job-template',
|
||||
job_type='run',
|
||||
project=project,
|
||||
inventory=inventory,
|
||||
credential=machine_credential,
|
||||
name='deploy-job-template',
|
||||
ask_variables_on_launch=on_off,
|
||||
ask_tags_on_launch=on_off,
|
||||
ask_job_type_on_launch=on_off,
|
||||
@@ -34,42 +40,25 @@ def job_template_prompts(project, inventory, machine_credential):
|
||||
)
|
||||
return rf
|
||||
|
||||
# Probably remove this test after development is finished
|
||||
@pytest.mark.django_db
|
||||
def test_job_launch_prompts_echo(job_template_prompts, get, user):
|
||||
job_template = job_template_prompts(True)
|
||||
assert job_template.ask_variables_on_launch
|
||||
|
||||
url = reverse('api:job_template_launch', args=[job_template.pk])
|
||||
|
||||
response = get(
|
||||
reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
user('admin', True))
|
||||
|
||||
# Just checking that the GET response has what we expect
|
||||
assert response.data['ask_variables_on_launch']
|
||||
assert response.data['ask_tags_on_launch']
|
||||
assert response.data['ask_job_type_on_launch']
|
||||
assert response.data['ask_inventory_on_launch']
|
||||
assert response.data['ask_limit_on_launch']
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, user):
|
||||
job_template = job_template_prompts(False)
|
||||
job_template_saved = copy(job_template)
|
||||
|
||||
response = post(
|
||||
reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
runtime_data, user('admin', True))
|
||||
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
runtime_data, user('admin', True))
|
||||
|
||||
assert response.status_code == 202
|
||||
job_id = response.data['job']
|
||||
job_obj = Job.objects.get(pk=job_id)
|
||||
|
||||
# Check that job data matches job_template data
|
||||
assert len(yaml.load(job_obj.extra_vars)) == 0
|
||||
assert job_obj.limit == job_template.limit
|
||||
assert job_obj.job_type == job_template.job_type
|
||||
assert job_obj.inventory.pk == job_template.inventory.pk
|
||||
assert job_obj.job_tags == job_template.job_tags
|
||||
assert job_obj.limit == job_template_saved.limit
|
||||
assert job_obj.job_type == job_template_saved.job_type
|
||||
assert job_obj.inventory.pk == job_template_saved.inventory.pk
|
||||
assert job_obj.job_tags == job_template_saved.job_tags
|
||||
|
||||
# Check that response tells us what things were ignored
|
||||
assert 'job_launch_var' in response.data['ignored_fields']['extra_vars']
|
||||
@@ -80,13 +69,18 @@ def test_job_ignore_unprompted_vars(runtime_data, job_template_prompts, post, us
|
||||
assert 'skip_tags' in response.data['ignored_fields']
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, user):
|
||||
job_template = job_template_prompts(True)
|
||||
admin_user = user('admin', True)
|
||||
|
||||
response = post(
|
||||
reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
runtime_data, user('admin', True))
|
||||
job_template.inventory.executor_role.members.add(admin_user)
|
||||
job_template.inventory.save()
|
||||
|
||||
response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
runtime_data, user('admin', True))
|
||||
|
||||
assert response.status_code == 202
|
||||
job_id = response.data['job']
|
||||
job_obj = Job.objects.get(pk=job_id)
|
||||
|
||||
@@ -97,6 +91,101 @@ def test_job_accept_prompted_vars(runtime_data, job_template_prompts, post, user
|
||||
assert job_obj.inventory.pk == runtime_data['inventory']
|
||||
assert job_obj.job_tags == runtime_data['job_tags']
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_reject_invalid_prompted_vars(runtime_data, job_template_prompts, post, user):
|
||||
job_template = job_template_prompts(True)
|
||||
|
||||
response = post(
|
||||
reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
dict(job_type='foobicate', # foobicate is not a valid job type
|
||||
inventory=87865), user('admin', True))
|
||||
|
||||
assert response.status_code == 400
|
||||
assert response.data['job_type'] == [u'"foobicate" is not a valid choice.']
|
||||
assert response.data['inventory'] == [u'Invalid pk "87865" - object does not exist.']
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_reject_invalid_prompted_extra_vars(runtime_data, job_template_prompts, post, user):
|
||||
job_template = job_template_prompts(True)
|
||||
|
||||
response = post(
|
||||
reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
dict(extra_vars='{"unbalanced brackets":'), user('admin', True))
|
||||
|
||||
assert response.status_code == 400
|
||||
assert response.data['extra_vars'] == ['Must be valid JSON or YAML']
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_launch_fails_without_inventory(deploy_jobtemplate, post, user):
|
||||
deploy_jobtemplate.inventory = None
|
||||
deploy_jobtemplate.save()
|
||||
|
||||
response = post(reverse('api:job_template_launch',
|
||||
args=[deploy_jobtemplate.pk]), {}, user('admin', True))
|
||||
|
||||
assert response.status_code == 400
|
||||
assert response.data['inventory'] == ['Job Template Inventory is missing or undefined']
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_launch_fails_without_inventory_access(deploy_jobtemplate, machine_credential, post, user):
|
||||
deploy_jobtemplate.ask_inventory_on_launch = True
|
||||
deploy_jobtemplate.credential = machine_credential
|
||||
common_user = user('test-user', False)
|
||||
# TODO: Change admin_role to executor_role once issue #1422 is resolved
|
||||
deploy_jobtemplate.admin_role.members.add(common_user)
|
||||
deploy_jobtemplate.save()
|
||||
deploy_jobtemplate.inventory.executor_role.members.add(common_user)
|
||||
deploy_jobtemplate.inventory.save()
|
||||
deploy_jobtemplate.project.member_role.members.add(common_user)
|
||||
deploy_jobtemplate.project.save()
|
||||
# TODO: change owner_role to usage_role after fix
|
||||
deploy_jobtemplate.credential.owner_role.members.add(common_user)
|
||||
deploy_jobtemplate.credential.save()
|
||||
|
||||
# Assure that the base job template can be launched to begin with
|
||||
response = post(reverse('api:job_template_launch',
|
||||
args=[deploy_jobtemplate.pk]), {}, common_user)
|
||||
|
||||
assert response.status_code == 202
|
||||
|
||||
# Assure that giving an inventory without access to the inventory blocks the launch
|
||||
new_inv = deploy_jobtemplate.project.organization.inventories.create(name="user-can-not-use")
|
||||
response = post(reverse('api:job_template_launch', args=[deploy_jobtemplate.pk]),
|
||||
dict(inventory=new_inv.pk), common_user)
|
||||
|
||||
assert response.status_code == 403
|
||||
assert response.data['detail'] == u'You do not have permission to perform this action.'
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_relaunch_prompted_vars(runtime_data, job_template_prompts, post, user):
|
||||
job_template = job_template_prompts(True)
|
||||
admin_user = user('admin', True)
|
||||
|
||||
# Launch job, overwriting several JT fields
|
||||
first_response = post(reverse('api:job_template_launch', args=[job_template.pk]),
|
||||
runtime_data, admin_user)
|
||||
|
||||
assert first_response.status_code == 202
|
||||
original_job = Job.objects.get(pk=first_response.data['job'])
|
||||
|
||||
# Launch a second job as a relaunch of the first
|
||||
second_response = post(reverse('api:job_relaunch', args=[original_job.pk]),
|
||||
{}, admin_user)
|
||||
relaunched_job = Job.objects.get(pk=second_response.data['job'])
|
||||
|
||||
# Check that job data matches the original runtime variables
|
||||
assert first_response.status_code == 202
|
||||
assert 'job_launch_var' in yaml.load(relaunched_job.extra_vars)
|
||||
assert relaunched_job.limit == runtime_data['limit']
|
||||
assert relaunched_job.job_type == runtime_data['job_type']
|
||||
assert relaunched_job.inventory.pk == runtime_data['inventory']
|
||||
assert relaunched_job.job_tags == runtime_data['job_tags']
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
|
||||
deploy_jobtemplate.extra_vars = '{"job_template_var": 3}'
|
||||
@@ -118,6 +207,7 @@ def test_job_launch_JT_with_validation(machine_credential, deploy_jobtemplate):
|
||||
assert 'job_launch_var' in final_job_extra_vars
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.job_runtime_vars
|
||||
def test_job_launch_unprompted_vars_with_survey(job_template_prompts, post, user):
|
||||
job_template = job_template_prompts(False)
|
||||
job_template.survey_enabled = True
|
||||
@@ -149,11 +239,7 @@ def test_job_launch_unprompted_vars_with_survey(job_template_prompts, post, user
|
||||
job_id = response.data['job']
|
||||
job_obj = Job.objects.get(pk=job_id)
|
||||
|
||||
# Check that the survey variable is accept and the job variable isn't
|
||||
# Check that the survey variable is accepted and the job variable isn't
|
||||
job_extra_vars = yaml.load(job_obj.extra_vars)
|
||||
assert 'job_launch_var' not in job_extra_vars
|
||||
assert 'survey_var' in job_extra_vars
|
||||
|
||||
# To add:
|
||||
# permissions testing (can't provide inventory you can't run against)
|
||||
# credentials/password test if they will be included in response format
|
||||
|
||||
Reference in New Issue
Block a user