diff --git a/awx/api/serializers.py b/awx/api/serializers.py index e880648467..2c904e2a7f 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -3590,7 +3590,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): class Meta: model = WorkflowJob - fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', + fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', 'job_template', '-execution_node', '-event_processing_finished', '-controller_node',) def get_related(self, obj): diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 8bd9f25dc2..cd96f3fcbc 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -2905,7 +2905,7 @@ class JobTemplateLaunch(RetrieveAPIView): raise PermissionDenied() passwords = serializer.validated_data.pop('credential_passwords', {}) - new_job = obj.create_job(**serializer.validated_data) + new_job = obj.create_unified_job(**serializer.validated_data) result = new_job.signal_start(**passwords) if not result: diff --git a/awx/main/access.py b/awx/main/access.py index 768545ce05..5c1ea23a3a 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1789,7 +1789,7 @@ class WorkflowJobNodeAccess(BaseAccess): def filtered_queryset(self): return self.model.objects.filter( - workflow_job__workflow_job_template__in=WorkflowJobTemplate.accessible_objects( + workflow_job__unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs( self.user, 'read_role')) @check_superuser @@ -1915,7 +1915,7 @@ class WorkflowJobAccess(BaseAccess): def filtered_queryset(self): return WorkflowJob.objects.filter( - workflow_job_template__in=WorkflowJobTemplate.accessible_objects( + unified_job_template__in=UnifiedJobTemplate.accessible_pk_qs( self.user, 'read_role')) def can_add(self, data): @@ -1947,9 +1947,11 @@ class WorkflowJobAccess(BaseAccess): if self.user.is_superuser: return True - wfjt = obj.workflow_job_template + template = obj.workflow_job_template + if not template and obj.job_template_id: + template = obj.job_template # only superusers can relaunch orphans - if not wfjt: + if not template: return False # If job was launched by another user, it could have survey passwords @@ -1967,7 +1969,7 @@ class WorkflowJobAccess(BaseAccess): return False # execute permission to WFJT is mandatory for any relaunch - return (self.user in wfjt.execute_role) + return (self.user in template.execute_role) def can_recreate(self, obj): node_qs = obj.workflow_job_nodes.all().prefetch_related('inventory', 'credentials', 'unified_job_template') diff --git a/awx/main/models/ad_hoc_commands.py b/awx/main/models/ad_hoc_commands.py index 3549bb2a41..af9c519812 100644 --- a/awx/main/models/ad_hoc_commands.py +++ b/awx/main/models/ad_hoc_commands.py @@ -136,8 +136,7 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin): else: return [] - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return '' @classmethod diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 26a5be6c3a..03220ac967 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -1647,8 +1647,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin, null=True ) - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'inventory_source' @classmethod diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 4821223a3f..1671496191 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -320,32 +320,32 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour def resources_needed_to_start(self): return [fd for fd in ['project', 'inventory'] if not getattr(self, '{}_id'.format(fd))] - def create_job(self, **kwargs): + def create_unified_job(self, **kwargs): ''' Create a new job based on this template. ''' - if self.job_shard_count > 1: + split_event = bool( + self.job_shard_count > 1 and + not kwargs.pop('_prevent_sharding', False) + ) + if split_event: # A sharded Job Template will generate a WorkflowJob rather than a Job from awx.main.models.workflow import WorkflowJobTemplate, WorkflowJobNode kwargs['_unified_job_class'] = WorkflowJobTemplate._get_unified_job_class() - kwargs['_unified_job_field_names'] = WorkflowJobTemplate._get_unified_job_field_names() - job = self.create_unified_job(**kwargs) - if self.job_shard_count > 1: - if 'inventory' in kwargs: - actual_inventory = kwargs['inventory'] - else: - actual_inventory = self.inventory + kwargs['_parent_field_name'] = "job_template" + job = super(JobTemplate, self).create_unified_job(**kwargs) + if split_event: + try: + wj_config = job.launch_config + except JobLaunchConfig.DoesNotExist: + wj_config = JobLaunchConfig() + actual_inventory = wj_config.inventory if wj_config.inventory else self.inventory for idx in xrange(min(self.job_shard_count, actual_inventory.hosts.count())): create_kwargs = dict(workflow_job=job, unified_job_template=self, - #survey_passwords=self.survey_passwords, - inventory=actual_inventory, ancestor_artifacts=dict(job_shard=idx)) - #char_prompts=self.char_prompts) wfjn = WorkflowJobNode.objects.create(**create_kwargs) - for cred in self.credentials.all(): - wfjn.credentials.add(cred) return job def get_absolute_url(self, request=None): @@ -531,8 +531,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana ) - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'job_template' @classmethod diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 7f296376fa..3c283e2fd2 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -496,8 +496,7 @@ class ProjectUpdate(UnifiedJob, ProjectOptions, JobNotificationMixin, TaskManage default='check', ) - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'project' @classmethod diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 54b6dbad3e..5dd1241e0e 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -309,13 +309,6 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio ''' raise NotImplementedError # Implement in subclass. - @classmethod - def _get_unified_job_field_names(cls): - ''' - Return field names that should be copied from template to new job. - ''' - raise NotImplementedError # Implement in subclass. - @property def notification_templates(self): ''' @@ -338,27 +331,32 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio password_list = self.survey_password_variables() encrypt_dict(kwargs.get('extra_vars', {}), password_list) - unified_job_class = kwargs.pop("_unified_job_class", self._get_unified_job_class()) - fields = kwargs.pop("_unified_job_field_names", self._get_unified_job_field_names()) - print("UJC: {}".format(unified_job_class)) - print("fields: {}".format(fields)) + unified_job_class = self._get_unified_job_class() + fields = self._get_unified_job_field_names() + parent_field_name = None + if "_unified_job_class" in kwargs: + # Special case where spawned job is different type than usual + # Only used for sharded jobs + unified_job_class = kwargs.pop("_unified_job_class") + fields = unified_job_class._get_unified_job_field_names() & fields + parent_field_name = kwargs.pop('_parent_field_name') + unallowed_fields = set(kwargs.keys()) - set(fields) + validated_kwargs = kwargs.copy() if unallowed_fields: logger.warn('Fields {} are not allowed as overrides.'.format(unallowed_fields)) - map(kwargs.pop, unallowed_fields) + map(validated_kwargs.pop, unallowed_fields) - unified_job = copy_model_by_class(self, unified_job_class, fields, kwargs) + unified_job = copy_model_by_class(self, unified_job_class, fields, validated_kwargs) if eager_fields: for fd, val in eager_fields.items(): setattr(unified_job, fd, val) - # Set the unified job template back-link on the job - # TODO: fix this hack properly before merge matburt - if isinstance(self, JobTemplate) and isinstance(unified_job, WorkflowJob): - parent_field_name = "job_template" - else: - parent_field_name = unified_job_class._get_parent_field_name() + # NOTE: sharded workflow jobs _get_parent_field_name method + # is not correct until this is set + if not parent_field_name: + parent_field_name = unified_job._get_parent_field_name() setattr(unified_job, parent_field_name, self) # For JobTemplate-based jobs with surveys, add passwords to list for perma-redaction @@ -372,24 +370,25 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, Notificatio unified_job.save() # Labels and credentials copied here - if kwargs.get('credentials'): + if validated_kwargs.get('credentials'): Credential = UnifiedJob._meta.get_field('credentials').related_model cred_dict = Credential.unique_dict(self.credentials.all()) - prompted_dict = Credential.unique_dict(kwargs['credentials']) + prompted_dict = Credential.unique_dict(validated_kwargs['credentials']) # combine prompted credentials with JT cred_dict.update(prompted_dict) - kwargs['credentials'] = [cred for cred in cred_dict.values()] + validated_kwargs['credentials'] = [cred for cred in cred_dict.values()] + kwargs['credentials'] = validated_kwargs['credentials'] from awx.main.signals import disable_activity_stream with disable_activity_stream(): - copy_m2m_relationships(self, unified_job, fields, kwargs=kwargs) + copy_m2m_relationships(self, unified_job, fields, kwargs=validated_kwargs) - if 'extra_vars' in kwargs: - unified_job.handle_extra_data(kwargs['extra_vars']) + if 'extra_vars' in validated_kwargs: + unified_job.handle_extra_data(validated_kwargs['extra_vars']) if not getattr(self, '_deprecated_credential_launch', False): # Create record of provided prompts for relaunch and rescheduling - unified_job.create_config_from_prompts(kwargs) + unified_job.create_config_from_prompts(kwargs, parent=self) return unified_job @@ -702,8 +701,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def supports_isolation(cls): return False - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): return 'unified_job_template' # Override in subclasses. @classmethod @@ -874,16 +872,16 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique except JobLaunchConfig.DoesNotExist: return None - def create_config_from_prompts(self, kwargs): + def create_config_from_prompts(self, kwargs, parent=None): ''' Create a launch configuration entry for this job, given prompts returns None if it can not be created ''' - if self.unified_job_template is None: - return None JobLaunchConfig = self._meta.get_field('launch_config').related_model config = JobLaunchConfig(job=self) - valid_fields = self.unified_job_template.get_ask_mapping().keys() + if parent is None: + parent = getattr(self, self._get_parent_field_name()) + valid_fields = parent.get_ask_mapping().keys() # Special cases allowed for workflows if hasattr(self, 'extra_vars'): valid_fields.extend(['survey_passwords', 'extra_vars']) @@ -900,8 +898,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique setattr(config, key, value) config.save() - job_creds = (set(kwargs.get('credentials', [])) - - set(self.unified_job_template.credentials.all())) + job_creds = set(kwargs.get('credentials', [])) + if 'credentials' in [field.name for field in parent._meta.get_fields()]: + job_creds = job_creds - set(parent.credentials.all()) if job_creds: config.credentials.add(*job_creds) return config diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index b97f555d57..8994c1aa1a 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -4,11 +4,13 @@ # Python #import urlparse import logging +import six # Django from django.db import models from django.conf import settings from django.utils.translation import ugettext_lazy as _ +from django.core.exceptions import ObjectDoesNotExist #from django import settings as tower_settings # AWX @@ -206,6 +208,15 @@ class WorkflowJobNode(WorkflowNodeBase): workflow_pk=self.pk, error_text=errors)) data.update(accepted_fields) # missing fields are handled in the scheduler + try: + # config saved on the workflow job itself + wj_config = self.workflow_job.launch_config + except ObjectDoesNotExist: + wj_config = None + if wj_config: + accepted_fields, ignored_fields, errors = ujt_obj._accept_or_ignore_job_kwargs(**wj_config.prompts_dict()) + accepted_fields.pop('extra_vars', None) # merge handled with other extra_vars later + data.update(accepted_fields) # build ancestor artifacts, save them to node model for later aa_dict = {} for parent_node in self.get_parent_nodes(): @@ -240,6 +251,15 @@ class WorkflowJobNode(WorkflowNodeBase): data['extra_vars'] = extra_vars # ensure that unified jobs created by WorkflowJobs are marked data['_eager_fields'] = {'launch_type': 'workflow'} + # Extra processing in the case that this is a sharded job + if 'job_shard' in self.ancestor_artifacts: + shard_str = six.text_type(self.ancestor_artifacts['job_shard'] + 1) + data['_eager_fields']['name'] = six.text_type("{} - {}").format( + self.unified_job_template.name[:512 - len(shard_str) - len(' - ')], + shard_str + ) + data['_eager_fields']['allow_simultaneous'] = True + data['_prevent_sharding'] = True return data @@ -261,6 +281,12 @@ class WorkflowJobOptions(BaseModel): def workflow_nodes(self): raise NotImplementedError() + @classmethod + def _get_unified_job_field_names(cls): + return set(f.name for f in WorkflowJobOptions._meta.fields) | set( + ['name', 'description', 'schedule', 'survey_passwords', 'labels'] + ) + def _create_workflow_nodes(self, old_node_list, user=None): node_links = {} for old_node in old_node_list: @@ -331,12 +357,6 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions, SurveyJobTempl def _get_unified_job_class(cls): return WorkflowJob - @classmethod - def _get_unified_job_field_names(cls): - return set(f.name for f in WorkflowJobOptions._meta.fields) | set( - ['name', 'description', 'schedule', 'survey_passwords', 'labels'] - ) - @classmethod def _get_unified_jt_copy_names(cls): base_list = super(WorkflowJobTemplate, cls)._get_unified_jt_copy_names() @@ -446,8 +466,10 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio def workflow_nodes(self): return self.workflow_job_nodes - @classmethod - def _get_parent_field_name(cls): + def _get_parent_field_name(self): + if self.job_template_id: + # This is a workflow job which is a container for sharded jobs + return 'job_template' return 'workflow_job_template' @classmethod diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 4f21e56903..08cb6cd247 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -117,9 +117,6 @@ class TaskManager(): continue kv = spawn_node.get_job_kwargs() job = spawn_node.unified_job_template.create_unified_job(**kv) - if 'job_shard' in spawn_node.ancestor_artifacts: - job.name = six.text_type("{} - {}").format(job.name, spawn_node.ancestor_artifacts['job_shard'] + 1) - job.save() spawn_node.job = job spawn_node.save() logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index 459471b6e8..57ceec41be 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -776,3 +776,37 @@ def sqlite_copy_expert(request): def disable_database_settings(mocker): m = mocker.patch('awx.conf.settings.SettingsWrapper.all_supported_settings', new_callable=PropertyMock) m.return_value = [] + + +@pytest.fixture +def shard_jt_factory(inventory): + def r(N, jt_kwargs=None): + for i in range(N): + inventory.hosts.create(name='foo{}'.format(i)) + if not jt_kwargs: + jt_kwargs = {} + return JobTemplate.objects.create( + name='shard-jt-from-factory', + job_shard_count=N, + inventory=inventory, + **jt_kwargs + ) + return r + + +@pytest.fixture +def shard_job_factory(shard_jt_factory): + def r(N, jt_kwargs=None, prompts=None, spawn=False): + shard_jt = shard_jt_factory(N, jt_kwargs=jt_kwargs) + if not prompts: + prompts = {} + shard_job = shard_jt.create_unified_job(**prompts) + if spawn: + for node in shard_job.workflow_nodes.all(): + # does what the task manager does for spawning workflow jobs + kv = node.get_job_kwargs() + job = node.unified_job_template.create_unified_job(**kv) + node.job = job + node.save() + return shard_job + return r diff --git a/awx/main/tests/functional/models/test_job.py b/awx/main/tests/functional/models/test_job.py index 013f73ca39..9926e42b5d 100644 --- a/awx/main/tests/functional/models/test_job.py +++ b/awx/main/tests/functional/models/test_job.py @@ -1,7 +1,7 @@ import pytest import six -from awx.main.models import JobTemplate, Job, JobHostSummary +from awx.main.models import JobTemplate, Job, JobHostSummary, WorkflowJob from crum import impersonate @@ -81,3 +81,22 @@ def test_job_host_summary_representation(host): jhs = JobHostSummary.objects.get(pk=jhs.id) host.delete() assert 'N/A changed=1 dark=2 failures=3 ok=4 processed=5 skipped=6' == six.text_type(jhs) + +@pytest.mark.django_db +class TestShardingModels: + + def test_shard_workflow_spawn(self, shard_jt_factory): + shard_jt = shard_jt_factory(3) + job = shard_jt.create_unified_job() + assert isinstance(job, WorkflowJob) + assert job.job_template == shard_jt + assert job.unified_job_template == shard_jt + assert job.workflow_nodes.count() == 3 + + def test_shards_with_JT_and_prompts(self, shard_job_factory): + job = shard_job_factory(3, jt_kwargs={'ask_limit_on_launch': True}, prompts={'limit': 'foobar'}, spawn=True) + assert job.launch_config.prompts_dict() == {'limit': 'foobar'} + for node in job.workflow_nodes.all(): + assert node.limit == None # data not saved in node prompts + job = node.job + assert job.limit == 'foobar' diff --git a/awx/main/tests/functional/test_rbac_job_start.py b/awx/main/tests/functional/test_rbac_job_start.py index 60c35e0803..4b44a5a284 100644 --- a/awx/main/tests/functional/test_rbac_job_start.py +++ b/awx/main/tests/functional/test_rbac_job_start.py @@ -3,6 +3,11 @@ import pytest from awx.main.models.inventory import Inventory from awx.main.models.credential import Credential from awx.main.models.jobs import JobTemplate, Job +from awx.main.access import ( + UnifiedJobAccess, + WorkflowJobAccess, WorkflowJobNodeAccess, + JobAccess +) @pytest.mark.django_db @@ -43,6 +48,31 @@ def test_inventory_use_access(inventory, user): assert common_user.can_access(Inventory, 'use', inventory) +@pytest.mark.django_db +def test_sharded_job(shard_job_factory, rando): + workflow_job = shard_job_factory(2, jt_kwargs={'created_by': rando}, spawn=True) + workflow_job.job_template.execute_role.members.add(rando) + + # Abilities of user with execute_role for shard workflow job container + assert WorkflowJobAccess(rando).can_start(workflow_job) # relaunch allowed + for access_cls in (UnifiedJobAccess, WorkflowJobAccess): + access = access_cls(rando) + assert access.can_read(workflow_job) + assert workflow_job in access.get_queryset() + + # Abilities of user with execute_role for all the shards of the job + for node in workflow_job.workflow_nodes.all(): + access = WorkflowJobNodeAccess(rando) + assert access.can_read(node) + assert node in access.get_queryset() + job = node.job + assert JobAccess(rando).can_start(job) # relaunch allowed + for access_cls in (UnifiedJobAccess, JobAccess): + access = access_cls(rando) + assert access.can_read(job) + assert job in access.get_queryset() + + @pytest.mark.django_db class TestJobRelaunchAccess: @pytest.fixture