include scan job_type errors in criteria for rejecting workflow node prompts

This commit is contained in:
AlanCoding 2016-09-27 17:24:09 -04:00
parent fdca3b41ad
commit a452acf214
4 changed files with 54 additions and 32 deletions

View File

@ -2557,13 +2557,7 @@ class JobLaunchSerializer(BaseSerializer):
errors['variables_needed_to_start'] = validation_errors
# Special prohibited cases for scan jobs
if 'job_type' in data and obj.ask_job_type_on_launch:
if ((obj.job_type == PERM_INVENTORY_SCAN and not data['job_type'] == PERM_INVENTORY_SCAN) or
(data['job_type'] == PERM_INVENTORY_SCAN and not obj.job_type == PERM_INVENTORY_SCAN)):
errors['job_type'] = 'Can not override job_type to or from a scan job.'
if (obj.job_type == PERM_INVENTORY_SCAN and ('inventory' in data) and obj.ask_inventory_on_launch and
obj.inventory != data['inventory']):
errors['inventory'] = 'Inventory can not be changed at runtime for scan jobs.'
errors.update(obj._extra_job_type_errors(data))
if errors:
raise serializers.ValidationError(errors)

View File

@ -34,6 +34,7 @@ from awx.main.redact import PlainTextCleaner
from awx.main.conf import tower_settings
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin
from awx.main.models.base import PERM_INVENTORY_SCAN
logger = logging.getLogger('awx.main.models.jobs')
@ -272,7 +273,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
validation_errors['credential'] = ["Job Template must provide 'credential' or allow prompting for it.",]
# Job type dependent checks
if self.job_type == 'scan':
if self.job_type == PERM_INVENTORY_SCAN:
if self.inventory is None or self.ask_inventory_on_launch:
validation_errors['inventory'] = ["Scan jobs must be assigned a fixed inventory.",]
elif self.project is None:
@ -474,13 +475,24 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
else:
ignored_fields[field] = kwargs[field]
# Special case to ignore inventory if it is a scan job
if prompted_fields.get('job_type', None) == 'scan' or self.job_type == 'scan':
if 'inventory' in prompted_fields:
ignored_fields['inventory'] = prompted_fields.pop('inventory')
return prompted_fields, ignored_fields
def _extra_job_type_errors(self, data):
"""
Used to enforce 2 special cases around scan jobs and prompting
- the inventory can not be changed on a scan job template
- scan jobs can not be switched to run/check type and vice versa
"""
errors = {}
if 'job_type' in data and self.ask_job_type_on_launch:
if ((self.job_type == PERM_INVENTORY_SCAN and not data['job_type'] == PERM_INVENTORY_SCAN) or
(data['job_type'] == PERM_INVENTORY_SCAN and not self.job_type == PERM_INVENTORY_SCAN)):
errors['job_type'] = 'Can not override job_type to or from a scan job.'
if (self.job_type == PERM_INVENTORY_SCAN and ('inventory' in data) and self.ask_inventory_on_launch and
self.inventory != data['inventory']):
errors['inventory'] = 'Inventory can not be changed at runtime for scan jobs.'
return errors
@property
def cache_timeout_blocked(self):
if Job.objects.filter(job_template=self, status__in=['pending', 'waiting', 'running']).count() > getattr(tower_settings, 'SCHEDULE_MAX_JOBS', 10):

View File

@ -85,9 +85,9 @@ class WorkflowNodeBase(CreatedModifiedModel):
def prompts_dict(self):
data = {}
if self.inventory:
data['inventory'] = self.inventory
data['inventory'] = self.inventory.pk
if self.credential:
data['credential'] = self.credential
data['credential'] = self.credential.pk
for fd in CHAR_PROMPTS_LIST:
if fd in self.char_prompts:
data[fd] = self.char_prompts[fd]
@ -103,16 +103,20 @@ class WorkflowNodeBase(CreatedModifiedModel):
return {'ignored': {'all': 'Can not use prompts on unified_job_template that is not type of job template'}}
else:
return {}
accepted_fields, ignored_fields = ujt_obj._accept_or_ignore_job_kwargs(**prompts_dict)
ask_for_vars_dict = ujt_obj._ask_for_vars_dict()
ignored_dict = {}
missing_dict = {}
for fd in prompts_dict:
if not ask_for_vars_dict[fd]:
ignored_dict[fd] = 'Workflow node provided field, but job template is not set to ask on launch'
for fd in ask_for_vars_dict:
ujt_field = getattr(ujt_obj, fd)
if ujt_field is None and prompts_dict.get(fd, None) is None:
for fd in ignored_fields:
ignored_dict[fd] = 'Workflow node provided field, but job template is not set to ask on launch'
scan_errors = ujt_obj._extra_job_type_errors(accepted_fields)
ignored_dict.update(scan_errors)
for fd in ['inventory', 'credential']:
if getattr(ujt_obj, fd) is None and not (ask_for_vars_dict.get(fd, False) and fd in prompts_dict):
missing_dict[fd] = 'Job Template does not have this field and workflow node does not provide it'
data = {}
if ignored_dict:
data['ignored'] = ignored_dict
@ -160,11 +164,11 @@ class WorkflowJobNode(WorkflowNodeBase):
data = {}
ujt_obj = self.unified_job_template
if ujt_obj and hasattr(ujt_obj, '_ask_for_vars_dict'):
ask_for_vars_dict = ujt_obj._ask_for_vars_dict()
prompts_dict = self.prompts_dict()
for fd in prompts_dict:
if ask_for_vars_dict.get(fd, False):
data[fd] = prompts_dict[fd]
accepted_fields, ignored_fields = ujt_obj._accept_or_ignore_job_kwargs(**self.prompts_dict())
for fd in ujt_obj._extra_job_type_errors(accepted_fields):
accepted_fields.pop(fd)
data.update(accepted_fields)
# TODO: decide what to do in the event of missing fields
# process extra_vars
extra_vars = {}
if self.workflow_job and self.workflow_job.extra_vars:

View File

@ -102,7 +102,7 @@ def node_no_prompts(workflow_job_unit, job_template_factory):
def project_unit():
return Project(name='example-proj')
example_prompts = dict(job_type='scan', job_tags='quack', limit='duck', skip_tags='oink')
example_prompts = dict(job_type='check', job_tags='quack', limit='duck', skip_tags='oink')
@pytest.fixture
def node_with_prompts(node_no_prompts):
@ -129,15 +129,15 @@ class TestWorkflowJobNodeJobKWARGS:
def test_char_prompts_and_res_node_prompts(self, node_with_prompts):
assert node_with_prompts.get_job_kwargs() == dict(
inventory=node_with_prompts.inventory,
credential=node_with_prompts.credential,
inventory=node_with_prompts.inventory.pk,
credential=node_with_prompts.credential.pk,
**example_prompts)
def test_reject_some_node_prompts(self, node_with_prompts):
node_with_prompts.unified_job_template.ask_inventory_on_launch = False
node_with_prompts.unified_job_template.ask_job_type_on_launch = False
expect_kwargs = dict(inventory=node_with_prompts.inventory,
credential=node_with_prompts.credential,
expect_kwargs = dict(inventory=node_with_prompts.inventory.pk,
credential=node_with_prompts.credential.pk,
**example_prompts)
expect_kwargs.pop('inventory')
expect_kwargs.pop('job_type')
@ -153,7 +153,7 @@ class TestWorkflowWarnings:
Tests of warnings that show user errors in the construction of a workflow
"""
def test_warn_project_node_no_prompts(self, node_no_prompts, project_unit):
def test_no_warn_project_node_no_prompts(self, node_no_prompts, project_unit):
node_no_prompts.unified_job_template = project_unit
assert node_no_prompts.get_prompts_warnings() == {}
@ -162,6 +162,9 @@ class TestWorkflowWarnings:
assert 'ignored' in node_with_prompts.get_prompts_warnings()
assert 'all' in node_with_prompts.get_prompts_warnings()['ignored']
def test_no_warn_accept_all_prompts(self, node_with_prompts):
assert node_with_prompts.get_prompts_warnings() == {}
def test_warn_reject_some_prompts(self, node_with_prompts):
node_with_prompts.unified_job_template.ask_credential_on_launch = False
node_with_prompts.unified_job_template.ask_job_type_on_launch = False
@ -170,6 +173,15 @@ class TestWorkflowWarnings:
assert 'credential' in node_with_prompts.get_prompts_warnings()['ignored']
assert len(node_with_prompts.get_prompts_warnings()['ignored']) == 2
def test_warn_scan_errors_node_prompts(self, node_with_prompts):
node_with_prompts.unified_job_template.job_type = 'scan'
node_with_prompts.job_type = 'run'
node_with_prompts.inventory = Inventory(name='different-inventory', pk=23)
assert 'ignored' in node_with_prompts.get_prompts_warnings()
assert 'job_type' in node_with_prompts.get_prompts_warnings()['ignored']
assert 'inventory' in node_with_prompts.get_prompts_warnings()['ignored']
assert len(node_with_prompts.get_prompts_warnings()['ignored']) == 2
def test_warn_missing_fields(self, node_no_prompts):
node_no_prompts.inventory = None
assert 'missing' in node_no_prompts.get_prompts_warnings()