Merge pull request #2174 from matburt/jobtemplate_sharding

Implement Job Template Sharding/Splitting/Slicing

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot] 2018-10-31 15:49:51 +00:00 committed by GitHub
commit a1fe60da78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 943 additions and 116 deletions

2
.gitignore vendored
View File

@ -1,3 +1,4 @@
# Tags
.tags
.tags1
@ -52,6 +53,7 @@ __pycache__
**/node_modules/**
/tmp
**/npm-debug.log*
**/package-lock.json
# UI build flag files
awx/ui/.deps_built

View File

@ -3011,7 +3011,7 @@ class JobTemplateSerializer(JobTemplateMixin, UnifiedJobTemplateSerializer, JobO
fields = ('*', 'host_config_key', 'ask_diff_mode_on_launch', 'ask_variables_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch',
'ask_skip_tags_on_launch', 'ask_job_type_on_launch', 'ask_verbosity_on_launch', 'ask_inventory_on_launch',
'ask_credential_on_launch', 'survey_enabled', 'become_enabled', 'diff_mode',
'allow_simultaneous', 'custom_virtualenv')
'allow_simultaneous', 'custom_virtualenv', 'job_slice_count')
def get_related(self, obj):
res = super(JobTemplateSerializer, self).get_related(obj)
@ -3028,6 +3028,7 @@ class JobTemplateSerializer(JobTemplateMixin, UnifiedJobTemplateSerializer, JobO
labels = self.reverse('api:job_template_label_list', kwargs={'pk': obj.pk}),
object_roles = self.reverse('api:job_template_object_roles_list', kwargs={'pk': obj.pk}),
instance_groups = self.reverse('api:job_template_instance_groups_list', kwargs={'pk': obj.pk}),
slice_workflow_jobs = self.reverse('api:job_template_slice_workflow_jobs_list', kwargs={'pk': obj.pk}),
))
if self.version > 1:
res['copy'] = self.reverse('api:job_template_copy', kwargs={'pk': obj.pk})
@ -3123,7 +3124,7 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
'ask_variables_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch',
'ask_job_type_on_launch', 'ask_verbosity_on_launch', 'ask_inventory_on_launch',
'ask_credential_on_launch', 'allow_simultaneous', 'artifacts', 'scm_revision',
'instance_group', 'diff_mode')
'instance_group', 'diff_mode', 'job_slice_number', 'job_slice_count')
def get_related(self, obj):
res = super(JobSerializer, self).get_related(obj)
@ -3590,6 +3591,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
class Meta:
model = WorkflowJob
fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous',
'job_template', 'is_sliced_job',
'-execution_node', '-event_processing_finished', '-controller_node',)
def get_related(self, obj):
@ -3598,6 +3600,8 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer):
res['workflow_job_template'] = self.reverse('api:workflow_job_template_detail',
kwargs={'pk': obj.workflow_job_template.pk})
res['notifications'] = self.reverse('api:workflow_job_notifications_list', kwargs={'pk': obj.pk})
if obj.job_template_id:
res['job_template'] = self.reverse('api:job_template_detail', kwargs={'pk': obj.job_template_id})
res['workflow_nodes'] = self.reverse('api:workflow_job_workflow_nodes_list', kwargs={'pk': obj.pk})
res['labels'] = self.reverse('api:workflow_job_label_list', kwargs={'pk': obj.pk})
res['activity_stream'] = self.reverse('api:workflow_job_activity_stream_list', kwargs={'pk': obj.pk})

View File

@ -26,6 +26,9 @@ string of `?all=1` to return all hosts, including disabled ones.
Specify a query string of `?towervars=1` to add variables
to the hostvars of each host that specifies its enabled state and database ID.
Specify a query string of `?subset=slice2of5` to produce an inventory that
has a restricted number of hosts according to the rules of job slicing.
To apply multiple query strings, join them with the `&` character, like `?hostvars=1&all=1`.
## Host Response

View File

@ -8,6 +8,7 @@ from awx.api.views import (
JobTemplateDetail,
JobTemplateLaunch,
JobTemplateJobsList,
JobTemplateSliceWorkflowJobsList,
JobTemplateCallback,
JobTemplateSchedulesList,
JobTemplateSurveySpec,
@ -28,6 +29,7 @@ urls = [
url(r'^(?P<pk>[0-9]+)/$', JobTemplateDetail.as_view(), name='job_template_detail'),
url(r'^(?P<pk>[0-9]+)/launch/$', JobTemplateLaunch.as_view(), name='job_template_launch'),
url(r'^(?P<pk>[0-9]+)/jobs/$', JobTemplateJobsList.as_view(), name='job_template_jobs_list'),
url(r'^(?P<pk>[0-9]+)/slice_workflow_jobs/$', JobTemplateSliceWorkflowJobsList.as_view(), name='job_template_slice_workflow_jobs_list'),
url(r'^(?P<pk>[0-9]+)/callback/$', JobTemplateCallback.as_view(), name='job_template_callback'),
url(r'^(?P<pk>[0-9]+)/schedules/$', JobTemplateSchedulesList.as_view(), name='job_template_schedules_list'),
url(r'^(?P<pk>[0-9]+)/survey_spec/$', JobTemplateSurveySpec.as_view(), name='job_template_survey_spec'),

View File

@ -2452,6 +2452,16 @@ class InventoryScriptView(RetrieveAPIView):
hostvars = bool(request.query_params.get('hostvars', ''))
towervars = bool(request.query_params.get('towervars', ''))
show_all = bool(request.query_params.get('all', ''))
subset = request.query_params.get('subset', '')
if subset:
if not isinstance(subset, six.string_types):
raise ParseError(_('Inventory subset argument must be a string.'))
if subset.startswith('slice'):
slice_number, slice_count = Inventory.parse_slice_params(subset)
else:
raise ParseError(_('Subset does not use any supported syntax.'))
else:
slice_number, slice_count = 1, 1
if hostname:
hosts_q = dict(name=hostname)
if not show_all:
@ -2461,7 +2471,8 @@ class InventoryScriptView(RetrieveAPIView):
return Response(obj.get_script_data(
hostvars=hostvars,
towervars=towervars,
show_all=show_all
show_all=show_all,
slice_number=slice_number, slice_count=slice_count
))
@ -2912,9 +2923,14 @@ class JobTemplateLaunch(RetrieveAPIView):
return Response(data, status=status.HTTP_400_BAD_REQUEST)
else:
data = OrderedDict()
data['job'] = new_job.id
data['ignored_fields'] = self.sanitize_for_response(ignored_fields)
data.update(JobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job))
if isinstance(new_job, WorkflowJob):
data['workflow_job'] = new_job.id
data['ignored_fields'] = self.sanitize_for_response(ignored_fields)
data.update(WorkflowJobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job))
else:
data['job'] = new_job.id
data['ignored_fields'] = self.sanitize_for_response(ignored_fields)
data.update(JobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job))
headers = {'Location': new_job.get_absolute_url(request)}
return Response(data, status=status.HTTP_201_CREATED, headers=headers)
@ -3362,6 +3378,7 @@ class JobTemplateCallback(GenericAPIView):
if extra_vars is not None and job_template.ask_variables_on_launch:
extra_vars_redacted, removed = extract_ansible_vars(extra_vars)
kv['extra_vars'] = extra_vars_redacted
kv['_prevent_slicing'] = True # will only run against 1 host, so no point
with transaction.atomic():
job = job_template.create_job(**kv)
@ -3393,6 +3410,15 @@ class JobTemplateJobsList(SubListCreateAPIView):
return methods
class JobTemplateSliceWorkflowJobsList(SubListCreateAPIView):
model = WorkflowJob
serializer_class = WorkflowJobListSerializer
parent_model = JobTemplate
relationship = 'slice_workflow_jobs'
parent_key = 'job_template'
class JobTemplateInstanceGroupsList(SubListAttachDetachAPIView):
model = InstanceGroup
@ -3685,6 +3711,8 @@ class WorkflowJobRelaunch(WorkflowsEnforcementMixin, GenericAPIView):
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.is_sliced_job and not obj.job_template_id:
raise ParseError(_('Cannot relaunch slice workflow job orphaned from job template.'))
new_workflow_job = obj.create_relaunch_workflow_job()
new_workflow_job.signal_start()

View File

@ -1789,7 +1789,7 @@ class WorkflowJobNodeAccess(BaseAccess):
def filtered_queryset(self):
return self.model.objects.filter(
workflow_job__workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
workflow_job__unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs(
self.user, 'read_role'))
@check_superuser
@ -1915,7 +1915,7 @@ class WorkflowJobAccess(BaseAccess):
def filtered_queryset(self):
return WorkflowJob.objects.filter(
workflow_job_template__in=WorkflowJobTemplate.accessible_objects(
unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs(
self.user, 'read_role'))
def can_add(self, data):
@ -1947,9 +1947,11 @@ class WorkflowJobAccess(BaseAccess):
if self.user.is_superuser:
return True
wfjt = obj.workflow_job_template
template = obj.workflow_job_template
if not template and obj.job_template_id:
template = obj.job_template
# only superusers can relaunch orphans
if not wfjt:
if not template:
return False
# If job was launched by another user, it could have survey passwords
@ -1967,7 +1969,7 @@ class WorkflowJobAccess(BaseAccess):
return False
# execute permission to WFJT is mandatory for any relaunch
return (self.user in wfjt.execute_role)
return (self.user in template.execute_role)
def can_recreate(self, obj):
node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template')

View File

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-10-15 16:21
from __future__ import unicode_literals
import awx.main.utils.polymorphic
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0050_v340_drop_celery_tables'),
]
operations = [
migrations.AddField(
model_name='job',
name='job_slice_count',
field=models.PositiveIntegerField(blank=True, default=1, help_text='If ran as part of sliced jobs, the total number of slices. If 1, job is not part of a sliced job.'),
),
migrations.AddField(
model_name='job',
name='job_slice_number',
field=models.PositiveIntegerField(blank=True, default=0, help_text='If part of a sliced job, the ID of the inventory slice operated on. If not part of sliced job, parameter is not used.'),
),
migrations.AddField(
model_name='jobtemplate',
name='job_slice_count',
field=models.PositiveIntegerField(blank=True, default=1, help_text='The number of jobs to slice into at runtime. Will cause the Job Template to launch a workflow if value is greater than 1.'),
),
migrations.AddField(
model_name='workflowjob',
name='is_sliced_job',
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name='workflowjob',
name='job_template',
field=models.ForeignKey(blank=True, default=None, help_text='If automatically created for a sliced job run, the job template the workflow job was created from.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='slice_workflow_jobs', to='main.JobTemplate'),
),
migrations.AlterField(
model_name='unifiedjob',
name='unified_job_template',
field=models.ForeignKey(default=None, editable=False, null=True, on_delete=awx.main.utils.polymorphic.SET_NULL, related_name='unifiedjob_unified_jobs', to='main.UnifiedJobTemplate'),
),
]

View File

@ -136,8 +136,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
else:
return []
@classmethod
def _get_parent_field_name(cls):
def _get_parent_field_name(self):
return ''
@classmethod

View File

@ -19,6 +19,9 @@ from django.core.exceptions import ValidationError
from django.utils.timezone import now
from django.db.models import Q
# REST Framework
from rest_framework.exceptions import ParseError
# AWX
from awx.api.versioning import reverse
from awx.main.constants import CLOUD_PROVIDERS
@ -217,67 +220,87 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
group_children.add(from_group_id)
return group_children_map
def get_script_data(self, hostvars=False, towervars=False, show_all=False):
if show_all:
hosts_q = dict()
else:
hosts_q = dict(enabled=True)
@staticmethod
def parse_slice_params(slice_str):
m = re.match(r"slice(?P<number>\d+)of(?P<step>\d+)", slice_str)
if not m:
raise ParseError(_('Could not parse subset as slice specification.'))
number = int(m.group('number'))
step = int(m.group('step'))
if number > step:
raise ParseError(_('Slice number must be less than total number of slices.'))
elif number < 1:
raise ParseError(_('Slice number must be 1 or higher.'))
return (number, step)
def get_script_data(self, hostvars=False, towervars=False, show_all=False, slice_number=1, slice_count=1):
hosts_kw = dict()
if not show_all:
hosts_kw['enabled'] = True
fetch_fields = ['name', 'id', 'variables']
if towervars:
fetch_fields.append('enabled')
hosts = self.hosts.filter(**hosts_kw).order_by('name').only(*fetch_fields)
if slice_count > 1:
offset = slice_number - 1
hosts = hosts[offset::slice_count]
data = dict()
all_group = data.setdefault('all', dict())
all_hostnames = set(host.name for host in hosts)
if self.variables_dict:
all_group = data.setdefault('all', dict())
all_group['vars'] = self.variables_dict
if self.kind == 'smart':
if len(self.hosts.all()) == 0:
return {}
else:
all_group = data.setdefault('all', dict())
smart_hosts_qs = self.hosts.filter(**hosts_q).all()
smart_hosts = list(smart_hosts_qs.values_list('name', flat=True))
all_group['hosts'] = smart_hosts
all_group['hosts'] = [host.name for host in hosts]
else:
# Add hosts without a group to the all group.
groupless_hosts_qs = self.hosts.filter(groups__isnull=True, **hosts_q)
groupless_hosts = list(groupless_hosts_qs.values_list('name', flat=True))
if groupless_hosts:
all_group = data.setdefault('all', dict())
all_group['hosts'] = groupless_hosts
# Keep track of hosts that are members of a group
grouped_hosts = set([])
# Build in-memory mapping of groups and their hosts.
group_hosts_kw = dict(group__inventory_id=self.id, host__inventory_id=self.id)
if 'enabled' in hosts_q:
group_hosts_kw['host__enabled'] = hosts_q['enabled']
group_hosts_qs = Group.hosts.through.objects.filter(**group_hosts_kw)
group_hosts_qs = group_hosts_qs.values_list('group_id', 'host_id', 'host__name')
group_hosts_qs = Group.hosts.through.objects.filter(
group__inventory_id=self.id,
host__inventory_id=self.id
).values_list('group_id', 'host_id', 'host__name')
group_hosts_map = {}
for group_id, host_id, host_name in group_hosts_qs:
if host_name not in all_hostnames:
continue # host might not be in current shard
group_hostnames = group_hosts_map.setdefault(group_id, [])
group_hostnames.append(host_name)
grouped_hosts.add(host_name)
# Build in-memory mapping of groups and their children.
group_parents_qs = Group.parents.through.objects.filter(
from_group__inventory_id=self.id,
to_group__inventory_id=self.id,
)
group_parents_qs = group_parents_qs.values_list('from_group_id', 'from_group__name',
'to_group_id')
).values_list('from_group_id', 'from_group__name', 'to_group_id')
group_children_map = {}
for from_group_id, from_group_name, to_group_id in group_parents_qs:
group_children = group_children_map.setdefault(to_group_id, [])
group_children.append(from_group_name)
# Now use in-memory maps to build up group info.
for group in self.groups.all():
for group in self.groups.only('name', 'id', 'variables'):
group_info = dict()
group_info['hosts'] = group_hosts_map.get(group.id, [])
group_info['children'] = group_children_map.get(group.id, [])
group_info['vars'] = group.variables_dict
data[group.name] = group_info
# Add ungrouped hosts to all group
all_group['hosts'] = [host.name for host in hosts if host.name not in grouped_hosts]
# Remove any empty groups
for group_name in list(data.keys()):
if not data.get(group_name, {}).get('hosts', []):
data.pop(group_name)
if hostvars:
data.setdefault('_meta', dict())
data['_meta'].setdefault('hostvars', dict())
for host in self.hosts.filter(**hosts_q):
for host in hosts:
data['_meta']['hostvars'][host.name] = host.variables_dict
if towervars:
tower_dict = dict(remote_tower_enabled=str(host.enabled).lower(),
@ -1624,8 +1647,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
null=True
)
@classmethod
def _get_parent_field_name(cls):
def _get_parent_field_name(self):
return 'inventory_source'
@classmethod

View File

@ -277,6 +277,13 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
default=False,
allows_field='credentials'
)
job_slice_count = models.PositiveIntegerField(
blank=True,
default=1,
help_text=_("The number of jobs to slice into at runtime. "
"Will cause the Job Template to launch a workflow if value is greater than 1."),
)
admin_role = ImplicitRoleField(
parent_role=['project.organization.job_template_admin_role', 'inventory.organization.job_template_admin_role']
)
@ -295,7 +302,8 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
@classmethod
def _get_unified_job_field_names(cls):
return set(f.name for f in JobOptions._meta.fields) | set(
['name', 'description', 'schedule', 'survey_passwords', 'labels', 'credentials']
['name', 'description', 'schedule', 'survey_passwords', 'labels', 'credentials',
'job_slice_number', 'job_slice_count']
)
@property
@ -320,6 +328,31 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
'''
return self.create_unified_job(**kwargs)
def create_unified_job(self, **kwargs):
prevent_slicing = kwargs.pop('_prevent_slicing', False)
slice_event = bool(self.job_slice_count > 1 and (not prevent_slicing))
if slice_event:
# A Slice Job Template will generate a WorkflowJob rather than a Job
from awx.main.models.workflow import WorkflowJobTemplate, WorkflowJobNode
kwargs['_unified_job_class'] = WorkflowJobTemplate._get_unified_job_class()
kwargs['_parent_field_name'] = "job_template"
kwargs.setdefault('_eager_fields', {})
kwargs['_eager_fields']['is_sliced_job'] = True
job = super(JobTemplate, self).create_unified_job(**kwargs)
if slice_event:
try:
wj_config = job.launch_config
except JobLaunchConfig.DoesNotExist:
wj_config = JobLaunchConfig()
actual_inventory = wj_config.inventory if wj_config.inventory else self.inventory
for idx in xrange(min(self.job_slice_count,
actual_inventory.hosts.count())):
create_kwargs = dict(workflow_job=job,
unified_job_template=self,
ancestor_artifacts=dict(job_slice=idx + 1))
WorkflowJobNode.objects.create(**create_kwargs)
return job
def get_absolute_url(self, request=None):
return reverse('api:job_template_detail', kwargs={'pk': self.pk}, request=request)
@ -452,7 +485,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
RelatedJobsMixin
'''
def _get_related_jobs(self):
return Job.objects.filter(job_template=self)
return UnifiedJob.objects.filter(unified_job_template=self)
class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskManagerJobMixin):
@ -501,10 +534,21 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
on_delete=models.SET_NULL,
help_text=_('The SCM Refresh task used to make sure the playbooks were available for the job run'),
)
job_slice_number = models.PositiveIntegerField(
blank=True,
default=0,
help_text=_("If part of a sliced job, the ID of the inventory slice operated on. "
"If not part of sliced job, parameter is not used."),
)
job_slice_count = models.PositiveIntegerField(
blank=True,
default=1,
help_text=_("If ran as part of sliced jobs, the total number of slices. "
"If 1, job is not part of a sliced job."),
)
@classmethod
def _get_parent_field_name(cls):
def _get_parent_field_name(self):
return 'job_template'
@classmethod
@ -545,6 +589,15 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
def event_class(self):
return JobEvent
def copy_unified_job(self, **new_prompts):
# Needed for job slice relaunch consistency, do no re-spawn workflow job
# target same slice as original job
new_prompts['_prevent_slicing'] = True
new_prompts.setdefault('_eager_fields', {})
new_prompts['_eager_fields']['job_slice_number'] = self.job_slice_number
new_prompts['_eager_fields']['job_slice_count'] = self.job_slice_count
return super(Job, self).copy_unified_job(**new_prompts)
@property
def ask_diff_mode_on_launch(self):
if self.job_template is not None:
@ -638,6 +691,9 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
count_hosts = 2
else:
count_hosts = Host.objects.filter(inventory__jobs__pk=self.pk).count()
if self.job_slice_count > 1:
# Integer division intentional
count_hosts = (count_hosts + self.job_slice_count - self.job_slice_number) / self.job_slice_count
return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1
@property

View File

@ -496,8 +496,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage
default='check',
)
@classmethod
def _get_parent_field_name(cls):
def _get_parent_field_name(self):
return 'project'
@classmethod

View File

@ -309,13 +309,6 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
'''
raise NotImplementedError # Implement in subclass.
@classmethod
def _get_unified_job_field_names(cls):
'''
Return field names that should be copied from template to new job.
'''
raise NotImplementedError # Implement in subclass.
@property
def notification_templates(self):
'''
@ -338,19 +331,33 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
unified_job_class = self._get_unified_job_class()
fields = self._get_unified_job_field_names()
unallowed_fields = set(kwargs.keys()) - set(fields)
if unallowed_fields:
logger.warn('Fields {} are not allowed as overrides.'.format(unallowed_fields))
map(kwargs.pop, unallowed_fields)
parent_field_name = None
if "_unified_job_class" in kwargs:
# Special case where spawned job is different type than usual
# Only used for slice jobs
unified_job_class = kwargs.pop("_unified_job_class")
fields = unified_job_class._get_unified_job_field_names() & fields
parent_field_name = kwargs.pop('_parent_field_name')
unified_job = copy_model_by_class(self, unified_job_class, fields, kwargs)
unallowed_fields = set(kwargs.keys()) - set(fields)
validated_kwargs = kwargs.copy()
if unallowed_fields:
if parent_field_name is None:
logger.warn(six.text_type('Fields {} are not allowed as overrides to spawn {} from {}.').format(
six.text_type(', ').join(unallowed_fields), unified_job, self
))
map(validated_kwargs.pop, unallowed_fields)
unified_job = copy_model_by_class(self, unified_job_class, fields, validated_kwargs)
if eager_fields:
for fd, val in eager_fields.items():
setattr(unified_job, fd, val)
# Set the unified job template back-link on the job
parent_field_name = unified_job_class._get_parent_field_name()
# NOTE: slice workflow jobs _get_parent_field_name method
# is not correct until this is set
if not parent_field_name:
parent_field_name = unified_job._get_parent_field_name()
setattr(unified_job, parent_field_name, self)
# For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction
@ -364,24 +371,25 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio
unified_job.save()
# Labels and credentials copied here
if kwargs.get('credentials'):
if validated_kwargs.get('credentials'):
Credential = UnifiedJob._meta.get_field('credentials').related_model
cred_dict = Credential.unique_dict(self.credentials.all())
prompted_dict = Credential.unique_dict(kwargs['credentials'])
prompted_dict = Credential.unique_dict(validated_kwargs['credentials'])
# combine prompted credentials with JT
cred_dict.update(prompted_dict)
kwargs['credentials'] = [cred for cred in cred_dict.values()]
validated_kwargs['credentials'] = [cred for cred in cred_dict.values()]
kwargs['credentials'] = validated_kwargs['credentials']
from awx.main.signals import disable_activity_stream
with disable_activity_stream():
copy_m2m_relationships(self, unified_job, fields, kwargs=kwargs)
copy_m2m_relationships(self, unified_job, fields, kwargs=validated_kwargs)
if 'extra_vars' in kwargs:
unified_job.handle_extra_data(kwargs['extra_vars'])
if 'extra_vars' in validated_kwargs:
unified_job.handle_extra_data(validated_kwargs['extra_vars'])
if not getattr(self, '_deprecated_credential_launch', False):
# Create record of provided prompts for relaunch and rescheduling
unified_job.create_config_from_prompts(kwargs)
unified_job.create_config_from_prompts(kwargs, parent=self)
return unified_job
@ -546,7 +554,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
default=None,
editable=False,
related_name='%(class)s_unified_jobs',
on_delete=models.SET_NULL,
on_delete=polymorphic.SET_NULL,
)
launch_type = models.CharField(
max_length=20,
@ -694,8 +702,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def supports_isolation(cls):
return False
@classmethod
def _get_parent_field_name(cls):
def _get_parent_field_name(self):
return 'unified_job_template' # Override in subclasses.
@classmethod
@ -828,7 +835,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
'''
unified_job_class = self.__class__
unified_jt_class = self._get_unified_job_template_class()
parent_field_name = unified_job_class._get_parent_field_name()
parent_field_name = self._get_parent_field_name()
fields = unified_jt_class._get_unified_job_field_names() | set([parent_field_name])
create_data = {}
@ -866,16 +873,18 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
except JobLaunchConfig.DoesNotExist:
return None
def create_config_from_prompts(self, kwargs):
def create_config_from_prompts(self, kwargs, parent=None):
'''
Create a launch configuration entry for this job, given prompts
returns None if it can not be created
'''
if self.unified_job_template is None:
return None
JobLaunchConfig = self._meta.get_field('launch_config').related_model
config = JobLaunchConfig(job=self)
valid_fields = self.unified_job_template.get_ask_mapping().keys()
if parent is None:
parent = getattr(self, self._get_parent_field_name())
if parent is None:
return
valid_fields = parent.get_ask_mapping().keys()
# Special cases allowed for workflows
if hasattr(self, 'extra_vars'):
valid_fields.extend(['survey_passwords', 'extra_vars'])
@ -892,8 +901,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
setattr(config, key, value)
config.save()
job_creds = (set(kwargs.get('credentials', [])) -
set(self.unified_job_template.credentials.all()))
job_creds = set(kwargs.get('credentials', []))
if 'credentials' in [field.name for field in parent._meta.get_fields()]:
job_creds = job_creds - set(parent.credentials.all())
if job_creds:
config.credentials.add(*job_creds)
return config

View File

@ -9,6 +9,7 @@ import logging
from django.db import models
from django.conf import settings
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ObjectDoesNotExist
#from django import settings as tower_settings
# AWX
@ -206,13 +207,24 @@ class WorkflowJobNode(WorkflowNodeBase):
workflow_pk=self.pk,
error_text=errors))
data.update(accepted_fields) # missing fields are handled in the scheduler
try:
# config saved on the workflow job itself
wj_config = self.workflow_job.launch_config
except ObjectDoesNotExist:
wj_config = None
if wj_config:
accepted_fields, ignored_fields, errors = ujt_obj._accept_or_ignore_job_kwargs(**wj_config.prompts_dict())
accepted_fields.pop('extra_vars', None) # merge handled with other extra_vars later
data.update(accepted_fields)
# build ancestor artifacts, save them to node model for later
aa_dict = {}
is_root_node = True
for parent_node in self.get_parent_nodes():
is_root_node = False
aa_dict.update(parent_node.ancestor_artifacts)
if parent_node.job and hasattr(parent_node.job, 'artifacts'):
aa_dict.update(parent_node.job.artifacts)
if aa_dict:
if aa_dict and not is_root_node:
self.ancestor_artifacts = aa_dict
self.save(update_fields=['ancestor_artifacts'])
# process password list
@ -240,6 +252,12 @@ class WorkflowJobNode(WorkflowNodeBase):
data['extra_vars'] = extra_vars
# ensure that unified jobs created by WorkflowJobs are marked
data['_eager_fields'] = {'launch_type': 'workflow'}
# Extra processing in the case that this is a slice job
if 'job_slice' in self.ancestor_artifacts and is_root_node:
data['_eager_fields']['allow_simultaneous'] = True
data['_eager_fields']['job_slice_number'] = self.ancestor_artifacts['job_slice']
data['_eager_fields']['job_slice_count'] = self.workflow_job.workflow_job_nodes.count()
data['_prevent_slicing'] = True
return data
@ -261,6 +279,12 @@ class WorkflowJobOptions(BaseModel):
def workflow_nodes(self):
raise NotImplementedError()
@classmethod
def _get_unified_job_field_names(cls):
return set(f.name for f in WorkflowJobOptions._meta.fields) | set(
['name', 'description', 'schedule', 'survey_passwords', 'labels']
)
def _create_workflow_nodes(self, old_node_list, user=None):
node_links = {}
for old_node in old_node_list:
@ -288,7 +312,7 @@ class WorkflowJobOptions(BaseModel):
def create_relaunch_workflow_job(self):
new_workflow_job = self.copy_unified_job()
if self.workflow_job_template is None:
if self.unified_job_template_id is None:
new_workflow_job.copy_nodes_from_original(original=self)
return new_workflow_job
@ -331,12 +355,6 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl
def _get_unified_job_class(cls):
return WorkflowJob
@classmethod
def _get_unified_job_field_names(cls):
return set(f.name for f in WorkflowJobOptions._meta.fields) | set(
['name', 'description', 'schedule', 'survey_passwords', 'labels']
)
@classmethod
def _get_unified_jt_copy_names(cls):
base_list = super(WorkflowJobTemplate, cls)._get_unified_jt_copy_names()
@ -433,13 +451,28 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
default=None,
on_delete=models.SET_NULL,
)
job_template = models.ForeignKey(
'JobTemplate',
related_name='slice_workflow_jobs',
blank=True,
null=True,
default=None,
on_delete=models.SET_NULL,
help_text=_("If automatically created for a sliced job run, the job template "
"the workflow job was created from."),
)
is_sliced_job = models.BooleanField(
default=False
)
@property
def workflow_nodes(self):
return self.workflow_job_nodes
@classmethod
def _get_parent_field_name(cls):
def _get_parent_field_name(self):
if self.job_template_id:
# This is a workflow job which is a container for slice jobs
return 'job_template'
return 'workflow_job_template'
@classmethod

View File

@ -121,7 +121,11 @@ class TaskManager():
spawn_node.save()
logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
if job._resources_sufficient_for_launch():
can_start = job.signal_start()
if workflow_job.start_args:
start_args = json.loads(decrypt_field(workflow_job, 'start_args'))
else:
start_args = {}
can_start = job.signal_start(**start_args)
if not can_start:
job.job_explanation = _("Job spawned from workflow could not start because it "
"was not in the right state or required manual credentials")
@ -147,7 +151,8 @@ class TaskManager():
if cancel_finished:
logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format)
workflow_job.status = 'canceled'
workflow_job.save()
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=['status', 'start_args'])
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
else:
is_done, has_failed = dag.is_workflow_done()
@ -155,8 +160,11 @@ class TaskManager():
continue
logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
result.append(workflow_job.id)
workflow_job.status = 'failed' if has_failed else 'successful'
workflow_job.save()
new_status = 'failed' if has_failed else 'successful'
logger.debug(six.text_type("Transitioning {} to {} status.").format(workflow_job.log_format, new_status))
workflow_job.status = new_status
workflow_job.start_args = '' # blank field to remove encrypted passwords
workflow_job.save(update_fields=['status', 'start_args'])
connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
return result
@ -419,7 +427,7 @@ class TaskManager():
logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format))
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = set([wf.workflow_job_template_id for wf in self.get_running_workflow_jobs()])
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
for task in pending_tasks:
self.process_dependencies(task, self.generate_dependencies(task))
if self.is_job_blocked(task):
@ -429,12 +437,12 @@ class TaskManager():
found_acceptable_queue = False
idle_instance_that_fits = None
if isinstance(task, WorkflowJob):
if task.workflow_job_template_id in running_workflow_templates:
if task.unified_job_template_id in running_workflow_templates:
if not task.allow_simultaneous:
logger.debug(six.text_type("{} is blocked from running, workflow already running").format(task.log_format))
continue
else:
running_workflow_templates.add(task.workflow_job_template_id)
running_workflow_templates.add(task.unified_job_template_id)
self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue
for rampart_group in preferred_instance_groups:

View File

@ -825,7 +825,12 @@ class BaseTask(object):
return False
def build_inventory(self, instance, **kwargs):
json_data = json.dumps(instance.inventory.get_script_data(hostvars=True))
script_params = dict(hostvars=True)
if hasattr(instance, 'job_slice_number'):
script_params['slice_number'] = instance.job_slice_number
script_params['slice_count'] = instance.job_slice_count
script_data = instance.inventory.get_script_data(**script_params)
json_data = json.dumps(script_data)
handle, path = tempfile.mkstemp(dir=kwargs.get('private_data_dir', None))
f = os.fdopen(handle, 'w')
f.write('#! /usr/bin/env python\n# -*- coding: utf-8 -*-\nprint %r\n' % json_data)

View File

@ -122,6 +122,22 @@ def test_job_relaunch_on_failed_hosts(post, inventory, project, machine_credenti
assert r.data.get('limit') == hosts
@pytest.mark.django_db
def test_slice_jt_recent_jobs(slice_job_factory, admin_user, get):
workflow_job = slice_job_factory(3, spawn=True)
slice_jt = workflow_job.job_template
r = get(
url=slice_jt.get_absolute_url(),
user=admin_user,
expect=200
)
job_ids = [entry['id'] for entry in r.data['summary_fields']['recent_jobs']]
assert workflow_job.pk not in job_ids
for node in workflow_job.workflow_nodes.all():
job = node.job
assert job.pk in job_ids
@pytest.mark.django_db
def test_block_unprocessed_events(delete, admin_user, mocker):
time_of_finish = parse("Thu Feb 28 09:10:20 2013 -0500")

View File

@ -6,7 +6,7 @@ import json
from awx.api.serializers import JobLaunchSerializer
from awx.main.models.credential import Credential
from awx.main.models.inventory import Inventory, Host
from awx.main.models.jobs import Job, JobTemplate
from awx.main.models.jobs import Job, JobTemplate, UnifiedJobTemplate
from awx.api.versioning import reverse
@ -553,15 +553,15 @@ def test_callback_accept_prompted_extra_var(mocker, survey_spec_factory, job_tem
with mocker.patch('awx.main.access.BaseAccess.check_license'):
mock_job = mocker.MagicMock(spec=Job, id=968, extra_vars={"job_launch_var": 3, "survey_var": 4})
with mocker.patch.object(JobTemplate, 'create_unified_job', return_value=mock_job):
with mocker.patch.object(UnifiedJobTemplate, 'create_unified_job', return_value=mock_job):
with mocker.patch('awx.api.serializers.JobSerializer.to_representation', return_value={}):
with mocker.patch('awx.api.views.JobTemplateCallback.find_matching_hosts', return_value=[host]):
post(
reverse('api:job_template_callback', kwargs={'pk': job_template.pk}),
dict(extra_vars={"job_launch_var": 3, "survey_var": 4}, host_config_key="foo"),
admin_user, expect=201, format='json')
assert JobTemplate.create_unified_job.called
assert JobTemplate.create_unified_job.call_args == ({
assert UnifiedJobTemplate.create_unified_job.called
assert UnifiedJobTemplate.create_unified_job.call_args == ({
'extra_vars': {'survey_var': 4, 'job_launch_var': 3},
'_eager_fields': {'launch_type': 'callback'},
'limit': 'single-host'},
@ -579,15 +579,15 @@ def test_callback_ignore_unprompted_extra_var(mocker, survey_spec_factory, job_t
with mocker.patch('awx.main.access.BaseAccess.check_license'):
mock_job = mocker.MagicMock(spec=Job, id=968, extra_vars={"job_launch_var": 3, "survey_var": 4})
with mocker.patch.object(JobTemplate, 'create_unified_job', return_value=mock_job):
with mocker.patch.object(UnifiedJobTemplate, 'create_unified_job', return_value=mock_job):
with mocker.patch('awx.api.serializers.JobSerializer.to_representation', return_value={}):
with mocker.patch('awx.api.views.JobTemplateCallback.find_matching_hosts', return_value=[host]):
post(
reverse('api:job_template_callback', kwargs={'pk':job_template.pk}),
dict(extra_vars={"job_launch_var": 3, "survey_var": 4}, host_config_key="foo"),
admin_user, expect=201, format='json')
assert JobTemplate.create_unified_job.called
assert JobTemplate.create_unified_job.call_args == ({
assert UnifiedJobTemplate.create_unified_job.called
assert UnifiedJobTemplate.create_unified_job.call_args == ({
'_eager_fields': {'launch_type': 'callback'},
'limit': 'single-host'},
)

View File

@ -776,3 +776,37 @@ def sqlite_copy_expert(request):
def disable_database_settings(mocker):
m = mocker.patch('awx.conf.settings.SettingsWrapper.all_supported_settings', new_callable=PropertyMock)
m.return_value = []
@pytest.fixture
def slice_jt_factory(inventory):
def r(N, jt_kwargs=None):
for i in range(N):
inventory.hosts.create(name='foo{}'.format(i))
if not jt_kwargs:
jt_kwargs = {}
return JobTemplate.objects.create(
name='slice-jt-from-factory',
job_slice_count=N,
inventory=inventory,
**jt_kwargs
)
return r
@pytest.fixture
def slice_job_factory(slice_jt_factory):
def r(N, jt_kwargs=None, prompts=None, spawn=False):
slice_jt = slice_jt_factory(N, jt_kwargs=jt_kwargs)
if not prompts:
prompts = {}
slice_job = slice_jt.create_unified_job(**prompts)
if spawn:
for node in slice_job.workflow_nodes.all():
# does what the task manager does for spawning workflow jobs
kv = node.get_job_kwargs()
job = node.unified_job_template.create_unified_job(**kv)
node.job = job
node.save()
return slice_job
return r

View File

@ -38,6 +38,33 @@ class TestInventoryScript:
'remote_tower_id': host.id
}
def test_slice_subset(self, inventory):
for i in range(3):
inventory.hosts.create(name='host{}'.format(i))
for i in range(3):
assert inventory.get_script_data(slice_number=i + 1, slice_count=3) == {
'all': {'hosts': ['host{}'.format(i)]}
}
def test_slice_subset_with_groups(self, inventory):
hosts = []
for i in range(3):
host = inventory.hosts.create(name='host{}'.format(i))
hosts.append(host)
g1 = inventory.groups.create(name='contains_all_hosts')
for host in hosts:
g1.hosts.add(host)
g2 = inventory.groups.create(name='contains_two_hosts')
for host in hosts[:2]:
g2.hosts.add(host)
for i in range(3):
expected_data = {
'contains_all_hosts': {'hosts': ['host{}'.format(i)], 'children': [], 'vars': {}},
}
if i < 2:
expected_data['contains_two_hosts'] = {'hosts': ['host{}'.format(i)], 'children': [], 'vars': {}}
assert inventory.get_script_data(slice_number=i + 1, slice_count=3) == expected_data
@pytest.mark.django_db
class TestActiveCount:

View File

@ -1,7 +1,7 @@
import pytest
import six
from awx.main.models import JobTemplate, Job, JobHostSummary
from awx.main.models import JobTemplate, Job, JobHostSummary, WorkflowJob
from crum import impersonate
@ -81,3 +81,23 @@ def test_job_host_summary_representation(host):
jhs = JobHostSummary.objects.get(pk=jhs.id)
host.delete()
assert 'N/A changed=1 dark=2 failures=3 ok=4 processed=5 skipped=6' == six.text_type(jhs)
@pytest.mark.django_db
class TestSlicingModels:
def test_slice_workflow_spawn(self, slice_jt_factory):
slice_jt = slice_jt_factory(3)
job = slice_jt.create_unified_job()
assert isinstance(job, WorkflowJob)
assert job.job_template == slice_jt
assert job.unified_job_template == slice_jt
assert job.workflow_nodes.count() == 3
def test_slices_with_JT_and_prompts(self, slice_job_factory):
job = slice_job_factory(3, jt_kwargs={'ask_limit_on_launch': True}, prompts={'limit': 'foobar'}, spawn=True)
assert job.launch_config.prompts_dict() == {'limit': 'foobar'}
for node in job.workflow_nodes.all():
assert node.limit is None # data not saved in node prompts
job = node.job
assert job.limit == 'foobar'

View File

@ -58,9 +58,7 @@ class TestCreateUnifiedJob:
job_with_links.save()
job_with_links.credentials.add(machine_credential)
job_with_links.credentials.add(net_credential)
with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate._get_unified_job_field_names',
return_value=['inventory', 'credential', 'limit']):
second_job = job_with_links.copy_unified_job()
second_job = job_with_links.copy_unified_job()
# Check that job data matches the original variables
assert second_job.credential == job_with_links.credential
@ -154,3 +152,50 @@ def test_event_processing_not_finished():
def test_event_model_undefined():
wj = WorkflowJob.objects.create(name='foobar', status='finished')
assert wj.event_processing_finished
@pytest.mark.django_db
class TestTaskImpact:
@pytest.fixture
def job_host_limit(self, job_template, inventory):
def r(hosts, forks):
for i in range(hosts):
inventory.hosts.create(name='foo' + str(i))
job = Job.objects.create(
name='fake-job',
launch_type='workflow',
job_template=job_template,
inventory=inventory,
forks=forks
)
return job
return r
def test_limit_task_impact(self, job_host_limit):
job = job_host_limit(5, 2)
assert job.task_impact == 2 + 1 # forks becomes constraint
def test_host_task_impact(self, job_host_limit):
job = job_host_limit(3, 5)
assert job.task_impact == 3 + 1 # hosts becomes constraint
def test_shard_task_impact(self, slice_job_factory):
# factory creates on host per slice
workflow_job = slice_job_factory(3, jt_kwargs={'forks': 50}, spawn=True)
# arrange the jobs by their number
jobs = [None for i in range(3)]
for node in workflow_job.workflow_nodes.all():
jobs[node.job.job_slice_number - 1] = node.job
# Even distribution - all jobs run on 1 host
assert [
len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts'])
for i in range(3)
] == [1, 1, 1]
assert [job.task_impact for job in jobs] == [2, 2, 2] # plus one base task impact
# Uneven distribution - first job takes the extra host
jobs[0].inventory.hosts.create(name='remainder_foo')
assert [
len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts'])
for i in range(3)
] == [2, 1, 1]
assert [job.task_impact for job in jobs] == [3, 2, 2]

View File

@ -3,6 +3,11 @@ import pytest
from awx.main.models.inventory import Inventory
from awx.main.models.credential import Credential
from awx.main.models.jobs import JobTemplate, Job
from awx.main.access import (
UnifiedJobAccess,
WorkflowJobAccess, WorkflowJobNodeAccess,
JobAccess
)
@pytest.mark.django_db
@ -43,6 +48,31 @@ def test_inventory_use_access(inventory, user):
assert common_user.can_access(Inventory, 'use', inventory)
@pytest.mark.django_db
def test_slice_job(slice_job_factory, rando):
workflow_job = slice_job_factory(2, jt_kwargs={'created_by': rando}, spawn=True)
workflow_job.job_template.execute_role.members.add(rando)
# Abilities of user with execute_role for slice workflow job container
assert WorkflowJobAccess(rando).can_start(workflow_job) # relaunch allowed
for access_cls in (UnifiedJobAccess, WorkflowJobAccess):
access = access_cls(rando)
assert access.can_read(workflow_job)
assert workflow_job in access.get_queryset()
# Abilities of user with execute_role for all the slice of the job
for node in workflow_job.workflow_nodes.all():
access = WorkflowJobNodeAccess(rando)
assert access.can_read(node)
assert node in access.get_queryset()
job = node.job
assert JobAccess(rando).can_start(job) # relaunch allowed
for access_cls in (UnifiedJobAccess, JobAccess):
access = access_cls(rando)
assert access.can_read(job)
assert job in access.get_queryset()
@pytest.mark.django_db
class TestJobRelaunchAccess:
@pytest.fixture

View File

@ -246,6 +246,8 @@ class TestJobExecution(object):
# If `Job.update_model` is called, we're not actually persisting
# to the database; just update the status, which is usually
# the update we care about for testing purposes
if kwargs.get('result_traceback'):
raise Exception('Task encountered error:\n{}'.format(kwargs['result_traceback']))
if 'status' in kwargs:
self.instance.status = kwargs['status']
if 'job_env' in kwargs:

View File

@ -1,12 +1,14 @@
import JobsStrings from './jobs.strings';
import jobsRoute from './routes/jobs.route';
import { jobsSchedulesRoute, jobsSchedulesEditRoute } from '../../src/scheduler/schedules.route';
import jobsListController from './jobsList.controller';
const MODULE_NAME = 'at.features.jobs';
angular
.module(MODULE_NAME, [])
.service('JobsStrings', JobsStrings)
.controller('jobsListController', jobsListController)
.run(['$stateExtender', ($stateExtender) => {
$stateExtender.addState(jobsRoute);
$stateExtender.addState(jobsSchedulesRoute);

View File

@ -76,6 +76,22 @@ function ListJobsController (
return { icon, link, value };
});
vm.getSliceJobDetails = (job) => {
if (!job.job_slice_count) {
return null;
}
if (job.job_slice_count === 1) {
return null;
}
if (job.job_slice_number && job.job_slice_count) {
return `Slice Job ${job.job_slice_number}/${job.job_slice_count}`;
}
return null;
};
vm.getSref = ({ type, id }) => {
let sref;

View File

@ -23,7 +23,8 @@
status-tip="{{ vm.strings.get('list.STATUS_TOOLTIP', job.status) }}"
header-value="{{ job.id }} - {{ job.name }}"
header-state="{{ vm.getSref(job) }}"
header-tag="{{ vm.jobTypes[job.type] }}">
header-tag="{{ vm.jobTypes[job.type] }}"
secondary-tag="{{ vm.getSliceJobDetails(job) }}">
</at-row-item>
<div class="at-Row--inline">
<at-row-item

View File

@ -1,5 +1,4 @@
import { N_ } from '../../../src/i18n';
import jobsListController from '../jobsList.controller';
import indexController from '../index.controller';
const indexTemplate = require('~features/jobs/index.view.html');
@ -69,7 +68,7 @@ export default {
},
'jobsList@jobs': {
templateUrl: jobsListTemplate,
controller: jobsListController,
controller: 'jobsListController',
controllerAs: 'vm'
}
}

View File

@ -126,6 +126,33 @@ function getSourceWorkflowJobDetails () {
return { link, tooltip };
}
function getSliceJobDetails () {
const count = resource.model.get('job_slice_count');
if (!count) {
return null;
}
if (count === 1) {
return null;
}
const number = resource.model.get('job_slice_number');
if (!number) {
return null;
}
const label = strings.get('labels.SLICE_JOB');
const offset = `${number}/${count}`;
const tooltip = strings.get('tooltips.SLICE_JOB_DETAILS');
if (label && offset && tooltip) {
return { label, offset, tooltip };
}
return null;
}
function getJobTemplateDetails () {
const jobTemplate = resource.model.get('summary_fields.job_template');
@ -671,6 +698,7 @@ function JobDetailsController (
vm.jobType = getJobTypeDetails();
vm.jobTemplate = getJobTemplateDetails();
vm.sourceWorkflowJob = getSourceWorkflowJobDetails();
vm.sliceJobDetails = getSliceJobDetails();
vm.inventory = getInventoryDetails();
vm.project = getProjectDetails();
vm.projectUpdate = getProjectUpdateDetails();

View File

@ -151,6 +151,12 @@
<div class="JobResults-resultRowText">{{ vm.jobType.value }}</div>
</div>
<!-- SLICE JOB DETAIL -->
<div class="JobResults-resultRow" ng-if="vm.sliceJobDetails">
<label class="JobResults-resultRowLabel">{{ vm.sliceJobDetails.label }}</label>
<div class="JobResults-resultRowText">{{ vm.sliceJobDetails.offset }}</div>
</div>
<!-- LAUNCHED BY DETAIL -->
<div class="JobResults-resultRow" ng-if="vm.launchedBy">
<label class="JobResults-resultRowLabel">{{ vm.launchedBy.label }}</label>

View File

@ -23,6 +23,7 @@ function OutputStrings (BaseString) {
EXTRA_VARS: t.s('Read-only view of extra variables added to the job template'),
INVENTORY: t.s('View the Inventory'),
JOB_TEMPLATE: t.s('View the Job Template'),
SLICE_JOB_DETAILS: t.s('Job is one of several from a JT that slices on inventory'),
PROJECT: t.s('View the Project'),
PROJECT_UPDATE: t.s('View Project checkout results'),
SCHEDULE: t.s('View the Schedule'),
@ -55,6 +56,7 @@ function OutputStrings (BaseString) {
JOB_EXPLANATION: t.s('Explanation'),
JOB_TAGS: t.s('Job Tags'),
JOB_TEMPLATE: t.s('Job Template'),
SLICE_JOB: t.s('Slice Job'),
JOB_TYPE: t.s('Job Type'),
LABELS: t.s('Labels'),
LAUNCHED_BY: t.s('Launched By'),

View File

@ -41,7 +41,13 @@ function atLaunchTemplateCtrl (
selectedJobTemplate
.postLaunch({ id: vm.template.id })
.then(({ data }) => {
$state.go('output', { id: data.job, type: 'playbook' }, { reload: true });
/* Slice Jobs: Redirect to WF Details page if returned
job type is a WF job */
if (data.type === 'workflow_job' && data.workflow_job !== null) {
$state.go('workflowResults', { id: data.workflow_job }, { reload: true });
} else {
$state.go('output', { id: data.job, type: 'playbook' }, { reload: true });
}
});
} else {
const promptData = {
@ -142,7 +148,13 @@ function atLaunchTemplateCtrl (
id: vm.promptData.template,
launchData: jobLaunchData
}).then((launchRes) => {
$state.go('output', { id: launchRes.data.job, type: 'playbook' }, { reload: true });
/* Slice Jobs: Redirect to WF Details page if returned
job type is a WF job */
if (launchRes.data.type === 'workflow_job' && launchRes.data.workflow_job !== null) {
$state.go('workflowResults', { id: launchRes.data.workflow_job }, { reload: true });
} else {
$state.go('output', { id: launchRes.data.job, type: 'playbook' }, { reload: true });
}
}).catch(createErrorHandler('launch job template', 'POST'));
} else if (vm.promptData.templateType === 'workflow_job_template') {
workflowTemplate.create().postLaunch({

View File

@ -197,7 +197,7 @@
color: @at-color-list-row-item-tag-primary;
}
.at-RowItem-tag--header {
.at-RowItem-tag--header, .at-RowItem-tag--secondary {
line-height: inherit;
}

View File

@ -13,6 +13,7 @@ function atRowItem () {
headerLink: '@',
headerState: '@',
headerTag: '@',
secondaryTag: '@',
status: '@',
statusTip: '@',
statusClick: '&?',

View File

@ -29,6 +29,9 @@
<div class="at-RowItem-tag at-RowItem-tag--header" ng-if="headerTag">
{{ headerTag }}
</div>
<div class="at-RowItem-tag at-RowItem-tag--secondary" ng-if="secondaryTag">
{{ secondaryTag }}
</div>
<div class="at-RowItem-label" ng-if="labelValue && labelLink">
<a ng-href="{{ labelLink }}">{{ labelValue }}</a>
</div>

View File

@ -257,6 +257,19 @@ function(NotificationsList, i18n) {
dataPlacement: 'right',
control: '<instance-groups-multiselect instance-groups="instance_groups" field-is-disabled="!(job_template_obj.summary_fields.user_capabilities.edit || canAddJobTemplate)"></instance-groups-multiselect>',
},
job_slice_count: {
label: i18n._('Job Slicing'),
type: 'number',
integer: true,
min: 1,
default: 1,
spinner: true,
dataTitle: i18n._('Slice Job Count'),
dataPlacement: 'right',
dataContainer: 'body',
awPopOver: "<p>" + i18n._("Divide the work done by this job template into the specified number of job slices, each running the same tasks against a portion of the inventory.") + "</p>",
ngDisabled: '!(job_template_obj.summary_fields.user_capabilities.edit || canAddJobTemplate)'
},
diff_mode: {
label: i18n._('Show Changes'),
type: 'toggleSwitch',

View File

@ -39,6 +39,7 @@ export default ['workflowData', 'workflowResultsService', 'workflowDataOptions',
DELETE: i18n._('Delete'),
EDIT_USER: i18n._('Edit the user'),
EDIT_WORKFLOW: i18n._('Edit the workflow job template'),
EDIT_SLICE_TEMPLATE: i18n._('Edit the slice job template'),
EDIT_SCHEDULE: i18n._('Edit the schedule'),
TOGGLE_STDOUT_FULLSCREEN: i18n._('Expand Output'),
STATUS: '' // re-assigned elsewhere
@ -49,6 +50,7 @@ export default ['workflowData', 'workflowResultsService', 'workflowDataOptions',
STARTED: i18n._('Started'),
FINISHED: i18n._('Finished'),
LABELS: i18n._('Labels'),
SLICE_TEMPLATE: i18n._('Slice Job Template'),
STATUS: i18n._('Status')
},
details: {
@ -109,6 +111,11 @@ export default ['workflowData', 'workflowResultsService', 'workflowDataOptions',
$scope.workflow_job_template_link = `/#/templates/workflow_job_template/${$scope.workflow.summary_fields.workflow_job_template.id}`;
}
if(workflowData.summary_fields && workflowData.summary_fields.job_template &&
workflowData.summary_fields.job_template.id){
$scope.slice_job_template_link = `/#/templates/job_template/${$scope.workflow.summary_fields.job_template.id}`;
}
// turn related api browser routes into front end routes
getLinks();

View File

@ -144,6 +144,22 @@
</div>
</div>
<!-- SLIIIIIICE -->
<div class="WorkflowResults-resultRow"
ng-show="workflow.summary_fields.job_template.name">
<label
class="WorkflowResults-resultRowLabel">
{{ strings.labels.SLICE_TEMPLATE }}
</label>
<div class="WorkflowResults-resultRowText">
<a href="{{ slice_job_template_link }}"
aw-tool-tip="{{ strings.tooltips.EDIT_SLICE_TEMPLATE }}"
data-placement="top">
{{ workflow.summary_fields.job_template.name }}
</a>
</div>
</div>
<!-- EXTRA VARIABLES DETAIL -->
<at-code-mirror
ng-if="variables"

View File

@ -6,4 +6,6 @@ import './file.unit';
import './layout.unit';
import './side-nav.unit';
import './side-nav-item.unit';
import './jobs-list-split-jobs.unit';
import './job-details-split-jobs.unit';

View File

@ -0,0 +1,185 @@
import moment from 'moment';
describe('View: Job Details', () => {
let JobDetails;
let scope;
let state;
let OutputStrings;
let Prompt;
let filter;
let ProcessErrors;
let Wait;
let httpBackend;
let ParseVariableString;
let subscribe;
let OutputStatusService;
let mockData = {
job_slice_count: 2,
job_slice_number: 2,
labels: {
SLICE_JOB: 'foo'
},
tooltips: {
SLICE_JOB_DETAILS: 'bar'
}
};
const resource = {
id: '147',
type: 'playbook',
model: {
get: (obj) => obj.split('.').reduce((i, o) => i && i[o] || null, mockData),
has: jasmine.createSpy('has'),
options: jasmine.createSpy('options'),
},
events: {},
ws: {}
};
beforeEach(angular.mock.module('at.features.output', ($provide) => {
state = {
params: {
job_search: {}
},
go: jasmine.createSpy('go'),
includes: jasmine.createSpy('includes')
};
OutputStrings = {
get: (obj) => obj.split('.').reduce((i, o) => i && i[o] || null, mockData),
};
OutputStatusService = {
subscribe: jasmine.createSpy('subscribe')
};
ProcessErrors = jasmine.createSpy('ProcessErrors');
Wait = jasmine.createSpy('Wait');
Prompt = jasmine.createSpy('Prompt');
$provide.value('state', state);
$provide.value('ProcessErrors', ProcessErrors);
$provide.value('Wait', Wait);
$provide.value('Prompt', Prompt);
$provide.value('OutputStrings', OutputStrings);
$provide.value('ParseVariableString', angular.noop);
$provide.value('OutputStatusService', OutputStatusService);
$provide.provider('$stateProvider', { $get: jasmine.createSpy('$get'), });
$provide.value('$stateExtender', { addState: jasmine.createSpy('addState'), });
$provide.value('$stateRegistry', { register: jasmine.createSpy('regster'), });
$provide.value('sanitizeFilter', angular.noop);
$provide.value('subscribe', subscribe);
$provide.value('moment', moment);
$provide.value('longDateFilter', angular.noop);
}));
beforeEach(angular.mock.inject((
$injector, $componentController, $rootScope,
$httpBackend, _state_, _OutputStrings_, _ParseVariableString_, _Prompt_,
_ProcessErrors_, _Wait_, _OutputStatusService_
) => {
scope = $rootScope.$new();
state = _state_;
OutputStrings = _OutputStrings_;
Prompt = _Prompt_;
filter = $injector.get('$filter');
ProcessErrors = _ProcessErrors_;
Wait = _Wait_;
ParseVariableString = _ParseVariableString_;
httpBackend = $httpBackend;
OutputStatusService = _OutputStatusService_;
JobDetails = $componentController('atJobDetails', {
$scope: scope,
$state: state,
OutputStrings,
ProcessErrors,
Wait,
Prompt,
$filter: filter,
ParseVariableString,
httpBackend,
OutputStatusService,
}, { resource });
JobDetails.$onInit();
}));
describe('JobDetails Component', () => {
it('is created successfully', () => {
expect(JobDetails).toBeDefined();
});
it('has method "sliceJobDetails"', () => {
expect(JobDetails.sliceJobDetails).toBeDefined();
});
describe('splitJobDetails method', () => {
it('returned values are strings', () => {
const result = JobDetails.sliceJobDetails;
const { label, offset, tooltip } = result;
expect(offset).toEqual('2/2');
expect(label).toEqual('foo');
expect(tooltip).toEqual('bar');
});
it('returns null if label, offset, or tooltip is undefined', () => {
mockData = {
job_slice_count: 2,
job_slice_number: 2,
labels: {
SLICE_JOB: null
},
tooltips: {
SLICE_JOB_DETAILS: null
}
};
JobDetails.$onInit();
const result = JobDetails.sliceJobDetails;
expect(result).toBeNull();
});
it('returns null if job_slice_count is undefined or null', () => {
mockData = {
job_slice_count: null,
job_slice_number: 2,
labels: {
SLICE_JOB: 'foo'
},
tooltips: {
SLICE_JOB_DETAILS: 'bar'
}
};
JobDetails.$onInit();
const result = JobDetails.sliceJobDetails;
expect(result).toBeNull();
});
it('returns null if job_slice_number is undefined or null', () => {
mockData = {
job_slice_count: 2,
job_slice_number: null,
labels: {
SLICE_JOB: 'foo'
},
tooltips: {
SLICE_JOB_DETAILS: 'bar'
}
};
JobDetails.$onInit();
const result = JobDetails.sliceJobDetails;
expect(result).toBeNull();
});
it('returns null if job is a non-sliced job', () => {
mockData = {
job_slice_count: 1,
job_slice_number: null,
labels: {
SLICE_JOB: 'foo'
},
tooltips: {
SLICE_JOB_DETAILS: 'bar'
}
};
JobDetails.$onInit();
const result = JobDetails.sliceJobDetails;
expect(result).toBeNull();
});
});
});
});

View File

@ -0,0 +1,127 @@
describe('View: Split Jobs List', () => {
let JobList;
let scope;
let state;
let Dataset;
let resolvedModels;
let JobsStrings;
let QuerySet;
let Prompt;
let filter;
let ProcessErrors;
let Wait;
let Rest;
let SearchBasePath;
beforeEach(angular.mock.module('at.features.jobs', ($provide) => {
Dataset = {
data: {
results: {}
}
};
state = {
params: {
job_search: {}
},
go: jasmine.createSpy('go'),
includes: jasmine.createSpy('includes')
};
resolvedModels = [
{
options: () => ['foo', 'bar'],
}
];
ProcessErrors = jasmine.createSpy('ProcessErrors');
Wait = jasmine.createSpy('Wait');
Prompt = jasmine.createSpy('Prompt');
$provide.value('state', state);
$provide.value('Dataset', Dataset);
$provide.value('resolvedModels', resolvedModels);
$provide.value('ProcessErrors', ProcessErrors);
$provide.value('Wait', Wait);
$provide.value('Prompt', Prompt);
$provide.value('Rest', angular.noop);
$provide.value('SearchBasePath', '');
$provide.value('JobsStrings', angular.noop);
$provide.value('QuerySet', angular.noop);
$provide.provider('$stateProvider', { $get: jasmine.createSpy('$get'), });
$provide.value('$stateExtender', { addState: jasmine.createSpy('addState'), });
}));
beforeEach(angular.mock.inject((
$controller, $rootScope, _state_, _Dataset_, _resolvedModels_, _JobsStrings_,
_QuerySet_, _Prompt_, _$filter_, _ProcessErrors_, _Wait_, _Rest_, _SearchBasePath_
) => {
scope = $rootScope.$new();
state = _state_;
Dataset = _Dataset_;
resolvedModels = _resolvedModels_;
JobsStrings = _JobsStrings_;
QuerySet = _QuerySet_;
Prompt = _Prompt_;
filter = _$filter_;
ProcessErrors = _ProcessErrors_;
Wait = _Wait_;
Rest = _Rest_;
SearchBasePath = _SearchBasePath_;
JobList = $controller('jobsListController', {
$scope: scope,
$state: state,
Dataset,
resolvedModels,
JobsStrings,
ProcessErrors,
QuerySet,
Wait,
Prompt,
$filter: filter,
Rest,
SearchBasePath,
});
}));
describe('JobList Controller', () => {
it('is created successfully', () => {
expect(JobList).toBeDefined();
});
it('has method "getSplitJobDetails"', () => {
expect(JobList.getSliceJobDetails).toBeDefined();
});
it('returns a string', () => {
const data = {
job_slice_number: 1,
job_slice_count: 2
};
const result = JobList.getSliceJobDetails(data);
expect(result).toEqual('Slice Job 1/2');
});
it('returns null when data is null', () => {
const data = {
job_slice_number: null,
job_slice_count: null
};
const result = JobList.getSliceJobDetails(data);
expect(result).toBeNull();
});
it('returns null when data is undefined', () => {
const data = {
job_slice_number: undefined,
job_slice_count: undefined
};
const result = JobList.getSliceJobDetails(data);
expect(result).toBeNull();
});
it('returns null when job is not a sliced job', () => {
const data = {
job_slice_number: null,
job_slice_count: 1
};
const result = JobList.getSliceJobDetails(data);
expect(result).toBeNull();
});
});
});

13
docs/job_slicing.md Normal file
View File

@ -0,0 +1,13 @@
# Job Slicing Overview
Ansible, by default, runs jobs from a single control instance. At best a single Ansible job can be sliced up on a single system via forks but this doesn't fully take advantage of AWX's ability to distribute work to multiple nodes in a cluster.
Job Slicing solves this by adding a Job Template field `job_slice_count`. This field specifies the number of **Jobs** to slice the Ansible run into. When this number is greater than 1 ``AWX`` will generate a **Workflow** from a **JobTemplate** instead of a **Job**. The **Inventory** will be distributed evenly amongst the slice jobs. The workflow job is then started and proceeds as though it were a normal workflow. The API will return either a **Job** resource (if `job_slice_count` < 2) or a **WorkflowJob** resource otherwise. Likewise, the UI will redirect to the appropriate screen to display the status of the run.
## Implications for Job execution
When jobs are sliced they can run on any Tower node and some may not run at the same time. Because of this, anything that relies on setting/sliced state (using modules such as ``set_fact``) will not work as expected. It's reasonable to expect that not all jobs will actually run at the same time (if there is not enough capacity in the system for example)
## Simultaneous Execution Behavior
By default Job Templates aren't normally configured to execute simultaneously (``allow_simultaneous`` must be checked). Slicing overrides this behavior and implies ``allow_simultaneous`` even if that setting is unchecked.