Merge pull request #3553 from AlanCoding/wf_rbac_prompt

workflows RBAC and prompting
This commit is contained in:
Alan Rominger 2016-10-03 15:34:47 -04:00 committed by GitHub
commit f23a87e5a0
15 changed files with 847 additions and 146 deletions

View File

@ -40,6 +40,7 @@ from awx.main.models import * # noqa
from awx.main.access import get_user_capabilities
from awx.main.fields import ImplicitRoleField
from awx.main.utils import get_type_for_model, get_model_for_type, build_url, timestamp_apiformat, camelcase_to_underscore, getattrd
from awx.main.validators import vars_validate_or_raise
from awx.conf.license import feature_enabled
from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, EncryptedPasswordField, VerbatimField
@ -1013,14 +1014,7 @@ class ProjectUpdateCancelSerializer(ProjectUpdateSerializer):
class BaseSerializerWithVariables(BaseSerializer):
def validate_variables(self, value):
try:
json.loads(value.strip() or '{}')
except ValueError:
try:
yaml.safe_load(value)
except yaml.YAMLError:
raise serializers.ValidationError('Must be valid JSON or YAML.')
return value
return vars_validate_or_raise(value)
class InventorySerializer(BaseSerializerWithVariables):
@ -1347,18 +1341,7 @@ class InventorySourceOptionsSerializer(BaseSerializer):
return res
def validate_source_vars(self, value):
# source_env must be blank, a valid JSON or YAML dict, or ...
try:
json.loads((value or '').strip() or '{}')
return value
except ValueError:
pass
try:
yaml.safe_load(value)
return value
except yaml.YAMLError:
pass
raise serializers.ValidationError('Must be valid JSON or YAML.')
return vars_validate_or_raise(value)
def validate(self, attrs):
# TODO: Validate source, validate source_regions
@ -1924,18 +1907,7 @@ class JobTemplateSerializer(UnifiedJobTemplateSerializer, JobOptionsSerializer):
return super(JobTemplateSerializer, self).validate(attrs)
def validate_extra_vars(self, value):
# extra_vars must be blank, a valid JSON or YAML dict, or ...
try:
json.loads((value or '').strip() or '{}')
return value
except ValueError:
pass
try:
yaml.safe_load(value)
return value
except yaml.YAMLError:
pass
raise serializers.ValidationError('Must be valid JSON or YAML.')
return vars_validate_or_raise(value)
class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
@ -2201,9 +2173,11 @@ class SystemJobCancelSerializer(SystemJobSerializer):
fields = ('can_cancel',)
class WorkflowJobTemplateSerializer(UnifiedJobTemplateSerializer):
show_capabilities = ['start', 'edit', 'delete']
class Meta:
model = WorkflowJobTemplate
fields = ('*',)
fields = ('*', 'extra_vars', 'organization')
def get_related(self, obj):
res = super(WorkflowJobTemplateSerializer, self).get_related(obj)
@ -2220,6 +2194,9 @@ class WorkflowJobTemplateSerializer(UnifiedJobTemplateSerializer):
))
return res
def validate_extra_vars(self, value):
return vars_validate_or_raise(value)
# TODO:
class WorkflowJobTemplateListSerializer(WorkflowJobTemplateSerializer):
pass
@ -2251,10 +2228,15 @@ class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer)
pass
class WorkflowNodeBaseSerializer(BaseSerializer):
job_type = serializers.SerializerMethodField()
job_tags = serializers.SerializerMethodField()
limit = serializers.SerializerMethodField()
skip_tags = serializers.SerializerMethodField()
class Meta:
    # TODO: workflow_job and job read-only
    # NOTE: the duplicate 'skip_tags' entry was removed from this tuple.
    fields = ('id', 'url', 'related', 'unified_job_template',
              'inventory', 'credential', 'job_type', 'job_tags', 'skip_tags', 'limit')
    read_only_fields = ('success_nodes', 'failure_nodes', 'always_nodes')
def get_related(self, obj):
res = super(WorkflowNodeBaseSerializer, self).get_related(obj)
@ -2262,6 +2244,19 @@ class WorkflowNodeBaseSerializer(BaseSerializer):
res['unified_job_template'] = obj.unified_job_template.get_absolute_url()
return res
def get_job_type(self, obj):
    # Promptable character fields live in the node's char_prompts JSON
    # dict; None means the field is not overridden by this node.
    return obj.char_prompts.get('job_type', None)

def get_job_tags(self, obj):
    return obj.char_prompts.get('job_tags', None)

def get_skip_tags(self, obj):
    return obj.char_prompts.get('skip_tags', None)

def get_limit(self, obj):
    return obj.char_prompts.get('limit', None)
class WorkflowJobTemplateNodeSerializer(WorkflowNodeBaseSerializer):
class Meta:
model = WorkflowJobTemplateNode
@ -2276,6 +2271,46 @@ class WorkflowJobTemplateNodeSerializer(WorkflowNodeBaseSerializer):
res['workflow_job_template'] = reverse('api:workflow_job_template_detail', args=(obj.workflow_job_template.pk,))
return res
def to_internal_value(self, data):
    """
    Fold the promptable character fields from the request data into a
    single ``char_prompts`` dict on the internal value.

    On PATCH the existing node's char_prompts serve as the starting
    point so fields not mentioned in the request are preserved; for
    other methods the prompts come from the request data alone.  Fields
    explicitly sent as null are removed from the dict entirely.
    """
    internal_value = super(WorkflowNodeBaseSerializer, self).to_internal_value(data)
    view = self.context.get('view', None)
    request_method = None
    if view and view.request:
        request_method = view.request.method
    if request_method in ['PATCH']:
        obj = view.get_object()
        char_prompts = copy.copy(obj.char_prompts)
        char_prompts.update(self.extract_char_prompts(data))
    else:
        char_prompts = self.extract_char_prompts(data)
    # Iterate over a copy because keys are popped during the loop;
    # a null value means "clear this prompt".
    for fd in copy.copy(char_prompts):
        if char_prompts[fd] is None:
            char_prompts.pop(fd)
    internal_value['char_prompts'] = char_prompts
    return internal_value
def extract_char_prompts(self, data):
    """Return a dict of the promptable character fields found in data.

    Null values are deliberately kept: they mark fields the caller wants
    cleared and are filtered out later in to_internal_value.
    """
    field_names = ('job_type', 'job_tags', 'skip_tags', 'limit')
    return {name: data[name] for name in field_names if name in data}
def validate(self, attrs):
    """
    Reject invalid job_type prompt values and forbid nesting workflow
    or system job templates as nodes inside a workflow.
    """
    if 'char_prompts' in attrs:
        if 'job_type' in attrs['char_prompts']:
            job_types = [t for t, v in JOB_TYPE_CHOICES]
            if attrs['char_prompts']['job_type'] not in job_types:
                raise serializers.ValidationError({
                    "job_type": "%s is not a valid job type. The choices are %s." % (
                        attrs['char_prompts']['job_type'], job_types)})
    ujt_obj = attrs.get('unified_job_template', None)
    # Workflows may not recurse, and system job templates are excluded
    if isinstance(ujt_obj, (WorkflowJobTemplate, SystemJobTemplate)):
        raise serializers.ValidationError({
            "unified_job_template": "Can not nest a %s inside a WorkflowJobTemplate" % ujt_obj.__class__.__name__})
    return super(WorkflowJobTemplateNodeSerializer, self).validate(attrs)
class WorkflowJobNodeSerializer(WorkflowNodeBaseSerializer):
class Meta:
model = WorkflowJobNode
@ -2524,13 +2559,7 @@ class JobLaunchSerializer(BaseSerializer):
errors['variables_needed_to_start'] = validation_errors
# Special prohibited cases for scan jobs
if 'job_type' in data and obj.ask_job_type_on_launch:
if ((obj.job_type == PERM_INVENTORY_SCAN and not data['job_type'] == PERM_INVENTORY_SCAN) or
(data['job_type'] == PERM_INVENTORY_SCAN and not obj.job_type == PERM_INVENTORY_SCAN)):
errors['job_type'] = 'Can not override job_type to or from a scan job.'
if (obj.job_type == PERM_INVENTORY_SCAN and ('inventory' in data) and obj.ask_inventory_on_launch and
obj.inventory != data['inventory']):
errors['inventory'] = 'Inventory can not be changed at runtime for scan jobs.'
errors.update(obj._extra_job_type_errors(data))
if errors:
raise serializers.ValidationError(errors)

View File

@ -2609,13 +2609,13 @@ class JobTemplateObjectRolesList(SubListAPIView):
content_type = ContentType.objects.get_for_model(self.parent_model)
return Role.objects.filter(content_type=content_type, object_id=po.pk)
class WorkflowJobNodeList(ListCreateAPIView):
class WorkflowJobNodeList(ListAPIView):
model = WorkflowJobNode
serializer_class = WorkflowJobNodeListSerializer
new_in_310 = True
class WorkflowJobNodeDetail(RetrieveUpdateDestroyAPIView):
class WorkflowJobNodeDetail(RetrieveAPIView):
model = WorkflowJobNode
serializer_class = WorkflowJobNodeDetailSerializer
@ -2633,6 +2633,16 @@ class WorkflowJobTemplateNodeDetail(RetrieveUpdateDestroyAPIView):
serializer_class = WorkflowJobTemplateNodeDetailSerializer
new_in_310 = True
def update_raw_data(self, data):
    """
    Pre-populate the browsable API form with this node's current
    char_prompts values, defaulting each promptable field to None.
    """
    # Fixed: removed duplicate 'skip_tags' from the field list.
    for fd in ('job_type', 'job_tags', 'skip_tags', 'limit'):
        data[fd] = None
    try:
        obj = self.get_object()
        data.update(obj.char_prompts)
    except Exception:
        # No existing object to read prompts from; keep the None defaults.
        # (Was a bare except, which would also swallow SystemExit et al.)
        pass
    return super(WorkflowJobTemplateNodeDetail, self).update_raw_data(data)
class WorkflowJobTemplateNodeChildrenBaseList(EnforceParentRelationshipMixin, SubListCreateAttachDetachAPIView):
@ -2724,7 +2734,10 @@ class WorkflowJobTemplateLaunch(GenericAPIView):
serializer_class = EmptySerializer
def get(self, request, *args, **kwargs):
return Response({})
data = {}
obj = self.get_object()
data['warnings'] = obj.get_warnings()
return Response(data)
def post(self, request, *args, **kwargs):
obj = self.get_object()
@ -2741,7 +2754,6 @@ class WorkflowJobTemplateWorkflowNodesList(SubListCreateAPIView):
model = WorkflowJobTemplateNode
serializer_class = WorkflowJobTemplateNodeListSerializer
always_allow_superuser = True # TODO: RBAC
parent_model = WorkflowJobTemplate
relationship = 'workflow_job_template_nodes'
parent_key = 'workflow_job_template'
@ -2755,17 +2767,11 @@ class WorkflowJobTemplateJobsList(SubListAPIView):
relationship = 'jobs'
parent_key = 'workflow_job_template'
# TODO:
class WorkflowJobList(ListCreateAPIView):
model = WorkflowJob
serializer_class = WorkflowJobListSerializer
def get(self, request, *args, **kwargs):
if not request.user.is_superuser and not request.user.is_system_auditor:
raise PermissionDenied("Superuser privileges needed.")
return super(WorkflowJobList, self).get(request, *args, **kwargs)
# TODO:
class WorkflowJobDetail(RetrieveDestroyAPIView):

View File

@ -1245,68 +1245,140 @@ class SystemJobAccess(BaseAccess):
# TODO:
class WorkflowJobTemplateNodeAccess(BaseAccess):
'''
I can see/use a WorkflowJobTemplateNode if I have permission to associated Workflow Job Template
I can see/use a WorkflowJobTemplateNode if I have read permission
to associated Workflow Job Template
In order to add a node, I need:
- admin access to parent WFJT
- execute access to the unified job template being used
- access to any credential or inventory provided as the prompted fields
In order to do anything to a node, I need admin access to its WFJT
In order to edit fields on a node, I need:
- execute access to the unified job template of the node
- access to BOTH credential and inventory post-change, if present
In order to delete a node, I only need the admin access to its WFJT
In order to manage connections (edges) between nodes I do not need anything
beyond the standard admin access to its WFJT
'''
model = WorkflowJobTemplateNode
def get_queryset(self):
if self.user.is_superuser or self.user.is_system_auditor:
return self.model.objects.all()
qs = self.model.objects.all()
else:
qs = self.model.objects.filter(
workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
self.user, 'read_role'))
return qs
@check_superuser
def can_read(self, obj):
def can_use_prompted_resources(self, data):
    """
    Return True if the user has use permission on any credential and
    inventory referenced by pk in the prompted data; missing or falsy
    pks are simply not checked.
    """
    cred_pk = data.get('credential', None)
    inv_pk = data.get('inventory', None)
    if cred_pk:
        credential = get_object_or_400(Credential, pk=cred_pk)
        if self.user not in credential.use_role:
            return False
    if inv_pk:
        inventory = get_object_or_400(Inventory, pk=inv_pk)
        if self.user not in inventory.use_role:
            return False
    return True
@check_superuser
def can_add(self, data):
if not data: # So the browseable API will work
return True
wfjt_pk = data.get('workflow_job_template', None)
if wfjt_pk:
wfjt = get_object_or_400(WorkflowJobTemplate, pk=wfjt_pk)
if self.user not in wfjt.admin_role:
return False
else:
return False
if not self.can_use_prompted_resources(data):
return False
return True
@check_superuser
def wfjt_admin(self, obj):
    # Admin access to the node's workflow job template; nodes without a
    # WFJT are manageable only by superusers.
    if not obj.workflow_job_template:
        return self.user.is_superuser
    else:
        return self.user in obj.workflow_job_template.admin_role
def ujt_execute(self, obj):
    # Editing a node requires execute access to its unified job template
    # in addition to admin access on the WFJT itself.
    if not obj.unified_job_template:
        return self.wfjt_admin(obj)
    else:
        return self.user in obj.unified_job_template.execute_role and self.wfjt_admin(obj)
def can_change(self, obj, data):
    """
    A node may be changed if the user can execute its unified job
    template and, when credential/inventory prompts are involved, has
    use access to BOTH resources as they would stand after the change.
    """
    if not data:
        return True
    if not self.ujt_execute(obj):
        # should not be able to edit the prompts if lacking access to UJT
        return False
    if 'credential' in data or 'inventory' in data:
        # Fill the unchanged resource in from the node itself so both are
        # checked post-change.  Fixed: the original read self.credential /
        # self.inventory, which do not exist on the access object, and
        # mutated the caller's data dict; use a copy and the node's pks.
        new_data = dict(data)
        if 'credential' not in data:
            new_data['credential'] = obj.credential.pk if obj.credential else None
        if 'inventory' not in data:
            new_data['inventory'] = obj.inventory.pk if obj.inventory else None
        return self.can_use_prompted_resources(new_data)
    return True
def can_delete(self, obj):
return self.can_change(obj, None)
return self.wfjt_admin(obj)
def check_same_WFJT(self, obj, sub_obj):
    """Return True only when both nodes belong to the same WFJT.

    Raises if either object is not a workflow template node — edges may
    only connect nodes to other nodes.
    """
    if not (type(obj) == self.model and type(sub_obj) == self.model):
        raise Exception('Attaching workflow nodes only allowed for other nodes')
    return obj.workflow_job_template == sub_obj.workflow_job_template
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
return self.wfjt_admin(obj) and self.check_same_WFJT(obj, sub_obj)
def can_unattach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
return self.wfjt_admin(obj) and self.check_same_WFJT(obj, sub_obj)
# TODO:
class WorkflowJobNodeAccess(BaseAccess):
'''
I can see/use a WorkflowJobNode if I have permission to associated Workflow Job
I can see a WorkflowJobNode if I have permission to...
the workflow job template associated with...
the workflow job associated with the node.
Any deletion or editing of individual nodes would undermine the integrity
of the graph structure.
Deletion must happen as a cascade delete from the workflow job.
'''
model = WorkflowJobNode
def get_queryset(self):
if self.user.is_superuser or self.user.is_system_auditor:
return self.model.objects.all()
qs = self.model.objects.all()
else:
qs = self.model.objects.filter(
workflow_job__workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
self.user, 'read_role'))
return qs
@check_superuser
def can_read(self, obj):
return True
@check_superuser
def can_add(self, data):
if not data: # So the browseable API will work
return True
return True
return False
@check_superuser
def can_change(self, obj, data):
if self.can_add(data) is False:
return False
return True
return False
def can_delete(self, obj):
return self.can_change(obj, None)
return False
# TODO:
# TODO: revisit for survey logic, notification attachments?
class WorkflowJobTemplateAccess(BaseAccess):
'''
I can only see/manage Workflow Job Templates if I'm a super user
@ -1319,7 +1391,8 @@ class WorkflowJobTemplateAccess(BaseAccess):
qs = self.model.objects.all()
else:
qs = self.model.accessible_objects(self.user, 'read_role')
return qs.select_related('created_by', 'modified_by', 'next_schedule').all()
return qs.select_related('created_by', 'modified_by', 'next_schedule',
'admin_role', 'execute_role', 'read_role').all()
@check_superuser
def can_read(self, obj):
@ -1334,61 +1407,79 @@ class WorkflowJobTemplateAccess(BaseAccess):
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
'''
if not data: # So the browseable API will work
return True
return Organization.accessible_objects(self.user, 'admin_role').exists()
# if reference_obj is provided, determine if it can be copied
reference_obj = data.pop('reference_obj', None)
if 'survey_enabled' in data and data['survey_enabled']:
self.check_license(feature='surveys')
if self.user.is_superuser:
if reference_obj:
for node in reference_obj.workflow_job_template_nodes.all():
if node.inventory and self.user not in node.inventory.use_role:
return False
if node.credential and self.user not in node.credential.use_role:
return False
if node.unified_job_template:
if isinstance(node.unified_job_template, SystemJobTemplate):
if not self.user.is_superuser:
return False
elif isinstance(node.unified_job_template, JobTemplate):
if self.user not in node.unified_job_template.execute_role:
return False
elif isinstance(node.unified_job_template, Project):
if self.user not in node.unified_job_template.update_role:
return False
elif isinstance(node.unified_job_template, InventorySource):
if not self.user.can_access(InventorySource, 'start', node.unified_job_template):
return False
else:
return False
return True
def get_value(Class, field):
if reference_obj:
return getattr(reference_obj, field, None)
else:
pk = get_pk_from_dict(data, field)
if pk:
return get_object_or_400(Class, pk=pk)
else:
return None
# will check this if surveys are added to WFJT
# if 'survey_enabled' in data and data['survey_enabled']:
# self.check_license(feature='surveys')
return False
org_pk = get_pk_from_dict(data, 'organization')
if not org_pk:
# only superusers can create or manage orphan WFJTs
return self.user.is_superuser
org = get_object_or_400(Organization, pk=org_pk)
return self.user in org.admin_role
def can_start(self, obj, validate_license=True):
# TODO: Are workflows allowed for all licenses ??
# Check license.
'''
if validate_license:
# check basic license, node count
self.check_license()
if obj.job_type == PERM_INVENTORY_SCAN:
self.check_license(feature='system_tracking')
if obj.survey_enabled:
self.check_license(feature='surveys')
'''
# if surveys are added to WFJTs, check license here
# if obj.survey_enabled:
# self.check_license(feature='surveys')
# Super users can start any job
if self.user.is_superuser:
return True
return self.can_read(obj)
# TODO: We should use execute role rather than read role
#return self.user in obj.execute_role
return self.user in obj.execute_role
def can_change(self, obj, data):
data_for_change = data
if self.user not in obj.admin_role and not self.user.is_superuser:
return False
if data is not None:
data = dict(data)
# # Check survey license if surveys are added to WFJTs
# if 'survey_enabled' in data and obj.survey_enabled != data['survey_enabled'] and data['survey_enabled']:
# self.check_license(feature='surveys')
if 'survey_enabled' in data and obj.survey_enabled != data['survey_enabled'] and data['survey_enabled']:
self.check_license(feature='surveys')
if self.user.is_superuser:
return True
return self.can_read(obj) and self.can_add(data_for_change)
org_pk = get_pk_from_dict(data, 'organization')
if ('organization' not in data or
(org_pk is None and obj.organization is None) or
(obj.organization and obj.organization.pk == org_pk)):
# No organization changes
return self.user in obj.admin_role
# If it already has an organization set, must be admin of the org to change it
if obj.organization and self.user not in obj.organization.admin_role:
return False
org = get_object_or_400(Organization, pk=org_pk)
return self.user in org.admin_role
def can_delete(self, obj):
is_delete_allowed = self.user.is_superuser or self.user in obj.admin_role
@ -1402,13 +1493,48 @@ class WorkflowJobTemplateAccess(BaseAccess):
return True
class WorkflowJobAccess(BaseAccess):
'''
I can only see Workflow Jobs if I'm a super user
I can only see Workflow Jobs if I can see the associated
workflow job template that it was created from.
I can delete them if I am admin of their workflow job template
I can cancel one if I can delete it
I can also cancel it if I started it
'''
model = WorkflowJob
def get_queryset(self):
if self.user.is_superuser or self.user.is_system_auditor:
qs = self.model.objects.all()
else:
qs = WorkflowJob.objects.filter(
workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
self.user, 'read_role'))
return qs.select_related('created_by', 'modified_by')
def can_add(self, data):
    # Old add-start system for launching jobs is being deprecated, and
    # not supported for new types of resources
    return False
def can_change(self, obj, data):
return False
def can_delete(self, obj):
if obj.workflow_job_template is None:
# only superusers can delete orphaned workflow jobs
return self.user.is_superuser
return self.user in obj.workflow_job_template.admin_role
# TODO: add support for relaunching workflow jobs
def can_start(self, obj):
return False
def can_cancel(self, obj):
if not obj.can_cancel:
return False
return self.can_delete(obj) or self.user == obj.created_by
class AdHocCommandAccess(BaseAccess):
'''
I can only see/run ad hoc commands when:

View File

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import jsonfield.fields
import django.db.models.deletion
import awx.main.fields
class Migration(migrations.Migration):
    """Workflow RBAC and prompting schema changes.

    Adds credential/inventory FKs and a ``char_prompts`` JSON blob to both
    node models, gives WorkflowJobTemplate an organization plus
    execute/read roles, and re-parents admin_role onto the organization.
    """

    dependencies = [
        ('main', '0037_v310_job_allow_simultaneous'),
    ]

    operations = [
        # --- workflow job node prompting fields ---
        migrations.AddField(
            model_name='workflowjobnode',
            name='char_prompts',
            # NOTE(review): mutable default {} — presumably jsonfield copies
            # it per instance; confirm against the jsonfield package docs.
            field=jsonfield.fields.JSONField(default={}, blank=True),
        ),
        migrations.AddField(
            model_name='workflowjobnode',
            name='credential',
            field=models.ForeignKey(related_name='workflowjobnodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Credential', null=True),
        ),
        migrations.AddField(
            model_name='workflowjobnode',
            name='inventory',
            field=models.ForeignKey(related_name='workflowjobnodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Inventory', null=True),
        ),
        # --- workflow job template RBAC fields ---
        migrations.AddField(
            model_name='workflowjobtemplate',
            name='execute_role',
            field=awx.main.fields.ImplicitRoleField(related_name='+', parent_role=[b'admin_role'], to='main.Role', null=b'True'),
        ),
        migrations.AddField(
            model_name='workflowjobtemplate',
            name='organization',
            field=models.ForeignKey(related_name='workflows', on_delete=django.db.models.deletion.SET_NULL, blank=True, to='main.Organization', null=True),
        ),
        migrations.AddField(
            model_name='workflowjobtemplate',
            name='read_role',
            field=awx.main.fields.ImplicitRoleField(related_name='+', parent_role=[b'singleton:system_auditor', b'organization.auditor_role', b'execute_role', b'admin_role'], to='main.Role', null=b'True'),
        ),
        # --- workflow job template node prompting fields ---
        migrations.AddField(
            model_name='workflowjobtemplatenode',
            name='char_prompts',
            field=jsonfield.fields.JSONField(default={}, blank=True),
        ),
        migrations.AddField(
            model_name='workflowjobtemplatenode',
            name='credential',
            field=models.ForeignKey(related_name='workflowjobtemplatenodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Credential', null=True),
        ),
        migrations.AddField(
            model_name='workflowjobtemplatenode',
            name='inventory',
            field=models.ForeignKey(related_name='workflowjobtemplatenodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.Inventory', null=True),
        ),
        # --- loosened/retargeted existing relations ---
        migrations.AlterField(
            model_name='workflowjobnode',
            name='unified_job_template',
            field=models.ForeignKey(related_name='workflowjobnodes', on_delete=django.db.models.deletion.SET_NULL, default=None, to='main.UnifiedJobTemplate', null=True),
        ),
        migrations.AlterField(
            model_name='workflowjobnode',
            name='workflow_job',
            field=models.ForeignKey(related_name='workflow_job_nodes', default=None, blank=True, to='main.WorkflowJob', null=True),
        ),
        migrations.AlterField(
            model_name='workflowjobtemplate',
            name='admin_role',
            field=awx.main.fields.ImplicitRoleField(related_name='+', parent_role=[b'singleton:system_administrator', b'organization.admin_role'], to='main.Role', null=b'True'),
        ),
        migrations.AlterField(
            model_name='workflowjobtemplatenode',
            name='unified_job_template',
            field=models.ForeignKey(related_name='workflowjobtemplatenodes', on_delete=django.db.models.deletion.SET_NULL, default=None, to='main.UnifiedJobTemplate', null=True),
        ),
    ]

View File

@ -33,6 +33,7 @@ from awx.main.utils import emit_websocket_notification
from awx.main.redact import PlainTextCleaner
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin
from awx.main.models.base import PERM_INVENTORY_SCAN
logger = logging.getLogger('awx.main.models.jobs')
@ -271,7 +272,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
validation_errors['credential'] = ["Job Template must provide 'credential' or allow prompting for it.",]
# Job type dependent checks
if self.job_type == 'scan':
if self.job_type == PERM_INVENTORY_SCAN:
if self.inventory is None or self.ask_inventory_on_launch:
validation_errors['inventory'] = ["Scan jobs must be assigned a fixed inventory.",]
elif self.project is None:
@ -473,13 +474,24 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
else:
ignored_fields[field] = kwargs[field]
# Special case to ignore inventory if it is a scan job
if prompted_fields.get('job_type', None) == 'scan' or self.job_type == 'scan':
if 'inventory' in prompted_fields:
ignored_fields['inventory'] = prompted_fields.pop('inventory')
return prompted_fields, ignored_fields
def _extra_job_type_errors(self, data):
"""
Used to enforce 2 special cases around scan jobs and prompting
- the inventory can not be changed on a scan job template
- scan jobs can not be switched to run/check type and vice versa
"""
errors = {}
if 'job_type' in data and self.ask_job_type_on_launch:
if ((self.job_type == PERM_INVENTORY_SCAN and not data['job_type'] == PERM_INVENTORY_SCAN) or
(data['job_type'] == PERM_INVENTORY_SCAN and not self.job_type == PERM_INVENTORY_SCAN)):
errors['job_type'] = 'Can not override job_type to or from a scan job.'
if (self.job_type == PERM_INVENTORY_SCAN and ('inventory' in data) and self.ask_inventory_on_launch and
self.inventory != data['inventory']):
errors['inventory'] = 'Inventory can not be changed at runtime for scan jobs.'
return errors
@property
def cache_timeout_blocked(self):
if Job.objects.filter(job_template=self, status__in=['pending', 'waiting', 'running']).count() > getattr(settings, 'SCHEDULE_MAX_JOBS', 10):

View File

@ -51,7 +51,7 @@ role_descriptions = {
'adhoc_role' : 'May run ad hoc commands on an inventory',
'admin_role' : 'Can manage all aspects of the %s',
'auditor_role' : 'Can view all settings for the %s',
'execute_role' : 'May run the job template',
'execute_role' : 'May run the %s',
'member_role' : 'User is a member of the %s',
'read_role' : 'May view settings for the %s',
'update_role' : 'May update project or inventory or group using the configured source update system',

View File

@ -9,28 +9,31 @@ from django.db import models
from django.core.urlresolvers import reverse
#from django import settings as tower_settings
from jsonfield import JSONField
# AWX
from awx.main.models import UnifiedJobTemplate, UnifiedJob
from awx.main.models.notifications import JobNotificationMixin
from awx.main.models.base import BaseModel, CreatedModifiedModel, VarsDictProperty
from awx.main.models.rbac import (
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
ROLE_SINGLETON_SYSTEM_AUDITOR
)
from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin
import yaml
import json
__all__ = ['WorkflowJobTemplate', 'WorkflowJob', 'WorkflowJobOptions', 'WorkflowJobNode', 'WorkflowJobTemplateNode',]
CHAR_PROMPTS_LIST = ['job_type', 'job_tags', 'skip_tags', 'limit']
class WorkflowNodeBase(CreatedModifiedModel):
class Meta:
abstract = True
app_label = 'main'
# TODO: RBAC
'''
admin_role = ImplicitRoleField(
parent_role='workflow_job_template.admin_role',
)
'''
success_nodes = models.ManyToManyField(
'self',
blank=True,
@ -52,11 +55,82 @@ class WorkflowNodeBase(CreatedModifiedModel):
unified_job_template = models.ForeignKey(
'UnifiedJobTemplate',
related_name='%(class)ss',
blank=False,
null=True,
default=None,
on_delete=models.SET_NULL,
)
# Prompting-related fields
inventory = models.ForeignKey(
'Inventory',
related_name='%(class)ss',
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
)
credential = models.ForeignKey(
'Credential',
related_name='%(class)ss',
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
)
char_prompts = JSONField(
blank=True,
default={}
)
def prompts_dict(self):
    """
    Collect this node's prompted fields as launch kwargs: credential and
    inventory as pks, plus any character fields set in char_prompts.
    """
    data = {}
    if self.inventory:
        data['inventory'] = self.inventory.pk
    if self.credential:
        data['credential'] = self.credential.pk
    for fd in CHAR_PROMPTS_LIST:
        if fd in self.char_prompts:
            data[fd] = self.char_prompts[fd]
    return data
def get_prompts_warnings(self):
    """
    Describe problems with this node's prompted fields.

    Returns a dict with up to two keys: 'ignored' for prompts the job
    template will not accept, and 'missing' for resources that neither
    the template nor the node provides.  An empty dict means no warnings.
    """
    ujt_obj = self.unified_job_template
    if ujt_obj is None:
        return {}
    prompts_dict = self.prompts_dict()
    # Only job-template-like objects support prompting at all
    if not hasattr(ujt_obj, '_ask_for_vars_dict'):
        if prompts_dict:
            return {'ignored': {'all': 'Can not use prompts on unified_job_template that is not type of job template'}}
        else:
            return {}
    accepted_fields, ignored_fields = ujt_obj._accept_or_ignore_job_kwargs(**prompts_dict)
    ask_for_vars_dict = ujt_obj._ask_for_vars_dict()
    ignored_dict = {}
    missing_dict = {}
    for fd in ignored_fields:
        ignored_dict[fd] = 'Workflow node provided field, but job template is not set to ask on launch'
    # Scan-job special cases are reported as ignored prompts too
    scan_errors = ujt_obj._extra_job_type_errors(accepted_fields)
    ignored_dict.update(scan_errors)
    for fd in ['inventory', 'credential']:
        if getattr(ujt_obj, fd) is None and not (ask_for_vars_dict.get(fd, False) and fd in prompts_dict):
            missing_dict[fd] = 'Job Template does not have this field and workflow node does not provide it'
    data = {}
    if ignored_dict:
        data['ignored'] = ignored_dict
    if missing_dict:
        data['missing'] = missing_dict
    return data
@classmethod
def _get_workflow_job_field_names(cls):
'''
Return field names that should be copied from template node to job node.
'''
return ['workflow_job', 'unified_job_template',
'inventory', 'credential', 'char_prompts']
class WorkflowJobTemplateNode(WorkflowNodeBase):
# TODO: Ensure the API forces workflow_job_template being set
@ -72,6 +146,18 @@ class WorkflowJobTemplateNode(WorkflowNodeBase):
def get_absolute_url(self):
return reverse('api:workflow_job_template_node_detail', args=(self.pk,))
def create_workflow_job_node(self, **kwargs):
    '''
    Create a new workflow job node based on this workflow node.

    Copies the fields named by _get_workflow_job_field_names from this
    template node; values passed in kwargs take precedence.
    '''
    create_kwargs = {}
    for field_name in self._get_workflow_job_field_names():
        if field_name in kwargs:
            create_kwargs[field_name] = kwargs[field_name]
        elif hasattr(self, field_name):
            create_kwargs[field_name] = getattr(self, field_name)
    return WorkflowJobNode.objects.create(**create_kwargs)
class WorkflowJobNode(WorkflowNodeBase):
job = models.ForeignKey(
'UnifiedJob',
@ -87,12 +173,39 @@ class WorkflowJobNode(WorkflowNodeBase):
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
on_delete=models.CASCADE,
)
def get_absolute_url(self):
return reverse('api:workflow_job_node_detail', args=(self.pk,))
def get_job_kwargs(self):
    """
    Build the kwargs used to spawn this node's job: the node's accepted
    prompted fields (minus scan-job conflicts) plus the parent workflow
    job's extra_vars parsed as JSON, falling back to YAML.
    """
    # reject/accept prompted fields
    data = {}
    ujt_obj = self.unified_job_template
    if ujt_obj and hasattr(ujt_obj, '_ask_for_vars_dict'):
        accepted_fields, ignored_fields = ujt_obj._accept_or_ignore_job_kwargs(**self.prompts_dict())
        # Drop prompts that would illegally change scan-job behavior
        for fd in ujt_obj._extra_job_type_errors(accepted_fields):
            accepted_fields.pop(fd)
        data.update(accepted_fields)
        # TODO: decide what to do in the event of missing fields
    # process extra_vars
    extra_vars = {}
    if self.workflow_job and self.workflow_job.extra_vars:
        # Unparseable extra_vars are silently treated as empty
        try:
            WJ_json_extra_vars = json.loads(
                (self.workflow_job.extra_vars or '').strip() or '{}')
        except ValueError:
            try:
                WJ_json_extra_vars = yaml.safe_load(self.workflow_job.extra_vars)
            except yaml.YAMLError:
                WJ_json_extra_vars = {}
        extra_vars.update(WJ_json_extra_vars)
    # TODO: merge artifacts, add ancestor_artifacts to kwargs
    if extra_vars:
        data['extra_vars'] = extra_vars
    return data
class WorkflowJobOptions(BaseModel):
class Meta:
abstract = True
@ -102,14 +215,29 @@ class WorkflowJobOptions(BaseModel):
default='',
)
class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions):
class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, ResourceMixin):
class Meta:
app_label = 'main'
admin_role = ImplicitRoleField(
parent_role='singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
organization = models.ForeignKey(
'Organization',
blank=True,
null=True,
on_delete=models.SET_NULL,
related_name='workflows',
)
admin_role = ImplicitRoleField(parent_role=[
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
'organization.admin_role'
])
execute_role = ImplicitRoleField(parent_role=[
'admin_role'
])
read_role = ImplicitRoleField(parent_role=[
'singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,
'organization.auditor_role', 'execute_role', 'admin_role'
])
@classmethod
def _get_unified_job_class(cls):
@ -146,6 +274,17 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions):
workflow_job.inherit_job_template_workflow_nodes()
return workflow_job
def get_warnings(self):
    """
    Map node pk -> warning for every node in this workflow that is
    missing its unified_job_template or has prompt-related problems.
    """
    warning_data = {}
    for node in self.workflow_job_template_nodes.all():
        if node.unified_job_template is None:
            warning_data[node.pk] = 'Node is missing a linked unified_job_template'
            continue
        node_prompts_warnings = node.get_prompts_warnings()
        if node_prompts_warnings:
            warning_data[node.pk] = node_prompts_warnings
    return warning_data
class WorkflowJobInheritNodesMixin(object):
def _inherit_relationship(self, old_node, new_node, node_ids_map, node_type):
old_related_nodes = self._get_all_by_type(old_node, node_type)
@ -159,7 +298,7 @@ class WorkflowJobInheritNodesMixin(object):
Create a WorkflowJobNode for each WorkflowJobTemplateNode
'''
def _create_workflow_job_nodes(self, old_nodes):
    """Create one WorkflowJobNode attached to this workflow job for each
    given WorkflowJobTemplateNode.

    Creation is delegated to each template node's create_workflow_job_node()
    so the node's own data travels with it. NOTE(review): the merge left two
    contradictory return statements here (the old direct
    WorkflowJobNode.objects.create(...) call followed by an unreachable new
    one); only the delegating implementation is kept.
    """
    return [old_node.create_workflow_job_node(workflow_job=self) for old_node in old_nodes]
def _map_workflow_job_nodes(self, old_nodes, new_nodes):
node_ids_map = {}

View File

@ -53,9 +53,7 @@ def spawn_workflow_graph_jobs(workflow_jobs):
dag = WorkflowDAG(workflow_job)
spawn_nodes = dag.bfs_nodes_to_run()
for spawn_node in spawn_nodes:
# TODO: Inject job template template params as kwargs.
# Make sure to take into account extra_vars merge logic
kv = {}
kv = spawn_node.get_job_kwargs()
job = spawn_node.unified_job_template.create_unified_job(**kv)
spawn_node.job = job
spawn_node.save()

View File

@ -167,11 +167,11 @@ def mk_workflow_job(status='new', workflow_job_template=None, extra_vars={},
job.save()
return job
def mk_workflow_job_template(name, extra_vars='', spec=None, persisted=True):
def mk_workflow_job_template(name, extra_vars='', spec=None, organization=None, persisted=True):
if extra_vars:
extra_vars = json.dumps(extra_vars)
wfjt = WorkflowJobTemplate(name=name, extra_vars=extra_vars)
wfjt = WorkflowJobTemplate(name=name, extra_vars=extra_vars, organization=organization)
wfjt.survey_spec = spec
if wfjt.survey_spec is not None:

View File

@ -360,16 +360,20 @@ def generate_workflow_job_template_nodes(workflow_job_template,
new_node = WorkflowJobTemplateNode(workflow_job_template=workflow_job_template,
unified_job_template=node['unified_job_template'],
id=i)
if persisted:
new_node.save()
new_nodes.append(new_node)
node_types = ['success_nodes', 'failure_nodes', 'always_nodes']
for node_type in node_types:
for i, new_node in enumerate(new_nodes):
if node_type not in workflow_job_template_nodes[i]:
continue
for related_index in workflow_job_template_nodes[i][node_type]:
getattr(new_node, node_type).add(new_nodes[related_index])
# TODO: Implement survey and jobs
def create_workflow_job_template(name, persisted=True, **kwargs):
def create_workflow_job_template(name, organization=None, persisted=True, **kwargs):
Objects = generate_objects(["workflow_job_template",
"workflow_job_template_nodes",
"survey",], kwargs)
@ -382,7 +386,8 @@ def create_workflow_job_template(name, persisted=True, **kwargs):
if 'survey' in kwargs:
spec = create_survey_spec(kwargs['survey'])
wfjt = mk_workflow_job_template(name,
wfjt = mk_workflow_job_template(name,
organization=organization,
spec=spec,
extra_vars=extra_vars,
persisted=persisted)

View File

@ -0,0 +1,73 @@
import pytest
from awx.main.access import (
WorkflowJobTemplateAccess,
WorkflowJobTemplateNodeAccess,
WorkflowJobAccess,
# WorkflowJobNodeAccess
)
@pytest.fixture
def wfjt(workflow_job_template_factory, organization):
    # Persisted WorkflowJobTemplate (no nodes) owned by the `organization` fixture.
    objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True)
    return objects.workflow_job_template
@pytest.fixture
def wfjt_with_nodes(workflow_job_template_factory, organization, job_template):
    # Persisted WFJT with a single node pointing at the `job_template` fixture.
    objects = workflow_job_template_factory(
        'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True)
    return objects.workflow_job_template
@pytest.fixture
def wfjt_node(wfjt_with_nodes):
    # First (and only) node of the wfjt_with_nodes fixture.
    return wfjt_with_nodes.workflow_job_template_nodes.all()[0]
@pytest.fixture
def workflow_job(wfjt):
    # WorkflowJob created under the wfjt fixture's jobs relation.
    return wfjt.jobs.create(name='test_workflow')
@pytest.mark.django_db
class TestWorkflowJobTemplateAccess:
    # RBAC checks for WorkflowJobTemplate via the WorkflowJobTemplateAccess class.

    def test_random_user_no_edit(self, wfjt, rando):
        # A user with no roles on the WFJT may not change it.
        access = WorkflowJobTemplateAccess(rando)
        assert not access.can_change(wfjt, {'name': 'new name'})

    def test_org_admin_edit(self, wfjt, org_admin):
        # An admin of the WFJT's organization may edit it.
        access = WorkflowJobTemplateAccess(org_admin)
        assert access.can_change(wfjt, {'name': 'new name'})

    def test_org_admin_role_inheritance(self, wfjt, org_admin):
        # Org admins implicitly receive admin/execute/read on the WFJT
        # through the ImplicitRoleField parent_role chains.
        assert org_admin in wfjt.admin_role
        assert org_admin in wfjt.execute_role
        assert org_admin in wfjt.read_role

    def test_jt_blocks_copy(self, wfjt_with_nodes, org_admin):
        """I want to copy a workflow JT in my organization, but someone
        included a job template that I don't have access to, so I can
        not copy the WFJT as-is"""
        access = WorkflowJobTemplateAccess(org_admin)
        assert not access.can_add({'reference_obj': wfjt_with_nodes})
@pytest.mark.django_db
class TestWorkflowJobTemplateNodeAccess:
    # RBAC checks for individual workflow template nodes.

    def test_jt_access_to_edit(self, wfjt_node, org_admin):
        # Org-admin status alone does not allow changing node prompt fields.
        access = WorkflowJobTemplateNodeAccess(org_admin)
        assert not access.can_change(wfjt_node, {'job_type': 'scan'})
@pytest.mark.django_db
class TestWorkflowJobAccess:
    # RBAC checks for workflow jobs spawned from a WFJT.

    def test_wfjt_admin_delete(self, wfjt, workflow_job, rando):
        # Membership in the WFJT admin role grants delete on its jobs.
        wfjt.admin_role.members.add(rando)
        access = WorkflowJobAccess(rando)
        assert access.can_delete(workflow_job)

    def test_cancel_your_own_job(self, wfjt, workflow_job, rando):
        # A user with execute rights can cancel a workflow job they created.
        wfjt.execute_role.members.add(rando)
        workflow_job.created_by = rando
        workflow_job.save()
        access = WorkflowJobAccess(rando)
        assert access.can_cancel(workflow_job)

View File

@ -104,6 +104,42 @@ class TestWorkflowJobTemplateNodeSerializerGetRelated():
assert 'workflow_job_template' not in related
class FakeView:
    """Minimal stand-in for a DRF view used as serializer context.

    Stores a single object at construction time and hands it back from
    get_object(); tests attach a request attribute directly.
    """

    def __init__(self, obj):
        # Keep the public attribute name `obj` — tests may inspect it.
        self.obj = obj

    def get_object(self):
        """Return the object supplied at construction."""
        return self.obj
class FakeRequest:
    # Bare attribute bag standing in for a DRF Request; tests assign
    # attributes such as `method` directly on instances.
    pass
class TestWorkflowJobTemplateNodeSerializerCharPrompts():
    # PATCH semantics for prompt fields stored in the node's char_prompts
    # dict: changing or nulling one field must leave the others intact.

    @pytest.fixture
    def WFJT_serializer(self):
        # Serializer wired to a fake PATCH view whose object is a node
        # that already carries limit='webservers' in char_prompts.
        serializer = WorkflowJobTemplateNodeSerializer()
        node = WorkflowJobTemplateNode(pk=1)
        node.char_prompts = {'limit': 'webservers'}
        view = FakeView(node)
        view.request = FakeRequest()
        view.request.method = "PATCH"
        serializer.context = {'view': view}
        return serializer

    def test_change_single_field(self, WFJT_serializer):
        "Test that a single prompt field can be changed without affecting other fields"
        internal_value = WFJT_serializer.to_internal_value({'job_type': 'check'})
        assert internal_value['char_prompts']['job_type'] == 'check'
        assert internal_value['char_prompts']['limit'] == 'webservers'

    def test_null_single_field(self, WFJT_serializer):
        "Test that a single prompt field can be removed without affecting other fields"
        internal_value = WFJT_serializer.to_internal_value({'job_type': None})
        assert 'job_type' not in internal_value['char_prompts']
        assert internal_value['char_prompts']['limit'] == 'webservers'
@mock.patch('awx.api.serializers.WorkflowNodeBaseSerializer.get_related', lambda x,y: {})
class TestWorkflowJobNodeSerializerGetRelated():
@pytest.fixture

View File

@ -1,7 +1,11 @@
import pytest
from awx.main.models.jobs import JobTemplate
from awx.main.models.workflow import WorkflowJobTemplateNode, WorkflowJobInheritNodesMixin, WorkflowJobNode
from awx.main.models import Inventory, Credential, Project
from awx.main.models.workflow import (
WorkflowJobTemplate, WorkflowJobTemplateNode, WorkflowJobInheritNodesMixin,
WorkflowJob, WorkflowJobNode
)
class TestWorkflowJobInheritNodesMixin():
class TestCreateWorkflowJobNodes():
@ -14,14 +18,13 @@ class TestWorkflowJobInheritNodesMixin():
return [WorkflowJobTemplateNode(unified_job_template=job_templates[i]) for i in range(0, 10)]
def test__create_workflow_job_nodes(self, mocker, job_template_nodes):
    """Each template node's create_workflow_job_node() must be called once
    with the new workflow job.

    NOTE(review): the merge left a stale patch target
    ('awx.main.models.WorkflowJobNode.objects.create') overwritten on the
    next line, plus a stale two-kwarg assertion for the pre-refactor API;
    both duplicates are removed here.
    """
    workflow_job_node_create = mocker.patch('awx.main.models.WorkflowJobTemplateNode.create_workflow_job_node')
    mixin = WorkflowJobInheritNodesMixin()
    mixin._create_workflow_job_nodes(job_template_nodes)
    for job_template_node in job_template_nodes:
        workflow_job_node_create.assert_any_call(workflow_job=mixin)
class TestMapWorkflowJobNodes():
@pytest.fixture
@ -79,3 +82,147 @@ class TestWorkflowJobInheritNodesMixin():
job_nodes[i].success_nodes.add.assert_any_call(job_nodes[i + 1])
@pytest.fixture
def workflow_job_unit():
    # Unsaved WorkflowJob for pure unit tests (no database access).
    return WorkflowJob(name='workflow', status='new')
@pytest.fixture
def workflow_job_template_unit():
    # Unsaved WorkflowJobTemplate for pure unit tests (no database access).
    return WorkflowJobTemplate(name='workflow')
@pytest.fixture
def jt_ask(job_template_factory):
    # note: factory sets ask_xxxx_on_launch to true for inventory & credential
    jt = job_template_factory(name='example-jt', persisted=False).job_template
    # Enable the remaining ask_*_on_launch flags so every prompt is accepted.
    jt.ask_job_type_on_launch = True
    jt.ask_skip_tags_on_launch = True
    jt.ask_limit_on_launch = True
    jt.ask_tags_on_launch = True
    return jt
@pytest.fixture
def project_unit():
    # Unsaved Project, used below as a node template that takes no prompts.
    return Project(name='example-proj')
# Canonical char-prompt overrides shared by the *_with_prompts fixtures below.
example_prompts = dict(job_type='check', job_tags='quack', limit='duck', skip_tags='oink')
@pytest.fixture
def job_node_no_prompts(workflow_job_unit, jt_ask):
    # WorkflowJobNode with a prompt-accepting JT but no prompt data set.
    return WorkflowJobNode(workflow_job=workflow_job_unit, unified_job_template=jt_ask)
@pytest.fixture
def job_node_with_prompts(job_node_no_prompts):
    # Same node, now carrying example char prompts plus unsaved
    # inventory/credential resources.
    job_node_no_prompts.char_prompts = example_prompts
    job_node_no_prompts.inventory = Inventory(name='example-inv')
    job_node_no_prompts.credential = Credential(name='example-inv', kind='ssh', username='asdf', password='asdf')
    return job_node_no_prompts
@pytest.fixture
def wfjt_node_no_prompts(workflow_job_template_unit, jt_ask):
    # Template-side node analogue of job_node_no_prompts.
    return WorkflowJobTemplateNode(workflow_job_template=workflow_job_template_unit, unified_job_template=jt_ask)
@pytest.fixture
def wfjt_node_with_prompts(wfjt_node_no_prompts):
    # Template-side node carrying example char prompts plus unsaved
    # inventory/credential resources.
    wfjt_node_no_prompts.char_prompts = example_prompts
    wfjt_node_no_prompts.inventory = Inventory(name='example-inv')
    wfjt_node_no_prompts.credential = Credential(name='example-inv', kind='ssh', username='asdf', password='asdf')
    return wfjt_node_no_prompts
class TestWorkflowJobCreate:
    # create_workflow_job_node() must forward the template node's prompt
    # data (char_prompts, inventory, credential) to the new WorkflowJobNode.

    def test_create_no_prompts(self, wfjt_node_no_prompts, workflow_job_unit, mocker):
        mock_create = mocker.MagicMock()
        with mocker.patch('awx.main.models.WorkflowJobNode.objects.create', mock_create):
            wfjt_node_no_prompts.create_workflow_job_node(workflow_job=workflow_job_unit)
            # No prompts set on the node -> inventory/credential pass as None.
            mock_create.assert_called_once_with(
                char_prompts=wfjt_node_no_prompts.char_prompts,
                inventory=None, credential=None,
                unified_job_template=wfjt_node_no_prompts.unified_job_template,
                workflow_job=workflow_job_unit)

    def test_create_with_prompts(self, wfjt_node_with_prompts, workflow_job_unit, mocker):
        mock_create = mocker.MagicMock()
        with mocker.patch('awx.main.models.WorkflowJobNode.objects.create', mock_create):
            wfjt_node_with_prompts.create_workflow_job_node(workflow_job=workflow_job_unit)
            # All prompt data on the template node is carried over verbatim.
            mock_create.assert_called_once_with(
                char_prompts=wfjt_node_with_prompts.char_prompts,
                inventory=wfjt_node_with_prompts.inventory,
                credential=wfjt_node_with_prompts.credential,
                unified_job_template=wfjt_node_with_prompts.unified_job_template,
                workflow_job=workflow_job_unit)
class TestWorkflowJobNodeJobKWARGS:
    """
    Tests for building the keyword arguments that go into creating and
    launching a new job that corresponds to a workflow node.
    """

    def test_null_kwargs(self, job_node_no_prompts):
        # No prompts and no workflow extra_vars -> nothing to pass along.
        assert job_node_no_prompts.get_job_kwargs() == {}

    def test_inherit_workflow_job_extra_vars(self, job_node_no_prompts):
        # extra_vars set on the parent workflow job flow down to the spawned job.
        workflow_job = job_node_no_prompts.workflow_job
        workflow_job.extra_vars = '{"a": 84}'
        assert job_node_no_prompts.get_job_kwargs() == {'extra_vars': {'a': 84}}

    def test_char_prompts_and_res_node_prompts(self, job_node_with_prompts):
        # Every prompt accepted: char prompts plus inventory/credential pks.
        assert job_node_with_prompts.get_job_kwargs() == dict(
            inventory=job_node_with_prompts.inventory.pk,
            credential=job_node_with_prompts.credential.pk,
            **example_prompts)

    def test_reject_some_node_prompts(self, job_node_with_prompts):
        # Fields whose ask_*_on_launch flag is off must be dropped from kwargs.
        job_node_with_prompts.unified_job_template.ask_inventory_on_launch = False
        job_node_with_prompts.unified_job_template.ask_job_type_on_launch = False
        expect_kwargs = dict(inventory=job_node_with_prompts.inventory.pk,
                             credential=job_node_with_prompts.credential.pk,
                             **example_prompts)
        expect_kwargs.pop('inventory')
        expect_kwargs.pop('job_type')
        assert job_node_with_prompts.get_job_kwargs() == expect_kwargs

    def test_no_accepted_project_node_prompts(self, job_node_no_prompts, project_unit):
        # A Project as the node's template yields no prompt kwargs at all.
        job_node_no_prompts.unified_job_template = project_unit
        assert job_node_no_prompts.get_job_kwargs() == {}
class TestWorkflowWarnings:
    """
    Tests of warnings that show user errors in the construction of a workflow
    """

    def test_no_warn_project_node_no_prompts(self, job_node_no_prompts, project_unit):
        # A project node without prompt data is fine - nothing to warn about.
        job_node_no_prompts.unified_job_template = project_unit
        assert job_node_no_prompts.get_prompts_warnings() == {}

    def test_warn_project_node_reject_all_prompts(self, job_node_with_prompts, project_unit):
        # A project template accepts no prompts, so all node prompts are ignored.
        job_node_with_prompts.unified_job_template = project_unit
        assert 'ignored' in job_node_with_prompts.get_prompts_warnings()
        assert 'all' in job_node_with_prompts.get_prompts_warnings()['ignored']

    def test_no_warn_accept_all_prompts(self, job_node_with_prompts):
        # jt_ask enables every ask_*_on_launch flag, so nothing is ignored.
        assert job_node_with_prompts.get_prompts_warnings() == {}

    def test_warn_reject_some_prompts(self, job_node_with_prompts):
        # Disabled ask_* flags surface the corresponding fields as 'ignored'.
        job_node_with_prompts.unified_job_template.ask_credential_on_launch = False
        job_node_with_prompts.unified_job_template.ask_job_type_on_launch = False
        assert 'ignored' in job_node_with_prompts.get_prompts_warnings()
        assert 'job_type' in job_node_with_prompts.get_prompts_warnings()['ignored']
        assert 'credential' in job_node_with_prompts.get_prompts_warnings()['ignored']
        assert len(job_node_with_prompts.get_prompts_warnings()['ignored']) == 2

    def test_warn_scan_errors_node_prompts(self, job_node_with_prompts):
        # For a scan-type JT, job_type and inventory overrides are ignored.
        job_node_with_prompts.unified_job_template.job_type = 'scan'
        job_node_with_prompts.job_type = 'run'
        job_node_with_prompts.inventory = Inventory(name='different-inventory', pk=23)
        assert 'ignored' in job_node_with_prompts.get_prompts_warnings()
        assert 'job_type' in job_node_with_prompts.get_prompts_warnings()['ignored']
        assert 'inventory' in job_node_with_prompts.get_prompts_warnings()['ignored']
        assert len(job_node_with_prompts.get_prompts_warnings()['ignored']) == 2

    def test_warn_missing_fields(self, job_node_no_prompts):
        # Required resources not provided anywhere show up under 'missing'.
        job_node_no_prompts.inventory = None
        assert 'missing' in job_node_no_prompts.get_prompts_warnings()
        assert 'inventory' in job_node_no_prompts.get_prompts_warnings()['missing']
        assert 'credential' in job_node_no_prompts.get_prompts_warnings()['missing']
        assert len(job_node_no_prompts.get_prompts_warnings()['missing']) == 2

View File

@ -8,6 +8,7 @@ from awx.main.access import (
BaseAccess,
check_superuser,
JobTemplateAccess,
WorkflowJobTemplateAccess,
)
from awx.main.models import Credential, Inventory, Project, Role, Organization
@ -110,6 +111,30 @@ def test_jt_can_add_bad_data(user_unit):
access = JobTemplateAccess(user_unit)
assert not access.can_add({'asdf': 'asdf'})
class TestWorkflowAccessMethods:
    # DB-free checks of WorkflowJobTemplateAccess.can_add, using mocks in
    # place of persisted role/organization rows.

    @pytest.fixture
    def workflow(self, workflow_job_template_factory):
        # Unsaved WFJT from the factory (persisted=False).
        objects = workflow_job_template_factory('test_workflow', persisted=False)
        return objects.workflow_job_template

    def test_workflow_can_add(self, workflow, user_unit):
        organization = Organization(name='test-org')
        workflow.organization = organization
        organization.admin_role = Role()

        def mock_get_object(Class, **kwargs):
            # Serve the in-memory organization for the access lookup;
            # anything else being requested indicates a test gap.
            if Class == Organization:
                return organization
            else:
                raise Exception('Item requested has not been mocked')

        access = WorkflowJobTemplateAccess(user_unit)
        # Role.__contains__ is patched True so only the lookup path is exercised.
        with mock.patch('awx.main.models.rbac.Role.__contains__', return_value=True):
            with mock.patch('awx.main.access.get_object_or_400', mock_get_object):
                assert access.can_add({'organization': 1})
@pytest.mark.django_db
def test_user_capabilities_method():
"""Unit test to verify that the user_capabilities method will defer

View File

@ -4,11 +4,16 @@
# Python
import base64
import re
import yaml
import json
# Django
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError
# REST framework
from rest_framework.serializers import ValidationError as RestValidationError
def validate_pem(data, min_keys=0, max_keys=None, min_certs=0, max_certs=None):
"""
@ -166,3 +171,21 @@ def validate_ssh_private_key(data):
credential.
"""
return validate_pem(data, min_keys=1)
def vars_validate_or_raise(vars_str):
    """
    Validate that fields like extra_vars or variables on resources like
    job templates, inventories, or hosts are either an acceptable
    blank string, or are valid JSON or YAML dict
    """
    try:
        # Blank or None input is normalized to an empty JSON object first.
        json.loads((vars_str or '').strip() or '{}')
    except ValueError:
        # Not JSON; fall back to YAML before rejecting the value outright.
        try:
            yaml.safe_load(vars_str)
        except yaml.YAMLError:
            raise RestValidationError('Must be valid JSON or YAML.')
    return vars_str