diff --git a/.gitignore b/.gitignore index 621101995d..f1463667a5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ + # Tags .tags .tags1 @@ -52,6 +53,7 @@ __pycache__ **/node_modules/** /tmp **/npm-debug.log* +**/package-lock.json # UI build flag files awx/ui/.deps_built diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 0d9867f49c..b86c4c5e4a 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -3011,7 +3011,7 @@ class JobTemplateSerializer(JobTemplateMixin, UnifiedJobTemplateSerializer, JobO fields = ('*', 'host_config_key', 'ask_diff_mode_on_launch', 'ask_variables_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch', 'ask_job_type_on_launch', 'ask_verbosity_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch', 'survey_enabled', 'become_enabled', 'diff_mode', - 'allow_simultaneous', 'custom_virtualenv') + 'allow_simultaneous', 'custom_virtualenv', 'job_slice_count') def get_related(self, obj): res = super(JobTemplateSerializer, self).get_related(obj) @@ -3028,6 +3028,7 @@ class JobTemplateSerializer(JobTemplateMixin, UnifiedJobTemplateSerializer, JobO labels = self.reverse('api:job_template_label_list', kwargs={'pk': obj.pk}), object_roles = self.reverse('api:job_template_object_roles_list', kwargs={'pk': obj.pk}), instance_groups = self.reverse('api:job_template_instance_groups_list', kwargs={'pk': obj.pk}), + slice_workflow_jobs = self.reverse('api:job_template_slice_workflow_jobs_list', kwargs={'pk': obj.pk}), )) if self.version > 1: res['copy'] = self.reverse('api:job_template_copy', kwargs={'pk': obj.pk}) @@ -3123,7 +3124,7 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer): 'ask_variables_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch', 'ask_job_type_on_launch', 'ask_verbosity_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch', 'allow_simultaneous', 'artifacts', 'scm_revision', - 'instance_group', 'diff_mode') + 'instance_group', 'diff_mode', 'job_slice_number', 'job_slice_count') def get_related(self, obj): res = super(JobSerializer, self).get_related(obj) @@ -3590,6 +3591,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): class Meta: model = WorkflowJob fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', + 'job_template', 'is_sliced_job', '-execution_node', '-event_processing_finished', '-controller_node',) def get_related(self, obj): @@ -3598,6 +3600,8 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): res['workflow_job_template'] = self.reverse('api:workflow_job_template_detail', kwargs={'pk': obj.workflow_job_template.pk}) res['notifications'] = self.reverse('api:workflow_job_notifications_list', kwargs={'pk': obj.pk}) + if obj.job_template_id: + res['job_template'] = self.reverse('api:job_template_detail', kwargs={'pk': obj.job_template_id}) res['workflow_nodes'] = self.reverse('api:workflow_job_workflow_nodes_list', kwargs={'pk': obj.pk}) res['labels'] = self.reverse('api:workflow_job_label_list', kwargs={'pk': obj.pk}) res['activity_stream'] = self.reverse('api:workflow_job_activity_stream_list', kwargs={'pk': obj.pk}) diff --git a/awx/api/templates/api/inventory_script_view.md b/awx/api/templates/api/inventory_script_view.md index 07656c1eff..28126dcbbb 100644 --- a/awx/api/templates/api/inventory_script_view.md +++ b/awx/api/templates/api/inventory_script_view.md @@ -26,6 +26,9 @@ string of `?all=1` to return all hosts, including disabled ones. Specify a query string of `?towervars=1` to add variables to the hostvars of each host that specifies its enabled state and database ID. +Specify a query string of `?subset=slice2of5` to produce an inventory that +has a restricted number of hosts according to the rules of job slicing. + To apply multiple query strings, join them with the `&` character, like `?hostvars=1&all=1`. ## Host Response diff --git a/awx/api/urls/job_template.py b/awx/api/urls/job_template.py index b11dbf4fea..0b43575ba4 100644 --- a/awx/api/urls/job_template.py +++ b/awx/api/urls/job_template.py @@ -8,6 +8,7 @@ from awx.api.views import ( JobTemplateDetail, JobTemplateLaunch, JobTemplateJobsList, + JobTemplateSliceWorkflowJobsList, JobTemplateCallback, JobTemplateSchedulesList, JobTemplateSurveySpec, @@ -28,6 +29,7 @@ urls = [ url(r'^(?P[0-9]+)/$', JobTemplateDetail.as_view(), name='job_template_detail'), url(r'^(?P[0-9]+)/launch/$', JobTemplateLaunch.as_view(), name='job_template_launch'), url(r'^(?P[0-9]+)/jobs/$', JobTemplateJobsList.as_view(), name='job_template_jobs_list'), + url(r'^(?P[0-9]+)/slice_workflow_jobs/$', JobTemplateSliceWorkflowJobsList.as_view(), name='job_template_slice_workflow_jobs_list'), url(r'^(?P[0-9]+)/callback/$', JobTemplateCallback.as_view(), name='job_template_callback'), url(r'^(?P[0-9]+)/schedules/$', JobTemplateSchedulesList.as_view(), name='job_template_schedules_list'), url(r'^(?P[0-9]+)/survey_spec/$', JobTemplateSurveySpec.as_view(), name='job_template_survey_spec'), diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index e1af329953..71250e6a0b 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -2452,6 +2452,16 @@ class InventoryScriptView(RetrieveAPIView): hostvars = bool(request.query_params.get('hostvars', '')) towervars = bool(request.query_params.get('towervars', '')) show_all = bool(request.query_params.get('all', '')) + subset = request.query_params.get('subset', '') + if subset: + if not isinstance(subset, six.string_types): + raise ParseError(_('Inventory subset argument must be a string.')) + if subset.startswith('slice'): + slice_number, slice_count = Inventory.parse_slice_params(subset) + else: + raise ParseError(_('Subset does not use any supported syntax.')) + else: + slice_number, slice_count = 1, 1 if hostname: hosts_q = dict(name=hostname) if not show_all: @@ -2461,7 +2471,8 @@ class InventoryScriptView(RetrieveAPIView): return Response(obj.get_script_data( hostvars=hostvars, towervars=towervars, - show_all=show_all + show_all=show_all, + slice_number=slice_number, slice_count=slice_count )) @@ -2912,9 +2923,14 @@ class JobTemplateLaunch(RetrieveAPIView): return Response(data, status=status.HTTP_400_BAD_REQUEST) else: data = OrderedDict() - data['job'] = new_job.id - data['ignored_fields'] = self.sanitize_for_response(ignored_fields) - data.update(JobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job)) + if isinstance(new_job, WorkflowJob): + data['workflow_job'] = new_job.id + data['ignored_fields'] = self.sanitize_for_response(ignored_fields) + data.update(WorkflowJobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job)) + else: + data['job'] = new_job.id + data['ignored_fields'] = self.sanitize_for_response(ignored_fields) + data.update(JobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job)) headers = {'Location': new_job.get_absolute_url(request)} return Response(data, status=status.HTTP_201_CREATED, headers=headers) @@ -3362,6 +3378,7 @@ class JobTemplateCallback(GenericAPIView): if extra_vars is not None and job_template.ask_variables_on_launch: extra_vars_redacted, removed = extract_ansible_vars(extra_vars) kv['extra_vars'] = extra_vars_redacted + kv['_prevent_slicing'] = True # will only run against 1 host, so no point with transaction.atomic(): job = job_template.create_job(**kv) @@ -3393,6 +3410,15 @@ class JobTemplateJobsList(SubListCreateAPIView): return methods +class JobTemplateSliceWorkflowJobsList(SubListCreateAPIView): + + model = WorkflowJob + serializer_class = WorkflowJobListSerializer + parent_model = JobTemplate + relationship = 'slice_workflow_jobs' + parent_key = 'job_template' + + class JobTemplateInstanceGroupsList(SubListAttachDetachAPIView): model = InstanceGroup @@ -3685,6 +3711,8 @@ class WorkflowJobRelaunch(WorkflowsEnforcementMixin, GenericAPIView): def post(self, request, *args, **kwargs): obj = self.get_object() + if obj.is_sliced_job and not obj.job_template_id: + raise ParseError(_('Cannot relaunch slice workflow job orphaned from job template.')) new_workflow_job = obj.create_relaunch_workflow_job() new_workflow_job.signal_start() diff --git a/awx/main/access.py b/awx/main/access.py index 768545ce05..5c1ea23a3a 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1789,7 +1789,7 @@ class WorkflowJobNodeAccess(BaseAccess): def filtered_queryset(self): return self.model.objects.filter( - workflow_job__workflow_job_template__in=WorkflowJobTemplate.accessible_objects( + workflow_job__unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs( self.user, 'read_role')) @check_superuser @@ -1915,7 +1915,7 @@ class WorkflowJobAccess(BaseAccess): def filtered_queryset(self): return WorkflowJob.objects.filter( - workflow_job_template__in=WorkflowJobTemplate.accessible_objects( + unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs( self.user, 'read_role')) def can_add(self, data): @@ -1947,9 +1947,11 @@ class WorkflowJobAccess(BaseAccess): if self.user.is_superuser: return True - wfjt = obj.workflow_job_template + template = obj.workflow_job_template + if not template and obj.job_template_id: + template = obj.job_template # only superusers can relaunch orphans - if not wfjt: + if not template: return False # If job was launched by another user, it could have survey passwords @@ -1967,7 +1969,7 @@ class WorkflowJobAccess(BaseAccess): return False # execute permission to WFJT is mandatory for any relaunch - return (self.user in wfjt.execute_role) + return (self.user in template.execute_role) def can_recreate(self, obj): node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template') diff --git a/awx/main/migrations/0051_v340_job_slicing.py b/awx/main/migrations/0051_v340_job_slicing.py new file mode 100644 index 0000000000..0e5c8bd701 --- /dev/null +++ b/awx/main/migrations/0051_v340_job_slicing.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.11 on 2018-10-15 16:21 +from __future__ import unicode_literals + +import awx.main.utils.polymorphic +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0050_v340_drop_celery_tables'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='job_slice_count', + field=models.PositiveIntegerField(blank=True, default=1, help_text='If ran as part of sliced jobs, the total number of slices. If 1, job is not part of a sliced job.'), + ), + migrations.AddField( + model_name='job', + name='job_slice_number', + field=models.PositiveIntegerField(blank=True, default=0, help_text='If part of a sliced job, the ID of the inventory slice operated on. If not part of sliced job, parameter is not used.'), + ), + migrations.AddField( + model_name='jobtemplate', + name='job_slice_count', + field=models.PositiveIntegerField(blank=True, default=1, help_text='The number of jobs to slice into at runtime. Will cause the Job Template to launch a workflow if value is greater than 1.'), + ), + migrations.AddField( + model_name='workflowjob', + name='is_sliced_job', + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name='workflowjob', + name='job_template', + field=models.ForeignKey(blank=True, default=None, help_text='If automatically created for a sliced job run, the job template the workflow job was created from.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='slice_workflow_jobs', to='main.JobTemplate'), + ), + migrations.AlterField( + model_name='unifiedjob', + name='unified_job_template', + field=models.ForeignKey(default=None, editable=False, null=True, on_delete=awx.main.utils.polymorphic.SET_NULL, related_name='unifiedjob_unified_jobs', to='main.UnifiedJobTemplate'), + ), + ] diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index 3549bb2a41..af9c519812 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -136,8 +136,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): else: return [] - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return '' @classmethod diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 175fa40236..c7b9254ada 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -19,6 +19,9 @@ from django.core.exceptions import ValidationError from django.utils.timezone import now from django.db.models import Q +# REST Framework +from rest_framework.exceptions import ParseError + # AWX from awx.api.versioning import reverse from awx.main.constants import CLOUD_PROVIDERS @@ -217,67 +220,87 @@ class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin): group_children.add(from_group_id) return group_children_map - def get_script_data(self, hostvars=False, towervars=False, show_all=False): - if show_all: - hosts_q = dict() - else: - hosts_q = dict(enabled=True) + @staticmethod + def parse_slice_params(slice_str): + m = re.match(r"slice(?P\d+)of(?P\d+)", slice_str) + if not m: + raise ParseError(_('Could not parse subset as slice specification.')) + number = int(m.group('number')) + step = int(m.group('step')) + if number > step: + raise ParseError(_('Slice number must be less than total number of slices.')) + elif number < 1: + raise ParseError(_('Slice number must be 1 or higher.')) + return (number, step) + + def get_script_data(self, hostvars=False, towervars=False, show_all=False, slice_number=1, slice_count=1): + hosts_kw = dict() + if not show_all: + hosts_kw['enabled'] = True + fetch_fields = ['name', 'id', 'variables'] + if towervars: + fetch_fields.append('enabled') + hosts = self.hosts.filter(**hosts_kw).order_by('name').only(*fetch_fields) + if slice_count > 1: + offset = slice_number - 1 + hosts = hosts[offset::slice_count] + data = dict() + all_group = data.setdefault('all', dict()) + all_hostnames = set(host.name for host in hosts) if self.variables_dict: - all_group = data.setdefault('all', dict()) all_group['vars'] = self.variables_dict + if self.kind == 'smart': - if len(self.hosts.all()) == 0: - return {} - else: - all_group = data.setdefault('all', dict()) - smart_hosts_qs = self.hosts.filter(**hosts_q).all() - smart_hosts = list(smart_hosts_qs.values_list('name', flat=True)) - all_group['hosts'] = smart_hosts + all_group['hosts'] = [host.name for host in hosts] else: - # Add hosts without a group to the all group. - groupless_hosts_qs = self.hosts.filter(groups__isnull=True, **hosts_q) - groupless_hosts = list(groupless_hosts_qs.values_list('name', flat=True)) - if groupless_hosts: - all_group = data.setdefault('all', dict()) - all_group['hosts'] = groupless_hosts + # Keep track of hosts that are members of a group + grouped_hosts = set([]) # Build in-memory mapping of groups and their hosts. - group_hosts_kw = dict(group__inventory_id=self.id, host__inventory_id=self.id) - if 'enabled' in hosts_q: - group_hosts_kw['host__enabled'] = hosts_q['enabled'] - group_hosts_qs = Group.hosts.through.objects.filter(**group_hosts_kw) - group_hosts_qs = group_hosts_qs.values_list('group_id', 'host_id', 'host__name') + group_hosts_qs = Group.hosts.through.objects.filter( + group__inventory_id=self.id, + host__inventory_id=self.id + ).values_list('group_id', 'host_id', 'host__name') group_hosts_map = {} for group_id, host_id, host_name in group_hosts_qs: + if host_name not in all_hostnames: + continue # host might not be in current shard group_hostnames = group_hosts_map.setdefault(group_id, []) group_hostnames.append(host_name) + grouped_hosts.add(host_name) # Build in-memory mapping of groups and their children. group_parents_qs = Group.parents.through.objects.filter( from_group__inventory_id=self.id, to_group__inventory_id=self.id, - ) - group_parents_qs = group_parents_qs.values_list('from_group_id', 'from_group__name', - 'to_group_id') + ).values_list('from_group_id', 'from_group__name', 'to_group_id') group_children_map = {} for from_group_id, from_group_name, to_group_id in group_parents_qs: group_children = group_children_map.setdefault(to_group_id, []) group_children.append(from_group_name) # Now use in-memory maps to build up group info. - for group in self.groups.all(): + for group in self.groups.only('name', 'id', 'variables'): group_info = dict() group_info['hosts'] = group_hosts_map.get(group.id, []) group_info['children'] = group_children_map.get(group.id, []) group_info['vars'] = group.variables_dict data[group.name] = group_info + # Add ungrouped hosts to all group + all_group['hosts'] = [host.name for host in hosts if host.name not in grouped_hosts] + + # Remove any empty groups + for group_name in list(data.keys()): + if not data.get(group_name, {}).get('hosts', []): + data.pop(group_name) + if hostvars: data.setdefault('_meta', dict()) data['_meta'].setdefault('hostvars', dict()) - for host in self.hosts.filter(**hosts_q): + for host in hosts: data['_meta']['hostvars'][host.name] = host.variables_dict if towervars: tower_dict = dict(remote_tower_enabled=str(host.enabled).lower(), @@ -1624,8 +1647,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin, null=True ) - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'inventory_source' @classmethod diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index c5bad0c6c4..a0ea174bbe 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -277,6 +277,13 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour default=False, allows_field='credentials' ) + job_slice_count = models.PositiveIntegerField( + blank=True, + default=1, + help_text=_("The number of jobs to slice into at runtime. " + "Will cause the Job Template to launch a workflow if value is greater than 1."), + ) + admin_role = ImplicitRoleField( parent_role=['project.organization.job_template_admin_role', 'inventory.organization.job_template_admin_role'] ) @@ -295,7 +302,8 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour @classmethod def _get_unified_job_field_names(cls): return set(f.name for f in JobOptions._meta.fields) | set( - ['name', 'description', 'schedule', 'survey_passwords', 'labels', 'credentials'] + ['name', 'description', 'schedule', 'survey_passwords', 'labels', 'credentials', + 'job_slice_number', 'job_slice_count'] ) @property @@ -320,6 +328,31 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour ''' return self.create_unified_job(**kwargs) + def create_unified_job(self, **kwargs): + prevent_slicing = kwargs.pop('_prevent_slicing', False) + slice_event = bool(self.job_slice_count > 1 and (not prevent_slicing)) + if slice_event: + # A Slice Job Template will generate a WorkflowJob rather than a Job + from awx.main.models.workflow import WorkflowJobTemplate, WorkflowJobNode + kwargs['_unified_job_class'] = WorkflowJobTemplate._get_unified_job_class() + kwargs['_parent_field_name'] = "job_template" + kwargs.setdefault('_eager_fields', {}) + kwargs['_eager_fields']['is_sliced_job'] = True + job = super(JobTemplate, self).create_unified_job(**kwargs) + if slice_event: + try: + wj_config = job.launch_config + except JobLaunchConfig.DoesNotExist: + wj_config = JobLaunchConfig() + actual_inventory = wj_config.inventory if wj_config.inventory else self.inventory + for idx in xrange(min(self.job_slice_count, + actual_inventory.hosts.count())): + create_kwargs = dict(workflow_job=job, + unified_job_template=self, + ancestor_artifacts=dict(job_slice=idx + 1)) + WorkflowJobNode.objects.create(**create_kwargs) + return job + def get_absolute_url(self, request=None): return reverse('api:job_template_detail', kwargs={'pk': self.pk}, request=request) @@ -452,7 +485,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour RelatedJobsMixin ''' def _get_related_jobs(self): - return Job.objects.filter(job_template=self) + return UnifiedJob.objects.filter(unified_job_template=self) class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskManagerJobMixin): @@ -501,10 +534,21 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana on_delete=models.SET_NULL, help_text=_('The SCM Refresh task used to make sure the playbooks were available for the job run'), ) + job_slice_number = models.PositiveIntegerField( + blank=True, + default=0, + help_text=_("If part of a sliced job, the ID of the inventory slice operated on. " + "If not part of sliced job, parameter is not used."), + ) + job_slice_count = models.PositiveIntegerField( + blank=True, + default=1, + help_text=_("If ran as part of sliced jobs, the total number of slices. " + "If 1, job is not part of a sliced job."), + ) - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'job_template' @classmethod @@ -545,6 +589,15 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana def event_class(self): return JobEvent + def copy_unified_job(self, **new_prompts): + # Needed for job slice relaunch consistency, do no re-spawn workflow job + # target same slice as original job + new_prompts['_prevent_slicing'] = True + new_prompts.setdefault('_eager_fields', {}) + new_prompts['_eager_fields']['job_slice_number'] = self.job_slice_number + new_prompts['_eager_fields']['job_slice_count'] = self.job_slice_count + return super(Job, self).copy_unified_job(**new_prompts) + @property def ask_diff_mode_on_launch(self): if self.job_template is not None: @@ -638,6 +691,9 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana count_hosts = 2 else: count_hosts = Host.objects.filter(inventory__jobs__pk=self.pk).count() + if self.job_slice_count > 1: + # Integer division intentional + count_hosts = (count_hosts + self.job_slice_count - self.job_slice_number) / self.job_slice_count return min(count_hosts, 5 if self.forks == 0 else self.forks) + 1 @property diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 7f296376fa..3c283e2fd2 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -496,8 +496,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage default='check', ) - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'project' @classmethod diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index da3f43ad80..0c0d82d31e 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -309,13 +309,6 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio ''' raise NotImplementedError # Implement in subclass. - @classmethod - def _get_unified_job_field_names(cls): - ''' - Return field names that should be copied from template to new job. - ''' - raise NotImplementedError # Implement in subclass. - @property def notification_templates(self): ''' @@ -338,19 +331,33 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio unified_job_class = self._get_unified_job_class() fields = self._get_unified_job_field_names() - unallowed_fields = set(kwargs.keys()) - set(fields) - if unallowed_fields: - logger.warn('Fields {} are not allowed as overrides.'.format(unallowed_fields)) - map(kwargs.pop, unallowed_fields) + parent_field_name = None + if "_unified_job_class" in kwargs: + # Special case where spawned job is different type than usual + # Only used for slice jobs + unified_job_class = kwargs.pop("_unified_job_class") + fields = unified_job_class._get_unified_job_field_names() & fields + parent_field_name = kwargs.pop('_parent_field_name') - unified_job = copy_model_by_class(self, unified_job_class, fields, kwargs) + unallowed_fields = set(kwargs.keys()) - set(fields) + validated_kwargs = kwargs.copy() + if unallowed_fields: + if parent_field_name is None: + logger.warn(six.text_type('Fields {} are not allowed as overrides to spawn {} from {}.').format( + six.text_type(', ').join(unallowed_fields), unified_job, self + )) + map(validated_kwargs.pop, unallowed_fields) + + unified_job = copy_model_by_class(self, unified_job_class, fields, validated_kwargs) if eager_fields: for fd, val in eager_fields.items(): setattr(unified_job, fd, val) - # Set the unified job template back-link on the job - parent_field_name = unified_job_class._get_parent_field_name() + # NOTE: slice workflow jobs _get_parent_field_name method + # is not correct until this is set + if not parent_field_name: + parent_field_name = unified_job._get_parent_field_name() setattr(unified_job, parent_field_name, self) # For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction @@ -364,24 +371,25 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio unified_job.save() # Labels and credentials copied here - if kwargs.get('credentials'): + if validated_kwargs.get('credentials'): Credential = UnifiedJob._meta.get_field('credentials').related_model cred_dict = Credential.unique_dict(self.credentials.all()) - prompted_dict = Credential.unique_dict(kwargs['credentials']) + prompted_dict = Credential.unique_dict(validated_kwargs['credentials']) # combine prompted credentials with JT cred_dict.update(prompted_dict) - kwargs['credentials'] = [cred for cred in cred_dict.values()] + validated_kwargs['credentials'] = [cred for cred in cred_dict.values()] + kwargs['credentials'] = validated_kwargs['credentials'] from awx.main.signals import disable_activity_stream with disable_activity_stream(): - copy_m2m_relationships(self, unified_job, fields, kwargs=kwargs) + copy_m2m_relationships(self, unified_job, fields, kwargs=validated_kwargs) - if 'extra_vars' in kwargs: - unified_job.handle_extra_data(kwargs['extra_vars']) + if 'extra_vars' in validated_kwargs: + unified_job.handle_extra_data(validated_kwargs['extra_vars']) if not getattr(self, '_deprecated_credential_launch', False): # Create record of provided prompts for relaunch and rescheduling - unified_job.create_config_from_prompts(kwargs) + unified_job.create_config_from_prompts(kwargs, parent=self) return unified_job @@ -546,7 +554,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique default=None, editable=False, related_name='%(class)s_unified_jobs', - on_delete=models.SET_NULL, + on_delete=polymorphic.SET_NULL, ) launch_type = models.CharField( max_length=20, @@ -694,8 +702,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def supports_isolation(cls): return False - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'unified_job_template' # Override in subclasses. @classmethod @@ -828,7 +835,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique ''' unified_job_class = self.__class__ unified_jt_class = self._get_unified_job_template_class() - parent_field_name = unified_job_class._get_parent_field_name() + parent_field_name = self._get_parent_field_name() fields = unified_jt_class._get_unified_job_field_names() | set([parent_field_name]) create_data = {} @@ -866,16 +873,18 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique except JobLaunchConfig.DoesNotExist: return None - def create_config_from_prompts(self, kwargs): + def create_config_from_prompts(self, kwargs, parent=None): ''' Create a launch configuration entry for this job, given prompts returns None if it can not be created ''' - if self.unified_job_template is None: - return None JobLaunchConfig = self._meta.get_field('launch_config').related_model config = JobLaunchConfig(job=self) - valid_fields = self.unified_job_template.get_ask_mapping().keys() + if parent is None: + parent = getattr(self, self._get_parent_field_name()) + if parent is None: + return + valid_fields = parent.get_ask_mapping().keys() # Special cases allowed for workflows if hasattr(self, 'extra_vars'): valid_fields.extend(['survey_passwords', 'extra_vars']) @@ -892,8 +901,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique setattr(config, key, value) config.save() - job_creds = (set(kwargs.get('credentials', [])) - - set(self.unified_job_template.credentials.all())) + job_creds = set(kwargs.get('credentials', [])) + if 'credentials' in [field.name for field in parent._meta.get_fields()]: + job_creds = job_creds - set(parent.credentials.all()) if job_creds: config.credentials.add(*job_creds) return config diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 357dd9eeb0..d02e3f6057 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -9,6 +9,7 @@ import logging from django.db import models from django.conf import settings from django.utils.translation import ugettext_lazy as _ +from django.core.exceptions import ObjectDoesNotExist #from django import settings as tower_settings # AWX @@ -206,13 +207,24 @@ class WorkflowJobNode(WorkflowNodeBase): workflow_pk=self.pk, error_text=errors)) data.update(accepted_fields) # missing fields are handled in the scheduler + try: + # config saved on the workflow job itself + wj_config = self.workflow_job.launch_config + except ObjectDoesNotExist: + wj_config = None + if wj_config: + accepted_fields, ignored_fields, errors = ujt_obj._accept_or_ignore_job_kwargs(**wj_config.prompts_dict()) + accepted_fields.pop('extra_vars', None) # merge handled with other extra_vars later + data.update(accepted_fields) # build ancestor artifacts, save them to node model for later aa_dict = {} + is_root_node = True for parent_node in self.get_parent_nodes(): + is_root_node = False aa_dict.update(parent_node.ancestor_artifacts) if parent_node.job and hasattr(parent_node.job, 'artifacts'): aa_dict.update(parent_node.job.artifacts) - if aa_dict: + if aa_dict and not is_root_node: self.ancestor_artifacts = aa_dict self.save(update_fields=['ancestor_artifacts']) # process password list @@ -240,6 +252,12 @@ class WorkflowJobNode(WorkflowNodeBase): data['extra_vars'] = extra_vars # ensure that unified jobs created by WorkflowJobs are marked data['_eager_fields'] = {'launch_type': 'workflow'} + # Extra processing in the case that this is a slice job + if 'job_slice' in self.ancestor_artifacts and is_root_node: + data['_eager_fields']['allow_simultaneous'] = True + data['_eager_fields']['job_slice_number'] = self.ancestor_artifacts['job_slice'] + data['_eager_fields']['job_slice_count'] = self.workflow_job.workflow_job_nodes.count() + data['_prevent_slicing'] = True return data @@ -261,6 +279,12 @@ class WorkflowJobOptions(BaseModel): def workflow_nodes(self): raise NotImplementedError() + @classmethod + def _get_unified_job_field_names(cls): + return set(f.name for f in WorkflowJobOptions._meta.fields) | set( + ['name', 'description', 'schedule', 'survey_passwords', 'labels'] + ) + def _create_workflow_nodes(self, old_node_list, user=None): node_links = {} for old_node in old_node_list: @@ -288,7 +312,7 @@ class WorkflowJobOptions(BaseModel): def create_relaunch_workflow_job(self): new_workflow_job = self.copy_unified_job() - if self.workflow_job_template is None: + if self.unified_job_template_id is None: new_workflow_job.copy_nodes_from_original(original=self) return new_workflow_job @@ -331,12 +355,6 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl def _get_unified_job_class(cls): return WorkflowJob - @classmethod - def _get_unified_job_field_names(cls): - return set(f.name for f in WorkflowJobOptions._meta.fields) | set( - ['name', 'description', 'schedule', 'survey_passwords', 'labels'] - ) - @classmethod def _get_unified_jt_copy_names(cls): base_list = super(WorkflowJobTemplate, cls)._get_unified_jt_copy_names() @@ -433,13 +451,28 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio default=None, on_delete=models.SET_NULL, ) + job_template = models.ForeignKey( + 'JobTemplate', + related_name='slice_workflow_jobs', + blank=True, + null=True, + default=None, + on_delete=models.SET_NULL, + help_text=_("If automatically created for a sliced job run, the job template " + "the workflow job was created from."), + ) + is_sliced_job = models.BooleanField( + default=False + ) @property def workflow_nodes(self): return self.workflow_job_nodes - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): + if self.job_template_id: + # This is a workflow job which is a container for slice jobs + return 'job_template' return 'workflow_job_template' @classmethod diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 08cb6cd247..863352857c 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -121,7 +121,11 @@ class TaskManager(): spawn_node.save() logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) if job._resources_sufficient_for_launch(): - can_start = job.signal_start() + if workflow_job.start_args: + start_args = json.loads(decrypt_field(workflow_job, 'start_args')) + else: + start_args = {} + can_start = job.signal_start(**start_args) if not can_start: job.job_explanation = _("Job spawned from workflow could not start because it " "was not in the right state or required manual credentials") @@ -147,7 +151,8 @@ class TaskManager(): if cancel_finished: logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format) workflow_job.status = 'canceled' - workflow_job.save() + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=['status', 'start_args']) connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) else: is_done, has_failed = dag.is_workflow_done() @@ -155,8 +160,11 @@ class TaskManager(): continue logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') result.append(workflow_job.id) - workflow_job.status = 'failed' if has_failed else 'successful' - workflow_job.save() + new_status = 'failed' if has_failed else 'successful' + logger.debug(six.text_type("Transitioning {} to {} status.").format(workflow_job.log_format, new_status)) + workflow_job.status = new_status + workflow_job.start_args = '' # blank field to remove encrypted passwords + workflow_job.save(update_fields=['status', 'start_args']) connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status)) return result @@ -419,7 +427,7 @@ class TaskManager(): logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format)) def process_pending_tasks(self, pending_tasks): - running_workflow_templates = set([wf.workflow_job_template_id for wf in self.get_running_workflow_jobs()]) + running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()]) for task in pending_tasks: self.process_dependencies(task, self.generate_dependencies(task)) if self.is_job_blocked(task): @@ -429,12 +437,12 @@ class TaskManager(): found_acceptable_queue = False idle_instance_that_fits = None if isinstance(task, WorkflowJob): - if task.workflow_job_template_id in running_workflow_templates: + if task.unified_job_template_id in running_workflow_templates: if not task.allow_simultaneous: logger.debug(six.text_type("{} is blocked from running, workflow already running").format(task.log_format)) continue else: - running_workflow_templates.add(task.workflow_job_template_id) + running_workflow_templates.add(task.unified_job_template_id) self.start_task(task, None, task.get_jobs_fail_chain(), None) continue for rampart_group in preferred_instance_groups: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 0fdbef8036..7106b68a25 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -825,7 +825,12 @@ class BaseTask(object): return False def build_inventory(self, instance, **kwargs): - json_data = json.dumps(instance.inventory.get_script_data(hostvars=True)) + script_params = dict(hostvars=True) + if hasattr(instance, 'job_slice_number'): + script_params['slice_number'] = instance.job_slice_number + script_params['slice_count'] = instance.job_slice_count + script_data = instance.inventory.get_script_data(**script_params) + json_data = json.dumps(script_data) handle, path = tempfile.mkstemp(dir=kwargs.get('private_data_dir', None)) f = os.fdopen(handle, 'w') f.write('#! /usr/bin/env python\n# -*- coding: utf-8 -*-\nprint %r\n' % json_data) diff --git a/awx/main/tests/functional/api/test_job.py b/awx/main/tests/functional/api/test_job.py index 8cd26e71ba..d635a35e0f 100644 --- a/awx/main/tests/functional/api/test_job.py +++ b/awx/main/tests/functional/api/test_job.py @@ -122,6 +122,22 @@ def test_job_relaunch_on_failed_hosts(post, inventory, project, machine_credenti assert r.data.get('limit') == hosts +@pytest.mark.django_db +def test_slice_jt_recent_jobs(slice_job_factory, admin_user, get): + workflow_job = slice_job_factory(3, spawn=True) + slice_jt = workflow_job.job_template + r = get( + url=slice_jt.get_absolute_url(), + user=admin_user, + expect=200 + ) + job_ids = [entry['id'] for entry in r.data['summary_fields']['recent_jobs']] + assert workflow_job.pk not in job_ids + for node in workflow_job.workflow_nodes.all(): + job = node.job + assert job.pk in job_ids + + @pytest.mark.django_db def test_block_unprocessed_events(delete, admin_user, mocker): time_of_finish = parse("Thu Feb 28 09:10:20 2013 -0500") diff --git a/awx/main/tests/functional/api/test_job_runtime_params.py b/awx/main/tests/functional/api/test_job_runtime_params.py index cf9c5c8286..4bd46e5cdf 100644 --- a/awx/main/tests/functional/api/test_job_runtime_params.py +++ b/awx/main/tests/functional/api/test_job_runtime_params.py @@ -6,7 +6,7 @@ import json from awx.api.serializers import JobLaunchSerializer from awx.main.models.credential import Credential from awx.main.models.inventory import Inventory, Host -from awx.main.models.jobs import Job, JobTemplate +from awx.main.models.jobs import Job, JobTemplate, UnifiedJobTemplate from awx.api.versioning import reverse @@ -553,15 +553,15 @@ def test_callback_accept_prompted_extra_var(mocker, survey_spec_factory, job_tem with mocker.patch('awx.main.access.BaseAccess.check_license'): mock_job = mocker.MagicMock(spec=Job, id=968, extra_vars={"job_launch_var": 3, "survey_var": 4}) - with mocker.patch.object(JobTemplate, 'create_unified_job', return_value=mock_job): + with mocker.patch.object(UnifiedJobTemplate, 'create_unified_job', return_value=mock_job): with mocker.patch('awx.api.serializers.JobSerializer.to_representation', return_value={}): with mocker.patch('awx.api.views.JobTemplateCallback.find_matching_hosts', return_value=[host]): post( reverse('api:job_template_callback', kwargs={'pk': job_template.pk}), dict(extra_vars={"job_launch_var": 3, "survey_var": 4}, host_config_key="foo"), admin_user, expect=201, format='json') - assert JobTemplate.create_unified_job.called - assert JobTemplate.create_unified_job.call_args == ({ + assert UnifiedJobTemplate.create_unified_job.called + assert UnifiedJobTemplate.create_unified_job.call_args == ({ 'extra_vars': {'survey_var': 4, 'job_launch_var': 3}, '_eager_fields': {'launch_type': 'callback'}, 'limit': 'single-host'}, @@ -579,15 +579,15 @@ def test_callback_ignore_unprompted_extra_var(mocker, survey_spec_factory, job_t with mocker.patch('awx.main.access.BaseAccess.check_license'): mock_job = mocker.MagicMock(spec=Job, id=968, extra_vars={"job_launch_var": 3, "survey_var": 4}) - with mocker.patch.object(JobTemplate, 'create_unified_job', return_value=mock_job): + with mocker.patch.object(UnifiedJobTemplate, 'create_unified_job', return_value=mock_job): with mocker.patch('awx.api.serializers.JobSerializer.to_representation', return_value={}): with mocker.patch('awx.api.views.JobTemplateCallback.find_matching_hosts', return_value=[host]): post( reverse('api:job_template_callback', kwargs={'pk':job_template.pk}), dict(extra_vars={"job_launch_var": 3, "survey_var": 4}, host_config_key="foo"), admin_user, expect=201, format='json') - assert JobTemplate.create_unified_job.called - assert JobTemplate.create_unified_job.call_args == ({ + assert UnifiedJobTemplate.create_unified_job.called + assert UnifiedJobTemplate.create_unified_job.call_args == ({ '_eager_fields': {'launch_type': 'callback'}, 'limit': 'single-host'}, ) diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index 459471b6e8..1b90225adb 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -776,3 +776,37 @@ def sqlite_copy_expert(request): def disable_database_settings(mocker): m = mocker.patch('awx.conf.settings.SettingsWrapper.all_supported_settings', new_callable=PropertyMock) m.return_value = [] + + +@pytest.fixture +def slice_jt_factory(inventory): + def r(N, jt_kwargs=None): + for i in range(N): + inventory.hosts.create(name='foo{}'.format(i)) + if not jt_kwargs: + jt_kwargs = {} + return JobTemplate.objects.create( + name='slice-jt-from-factory', + job_slice_count=N, + inventory=inventory, + **jt_kwargs + ) + return r + + +@pytest.fixture +def slice_job_factory(slice_jt_factory): + def r(N, jt_kwargs=None, prompts=None, spawn=False): + slice_jt = slice_jt_factory(N, jt_kwargs=jt_kwargs) + if not prompts: + prompts = {} + slice_job = slice_jt.create_unified_job(**prompts) + if spawn: + for node in slice_job.workflow_nodes.all(): + # does what the task manager does for spawning workflow jobs + kv = node.get_job_kwargs() + job = node.unified_job_template.create_unified_job(**kv) + node.job = job + node.save() + return slice_job + return r diff --git a/awx/main/tests/functional/models/test_inventory.py b/awx/main/tests/functional/models/test_inventory.py index 57365b914b..97cf1cb0a0 100644 --- a/awx/main/tests/functional/models/test_inventory.py +++ b/awx/main/tests/functional/models/test_inventory.py @@ -38,6 +38,33 @@ class TestInventoryScript: 'remote_tower_id': host.id } + def test_slice_subset(self, inventory): + for i in range(3): + inventory.hosts.create(name='host{}'.format(i)) + for i in range(3): + assert inventory.get_script_data(slice_number=i + 1, slice_count=3) == { + 'all': {'hosts': ['host{}'.format(i)]} + } + + def test_slice_subset_with_groups(self, inventory): + hosts = [] + for i in range(3): + host = inventory.hosts.create(name='host{}'.format(i)) + hosts.append(host) + g1 = inventory.groups.create(name='contains_all_hosts') + for host in hosts: + g1.hosts.add(host) + g2 = inventory.groups.create(name='contains_two_hosts') + for host in hosts[:2]: + g2.hosts.add(host) + for i in range(3): + expected_data = { + 'contains_all_hosts': {'hosts': ['host{}'.format(i)], 'children': [], 'vars': {}}, + } + if i < 2: + expected_data['contains_two_hosts'] = {'hosts': ['host{}'.format(i)], 'children': [], 'vars': {}} + assert inventory.get_script_data(slice_number=i + 1, slice_count=3) == expected_data + @pytest.mark.django_db class TestActiveCount: diff --git a/awx/main/tests/functional/models/test_job.py b/awx/main/tests/functional/models/test_job.py index 013f73ca39..e64acafd2b 100644 --- a/awx/main/tests/functional/models/test_job.py +++ b/awx/main/tests/functional/models/test_job.py @@ -1,7 +1,7 @@ import pytest import six -from awx.main.models import JobTemplate, Job, JobHostSummary +from awx.main.models import JobTemplate, Job, JobHostSummary, WorkflowJob from crum import impersonate @@ -81,3 +81,23 @@ def test_job_host_summary_representation(host): jhs = JobHostSummary.objects.get(pk=jhs.id) host.delete() assert 'N/A changed=1 dark=2 failures=3 ok=4 processed=5 skipped=6' == six.text_type(jhs) + + +@pytest.mark.django_db +class TestSlicingModels: + + def test_slice_workflow_spawn(self, slice_jt_factory): + slice_jt = slice_jt_factory(3) + job = slice_jt.create_unified_job() + assert isinstance(job, WorkflowJob) + assert job.job_template == slice_jt + assert job.unified_job_template == slice_jt + assert job.workflow_nodes.count() == 3 + + def test_slices_with_JT_and_prompts(self, slice_job_factory): + job = slice_job_factory(3, jt_kwargs={'ask_limit_on_launch': True}, prompts={'limit': 'foobar'}, spawn=True) + assert job.launch_config.prompts_dict() == {'limit': 'foobar'} + for node in job.workflow_nodes.all(): + assert node.limit is None # data not saved in node prompts + job = node.job + assert job.limit == 'foobar' diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index ff3af2439b..74e163c66e 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -58,9 +58,7 @@ class TestCreateUnifiedJob: job_with_links.save() job_with_links.credentials.add(machine_credential) job_with_links.credentials.add(net_credential) - with mocker.patch('awx.main.models.unified_jobs.UnifiedJobTemplate._get_unified_job_field_names', - return_value=['inventory', 'credential', 'limit']): - second_job = job_with_links.copy_unified_job() + second_job = job_with_links.copy_unified_job() # Check that job data matches the original variables assert second_job.credential == job_with_links.credential @@ -154,3 +152,50 @@ def test_event_processing_not_finished(): def test_event_model_undefined(): wj = WorkflowJob.objects.create(name='foobar', status='finished') assert wj.event_processing_finished + + +@pytest.mark.django_db +class TestTaskImpact: + @pytest.fixture + def job_host_limit(self, job_template, inventory): + def r(hosts, forks): + for i in range(hosts): + inventory.hosts.create(name='foo' + str(i)) + job = Job.objects.create( + name='fake-job', + launch_type='workflow', + job_template=job_template, + inventory=inventory, + forks=forks + ) + return job + return r + + def test_limit_task_impact(self, job_host_limit): + job = job_host_limit(5, 2) + assert job.task_impact == 2 + 1 # forks becomes constraint + + def test_host_task_impact(self, job_host_limit): + job = job_host_limit(3, 5) + assert job.task_impact == 3 + 1 # hosts becomes constraint + + def test_shard_task_impact(self, slice_job_factory): + # factory creates on host per slice + workflow_job = slice_job_factory(3, jt_kwargs={'forks': 50}, spawn=True) + # arrange the jobs by their number + jobs = [None for i in range(3)] + for node in workflow_job.workflow_nodes.all(): + jobs[node.job.job_slice_number - 1] = node.job + # Even distribution - all jobs run on 1 host + assert [ + len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) + for i in range(3) + ] == [1, 1, 1] + assert [job.task_impact for job in jobs] == [2, 2, 2] # plus one base task impact + # Uneven distribution - first job takes the extra host + jobs[0].inventory.hosts.create(name='remainder_foo') + assert [ + len(jobs[0].inventory.get_script_data(slice_number=i + 1, slice_count=3)['all']['hosts']) + for i in range(3) + ] == [2, 1, 1] + assert [job.task_impact for job in jobs] == [3, 2, 2] diff --git a/awx/main/tests/functional/test_rbac_job_start.py b/awx/main/tests/functional/test_rbac_job_start.py index 60c35e0803..6748b3df5d 100644 --- a/awx/main/tests/functional/test_rbac_job_start.py +++ b/awx/main/tests/functional/test_rbac_job_start.py @@ -3,6 +3,11 @@ import pytest from awx.main.models.inventory import Inventory from awx.main.models.credential import Credential from awx.main.models.jobs import JobTemplate, Job +from awx.main.access import ( + UnifiedJobAccess, + WorkflowJobAccess, WorkflowJobNodeAccess, + JobAccess +) @pytest.mark.django_db @@ -43,6 +48,31 @@ def test_inventory_use_access(inventory, user): assert common_user.can_access(Inventory, 'use', inventory) +@pytest.mark.django_db +def test_slice_job(slice_job_factory, rando): + workflow_job = slice_job_factory(2, jt_kwargs={'created_by': rando}, spawn=True) + workflow_job.job_template.execute_role.members.add(rando) + + # Abilities of user with execute_role for slice workflow job container + assert WorkflowJobAccess(rando).can_start(workflow_job) # relaunch allowed + for access_cls in (UnifiedJobAccess, WorkflowJobAccess): + access = access_cls(rando) + assert access.can_read(workflow_job) + assert workflow_job in access.get_queryset() + + # Abilities of user with execute_role for all the slice of the job + for node in workflow_job.workflow_nodes.all(): + access = WorkflowJobNodeAccess(rando) + assert access.can_read(node) + assert node in access.get_queryset() + job = node.job + assert JobAccess(rando).can_start(job) # relaunch allowed + for access_cls in (UnifiedJobAccess, JobAccess): + access = access_cls(rando) + assert access.can_read(job) + assert job in access.get_queryset() + + @pytest.mark.django_db class TestJobRelaunchAccess: @pytest.fixture diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 050b9c8b07..578b9f7eda 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -246,6 +246,8 @@ class TestJobExecution(object): # If `Job.update_model` is called, we're not actually persisting # to the database; just update the status, which is usually # the update we care about for testing purposes + if kwargs.get('result_traceback'): + raise Exception('Task encountered error:\n{}'.format(kwargs['result_traceback'])) if 'status' in kwargs: self.instance.status = kwargs['status'] if 'job_env' in kwargs: diff --git a/awx/ui/client/features/jobs/index.js b/awx/ui/client/features/jobs/index.js index 8bb692f0c8..99d91515d5 100644 --- a/awx/ui/client/features/jobs/index.js +++ b/awx/ui/client/features/jobs/index.js @@ -1,12 +1,14 @@ import JobsStrings from './jobs.strings'; import jobsRoute from './routes/jobs.route'; import { jobsSchedulesRoute, jobsSchedulesEditRoute } from '../../src/scheduler/schedules.route'; +import jobsListController from './jobsList.controller'; const MODULE_NAME = 'at.features.jobs'; angular .module(MODULE_NAME, []) .service('JobsStrings', JobsStrings) + .controller('jobsListController', jobsListController) .run(['$stateExtender', ($stateExtender) => { $stateExtender.addState(jobsRoute); $stateExtender.addState(jobsSchedulesRoute); diff --git a/awx/ui/client/features/jobs/jobsList.controller.js b/awx/ui/client/features/jobs/jobsList.controller.js index 4833fdda9a..1a1943c6c4 100644 --- a/awx/ui/client/features/jobs/jobsList.controller.js +++ b/awx/ui/client/features/jobs/jobsList.controller.js @@ -76,6 +76,22 @@ function ListJobsController ( return { icon, link, value }; }); + vm.getSliceJobDetails = (job) => { + if (!job.job_slice_count) { + return null; + } + + if (job.job_slice_count === 1) { + return null; + } + + if (job.job_slice_number && job.job_slice_count) { + return `Slice Job ${job.job_slice_number}/${job.job_slice_count}`; + } + + return null; + }; + vm.getSref = ({ type, id }) => { let sref; diff --git a/awx/ui/client/features/jobs/jobsList.view.html b/awx/ui/client/features/jobs/jobsList.view.html index 580a35d8a7..783a734024 100644 --- a/awx/ui/client/features/jobs/jobsList.view.html +++ b/awx/ui/client/features/jobs/jobsList.view.html @@ -23,7 +23,8 @@ status-tip="{{ vm.strings.get('list.STATUS_TOOLTIP', job.status) }}" header-value="{{ job.id }} - {{ job.name }}" header-state="{{ vm.getSref(job) }}" - header-tag="{{ vm.jobTypes[job.type] }}"> + header-tag="{{ vm.jobTypes[job.type] }}" + secondary-tag="{{ vm.getSliceJobDetails(job) }}">
{{ vm.jobType.value }}
+ +
+ +
{{ vm.sliceJobDetails.offset }}
+
+
diff --git a/awx/ui/client/features/output/output.strings.js b/awx/ui/client/features/output/output.strings.js index 538b533cb0..53430984b8 100644 --- a/awx/ui/client/features/output/output.strings.js +++ b/awx/ui/client/features/output/output.strings.js @@ -23,6 +23,7 @@ function OutputStrings (BaseString) { EXTRA_VARS: t.s('Read-only view of extra variables added to the job template'), INVENTORY: t.s('View the Inventory'), JOB_TEMPLATE: t.s('View the Job Template'), + SLICE_JOB_DETAILS: t.s('Job is one of several from a JT that slices on inventory'), PROJECT: t.s('View the Project'), PROJECT_UPDATE: t.s('View Project checkout results'), SCHEDULE: t.s('View the Schedule'), @@ -55,6 +56,7 @@ function OutputStrings (BaseString) { JOB_EXPLANATION: t.s('Explanation'), JOB_TAGS: t.s('Job Tags'), JOB_TEMPLATE: t.s('Job Template'), + SLICE_JOB: t.s('Slice Job'), JOB_TYPE: t.s('Job Type'), LABELS: t.s('Labels'), LAUNCHED_BY: t.s('Launched By'), diff --git a/awx/ui/client/lib/components/launchTemplateButton/launchTemplateButton.component.js b/awx/ui/client/lib/components/launchTemplateButton/launchTemplateButton.component.js index 637555d57f..20cf1d8e94 100644 --- a/awx/ui/client/lib/components/launchTemplateButton/launchTemplateButton.component.js +++ b/awx/ui/client/lib/components/launchTemplateButton/launchTemplateButton.component.js @@ -41,7 +41,13 @@ function atLaunchTemplateCtrl ( selectedJobTemplate .postLaunch({ id: vm.template.id }) .then(({ data }) => { - $state.go('output', { id: data.job, type: 'playbook' }, { reload: true }); + /* Slice Jobs: Redirect to WF Details page if returned + job type is a WF job */ + if (data.type === 'workflow_job' && data.workflow_job !== null) { + $state.go('workflowResults', { id: data.workflow_job }, { reload: true }); + } else { + $state.go('output', { id: data.job, type: 'playbook' }, { reload: true }); + } }); } else { const promptData = { @@ -142,7 +148,13 @@ function atLaunchTemplateCtrl ( id: vm.promptData.template, launchData: jobLaunchData }).then((launchRes) => { - $state.go('output', { id: launchRes.data.job, type: 'playbook' }, { reload: true }); + /* Slice Jobs: Redirect to WF Details page if returned + job type is a WF job */ + if (launchRes.data.type === 'workflow_job' && launchRes.data.workflow_job !== null) { + $state.go('workflowResults', { id: launchRes.data.workflow_job }, { reload: true }); + } else { + $state.go('output', { id: launchRes.data.job, type: 'playbook' }, { reload: true }); + } }).catch(createErrorHandler('launch job template', 'POST')); } else if (vm.promptData.templateType === 'workflow_job_template') { workflowTemplate.create().postLaunch({ diff --git a/awx/ui/client/lib/components/list/_index.less b/awx/ui/client/lib/components/list/_index.less index 1f8d534034..c5b64d4eed 100644 --- a/awx/ui/client/lib/components/list/_index.less +++ b/awx/ui/client/lib/components/list/_index.less @@ -197,7 +197,7 @@ color: @at-color-list-row-item-tag-primary; } -.at-RowItem-tag--header { +.at-RowItem-tag--header, .at-RowItem-tag--secondary { line-height: inherit; } diff --git a/awx/ui/client/lib/components/list/row-item.directive.js b/awx/ui/client/lib/components/list/row-item.directive.js index 731aa837ec..b28b661581 100644 --- a/awx/ui/client/lib/components/list/row-item.directive.js +++ b/awx/ui/client/lib/components/list/row-item.directive.js @@ -13,6 +13,7 @@ function atRowItem () { headerLink: '@', headerState: '@', headerTag: '@', + secondaryTag: '@', status: '@', statusTip: '@', statusClick: '&?', diff --git a/awx/ui/client/lib/components/list/row-item.partial.html b/awx/ui/client/lib/components/list/row-item.partial.html index eebeab39a9..17f1e3b5d9 100644 --- a/awx/ui/client/lib/components/list/row-item.partial.html +++ b/awx/ui/client/lib/components/list/row-item.partial.html @@ -29,6 +29,9 @@
{{ headerTag }}
+
+ {{ secondaryTag }} +
diff --git a/awx/ui/client/src/templates/job_templates/job-template.form.js b/awx/ui/client/src/templates/job_templates/job-template.form.js index 54bbe2c70d..f716cb4abb 100644 --- a/awx/ui/client/src/templates/job_templates/job-template.form.js +++ b/awx/ui/client/src/templates/job_templates/job-template.form.js @@ -257,6 +257,19 @@ function(NotificationsList, i18n) { dataPlacement: 'right', control: '', }, + job_slice_count: { + label: i18n._('Job Slicing'), + type: 'number', + integer: true, + min: 1, + default: 1, + spinner: true, + dataTitle: i18n._('Slice Job Count'), + dataPlacement: 'right', + dataContainer: 'body', + awPopOver: "

" + i18n._("Divide the work done by this job template into the specified number of job slices, each running the same tasks against a portion of the inventory.") + "

", + ngDisabled: '!(job_template_obj.summary_fields.user_capabilities.edit || canAddJobTemplate)' + }, diff_mode: { label: i18n._('Show Changes'), type: 'toggleSwitch', diff --git a/awx/ui/client/src/workflow-results/workflow-results.controller.js b/awx/ui/client/src/workflow-results/workflow-results.controller.js index 31ad3191f8..fe9d42ebcb 100644 --- a/awx/ui/client/src/workflow-results/workflow-results.controller.js +++ b/awx/ui/client/src/workflow-results/workflow-results.controller.js @@ -39,6 +39,7 @@ export default ['workflowData', 'workflowResultsService', 'workflowDataOptions', DELETE: i18n._('Delete'), EDIT_USER: i18n._('Edit the user'), EDIT_WORKFLOW: i18n._('Edit the workflow job template'), + EDIT_SLICE_TEMPLATE: i18n._('Edit the slice job template'), EDIT_SCHEDULE: i18n._('Edit the schedule'), TOGGLE_STDOUT_FULLSCREEN: i18n._('Expand Output'), STATUS: '' // re-assigned elsewhere @@ -49,6 +50,7 @@ export default ['workflowData', 'workflowResultsService', 'workflowDataOptions', STARTED: i18n._('Started'), FINISHED: i18n._('Finished'), LABELS: i18n._('Labels'), + SLICE_TEMPLATE: i18n._('Slice Job Template'), STATUS: i18n._('Status') }, details: { @@ -109,6 +111,11 @@ export default ['workflowData', 'workflowResultsService', 'workflowDataOptions', $scope.workflow_job_template_link = `/#/templates/workflow_job_template/${$scope.workflow.summary_fields.workflow_job_template.id}`; } + if(workflowData.summary_fields && workflowData.summary_fields.job_template && + workflowData.summary_fields.job_template.id){ + $scope.slice_job_template_link = `/#/templates/job_template/${$scope.workflow.summary_fields.job_template.id}`; + } + // turn related api browser routes into front end routes getLinks(); diff --git a/awx/ui/client/src/workflow-results/workflow-results.partial.html b/awx/ui/client/src/workflow-results/workflow-results.partial.html index 97ef30a914..474fac1d6e 100644 --- a/awx/ui/client/src/workflow-results/workflow-results.partial.html +++ b/awx/ui/client/src/workflow-results/workflow-results.partial.html @@ -144,6 +144,22 @@
+ +
+ + +
+ { + let JobDetails; + let scope; + let state; + let OutputStrings; + let Prompt; + let filter; + let ProcessErrors; + let Wait; + let httpBackend; + let ParseVariableString; + let subscribe; + let OutputStatusService; + + let mockData = { + job_slice_count: 2, + job_slice_number: 2, + labels: { + SLICE_JOB: 'foo' + }, + tooltips: { + SLICE_JOB_DETAILS: 'bar' + } + }; + const resource = { + id: '147', + type: 'playbook', + model: { + get: (obj) => obj.split('.').reduce((i, o) => i && i[o] || null, mockData), + has: jasmine.createSpy('has'), + options: jasmine.createSpy('options'), + }, + events: {}, + ws: {} + }; + + beforeEach(angular.mock.module('at.features.output', ($provide) => { + state = { + params: { + job_search: {} + }, + go: jasmine.createSpy('go'), + includes: jasmine.createSpy('includes') + }; + + OutputStrings = { + get: (obj) => obj.split('.').reduce((i, o) => i && i[o] || null, mockData), + }; + + OutputStatusService = { + subscribe: jasmine.createSpy('subscribe') + }; + + ProcessErrors = jasmine.createSpy('ProcessErrors'); + Wait = jasmine.createSpy('Wait'); + Prompt = jasmine.createSpy('Prompt'); + + $provide.value('state', state); + $provide.value('ProcessErrors', ProcessErrors); + $provide.value('Wait', Wait); + $provide.value('Prompt', Prompt); + $provide.value('OutputStrings', OutputStrings); + $provide.value('ParseVariableString', angular.noop); + $provide.value('OutputStatusService', OutputStatusService); + + $provide.provider('$stateProvider', { $get: jasmine.createSpy('$get'), }); + $provide.value('$stateExtender', { addState: jasmine.createSpy('addState'), }); + $provide.value('$stateRegistry', { register: jasmine.createSpy('regster'), }); + $provide.value('sanitizeFilter', angular.noop); + $provide.value('subscribe', subscribe); + $provide.value('moment', moment); + $provide.value('longDateFilter', angular.noop); + })); + + beforeEach(angular.mock.inject(( + $injector, $componentController, $rootScope, + $httpBackend, _state_, _OutputStrings_, _ParseVariableString_, _Prompt_, + _ProcessErrors_, _Wait_, _OutputStatusService_ + ) => { + scope = $rootScope.$new(); + state = _state_; + OutputStrings = _OutputStrings_; + Prompt = _Prompt_; + filter = $injector.get('$filter'); + ProcessErrors = _ProcessErrors_; + Wait = _Wait_; + ParseVariableString = _ParseVariableString_; + httpBackend = $httpBackend; + OutputStatusService = _OutputStatusService_; + + JobDetails = $componentController('atJobDetails', { + $scope: scope, + $state: state, + OutputStrings, + ProcessErrors, + Wait, + Prompt, + $filter: filter, + ParseVariableString, + httpBackend, + OutputStatusService, + }, { resource }); + JobDetails.$onInit(); + })); + + describe('JobDetails Component', () => { + it('is created successfully', () => { + expect(JobDetails).toBeDefined(); + }); + it('has method "sliceJobDetails"', () => { + expect(JobDetails.sliceJobDetails).toBeDefined(); + }); + describe('splitJobDetails method', () => { + it('returned values are strings', () => { + const result = JobDetails.sliceJobDetails; + const { label, offset, tooltip } = result; + expect(offset).toEqual('2/2'); + expect(label).toEqual('foo'); + expect(tooltip).toEqual('bar'); + }); + it('returns null if label, offset, or tooltip is undefined', () => { + mockData = { + job_slice_count: 2, + job_slice_number: 2, + labels: { + SLICE_JOB: null + }, + tooltips: { + SLICE_JOB_DETAILS: null + } + }; + JobDetails.$onInit(); + const result = JobDetails.sliceJobDetails; + expect(result).toBeNull(); + }); + it('returns null if job_slice_count is undefined or null', () => { + mockData = { + job_slice_count: null, + job_slice_number: 2, + labels: { + SLICE_JOB: 'foo' + }, + tooltips: { + SLICE_JOB_DETAILS: 'bar' + } + }; + JobDetails.$onInit(); + const result = JobDetails.sliceJobDetails; + expect(result).toBeNull(); + }); + it('returns null if job_slice_number is undefined or null', () => { + mockData = { + job_slice_count: 2, + job_slice_number: null, + labels: { + SLICE_JOB: 'foo' + }, + tooltips: { + SLICE_JOB_DETAILS: 'bar' + } + }; + JobDetails.$onInit(); + const result = JobDetails.sliceJobDetails; + expect(result).toBeNull(); + }); + it('returns null if job is a non-sliced job', () => { + mockData = { + job_slice_count: 1, + job_slice_number: null, + labels: { + SLICE_JOB: 'foo' + }, + tooltips: { + SLICE_JOB_DETAILS: 'bar' + } + }; + JobDetails.$onInit(); + const result = JobDetails.sliceJobDetails; + expect(result).toBeNull(); + }); + }); + }); +}); diff --git a/awx/ui/test/unit/components/jobs-list-split-jobs.unit.js b/awx/ui/test/unit/components/jobs-list-split-jobs.unit.js new file mode 100644 index 0000000000..cd4438a0b5 --- /dev/null +++ b/awx/ui/test/unit/components/jobs-list-split-jobs.unit.js @@ -0,0 +1,127 @@ +describe('View: Split Jobs List', () => { + let JobList; + let scope; + let state; + let Dataset; + let resolvedModels; + let JobsStrings; + let QuerySet; + let Prompt; + let filter; + let ProcessErrors; + let Wait; + let Rest; + let SearchBasePath; + + beforeEach(angular.mock.module('at.features.jobs', ($provide) => { + Dataset = { + data: { + results: {} + } + }; + state = { + params: { + job_search: {} + }, + go: jasmine.createSpy('go'), + includes: jasmine.createSpy('includes') + }; + resolvedModels = [ + { + options: () => ['foo', 'bar'], + } + ]; + + ProcessErrors = jasmine.createSpy('ProcessErrors'); + Wait = jasmine.createSpy('Wait'); + Prompt = jasmine.createSpy('Prompt'); + + $provide.value('state', state); + $provide.value('Dataset', Dataset); + $provide.value('resolvedModels', resolvedModels); + $provide.value('ProcessErrors', ProcessErrors); + $provide.value('Wait', Wait); + $provide.value('Prompt', Prompt); + $provide.value('Rest', angular.noop); + $provide.value('SearchBasePath', ''); + $provide.value('JobsStrings', angular.noop); + $provide.value('QuerySet', angular.noop); + + $provide.provider('$stateProvider', { $get: jasmine.createSpy('$get'), }); + $provide.value('$stateExtender', { addState: jasmine.createSpy('addState'), }); + })); + + beforeEach(angular.mock.inject(( + $controller, $rootScope, _state_, _Dataset_, _resolvedModels_, _JobsStrings_, + _QuerySet_, _Prompt_, _$filter_, _ProcessErrors_, _Wait_, _Rest_, _SearchBasePath_ + ) => { + scope = $rootScope.$new(); + state = _state_; + Dataset = _Dataset_; + resolvedModels = _resolvedModels_; + JobsStrings = _JobsStrings_; + QuerySet = _QuerySet_; + Prompt = _Prompt_; + filter = _$filter_; + ProcessErrors = _ProcessErrors_; + Wait = _Wait_; + Rest = _Rest_; + SearchBasePath = _SearchBasePath_; + + JobList = $controller('jobsListController', { + $scope: scope, + $state: state, + Dataset, + resolvedModels, + JobsStrings, + ProcessErrors, + QuerySet, + Wait, + Prompt, + $filter: filter, + Rest, + SearchBasePath, + }); + })); + + describe('JobList Controller', () => { + it('is created successfully', () => { + expect(JobList).toBeDefined(); + }); + it('has method "getSplitJobDetails"', () => { + expect(JobList.getSliceJobDetails).toBeDefined(); + }); + it('returns a string', () => { + const data = { + job_slice_number: 1, + job_slice_count: 2 + }; + const result = JobList.getSliceJobDetails(data); + expect(result).toEqual('Slice Job 1/2'); + }); + it('returns null when data is null', () => { + const data = { + job_slice_number: null, + job_slice_count: null + }; + const result = JobList.getSliceJobDetails(data); + expect(result).toBeNull(); + }); + it('returns null when data is undefined', () => { + const data = { + job_slice_number: undefined, + job_slice_count: undefined + }; + const result = JobList.getSliceJobDetails(data); + expect(result).toBeNull(); + }); + it('returns null when job is not a sliced job', () => { + const data = { + job_slice_number: null, + job_slice_count: 1 + }; + const result = JobList.getSliceJobDetails(data); + expect(result).toBeNull(); + }); + }); +}); diff --git a/docs/job_slicing.md b/docs/job_slicing.md new file mode 100644 index 0000000000..d9234a978e --- /dev/null +++ b/docs/job_slicing.md @@ -0,0 +1,13 @@ +# Job Slicing Overview + +Ansible, by default, runs jobs from a single control instance. At best a single Ansible job can be sliced up on a single system via forks but this doesn't fully take advantage of AWX's ability to distribute work to multiple nodes in a cluster. + +Job Slicing solves this by adding a Job Template field `job_slice_count`. This field specifies the number of **Jobs** to slice the Ansible run into. When this number is greater than 1 ``AWX`` will generate a **Workflow** from a **JobTemplate** instead of a **Job**. The **Inventory** will be distributed evenly amongst the slice jobs. The workflow job is then started and proceeds as though it were a normal workflow. The API will return either a **Job** resource (if `job_slice_count` < 2) or a **WorkflowJob** resource otherwise. Likewise, the UI will redirect to the appropriate screen to display the status of the run. + +## Implications for Job execution + +When jobs are sliced they can run on any Tower node and some may not run at the same time. Because of this, anything that relies on setting/sliced state (using modules such as ``set_fact``) will not work as expected. It's reasonable to expect that not all jobs will actually run at the same time (if there is not enough capacity in the system for example) + +## Simultaneous Execution Behavior + +By default Job Templates aren't normally configured to execute simultaneously (``allow_simultaneous`` must be checked). Slicing overrides this behavior and implies ``allow_simultaneous`` even if that setting is unchecked.