From 01d1470544fa922f10bab8af2ec001453797a3ac Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 1 Oct 2018 16:04:55 -0400 Subject: [PATCH] workflow variables processing, recursion detection --- awx/main/models/workflow.py | 26 ++++++++++++++++++++++---- awx/main/scheduler/task_manager.py | 25 ++++++++++++++++++++----- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index d02e3f6057..0753ae4992 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -31,7 +31,7 @@ from awx.main.models.mixins import ( SurveyJobMixin, RelatedJobsMixin, ) -from awx.main.models.jobs import LaunchTimeConfig +from awx.main.models.jobs import LaunchTimeConfig, JobTemplate from awx.main.models.credential import Credential from awx.main.redact import REPLACE_STR from awx.main.fields import JSONField @@ -199,7 +199,14 @@ class WorkflowJobNode(WorkflowNodeBase): data = {} ujt_obj = self.unified_job_template if ujt_obj is not None: - accepted_fields, ignored_fields, errors = ujt_obj._accept_or_ignore_job_kwargs(**self.prompts_dict()) + # MERGE note: move this to prompts_dict method on node when merging + # with the workflow inventory branch + prompts_data = self.prompts_dict() + if isinstance(ujt_obj, WorkflowJobTemplate): + if self.workflow_job.extra_vars: + prompts_data.setdefault('extra_vars', {}) + prompts_data['extra_vars'].update(self.workflow_job.extra_vars_dict) + accepted_fields, ignored_fields, errors = ujt_obj._accept_or_ignore_job_kwargs(**prompts_data) if errors: logger.info(_('Bad launch configuration starting template {template_pk} as part of ' 'workflow {workflow_pk}. Errors:\n{error_text}').format( @@ -246,8 +253,9 @@ class WorkflowJobNode(WorkflowNodeBase): functional_aa_dict.pop('_ansible_no_log', None) extra_vars.update(functional_aa_dict) # Workflow Job extra_vars higher precedence than ancestor artifacts - if self.workflow_job and self.workflow_job.extra_vars: - extra_vars.update(self.workflow_job.extra_vars_dict) + if ujt_obj and isinstance(ujt_obj, JobTemplate): + if self.workflow_job and self.workflow_job.extra_vars: + extra_vars.update(self.workflow_job.extra_vars_dict) if extra_vars: data['extra_vars'] = extra_vars # ensure that unified jobs created by WorkflowJobs are marked @@ -505,6 +513,16 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio def task_impact(self): return 0 + def get_ancestor_workflows(self): + ancestors = [] + wj = self + while True: + wj = wj.get_workflow_job() + if (not wj) or (not wj.workflow_job_template_id): + break + ancestors.append(wj.workflow_job_template_id) + return ancestors + def get_notification_templates(self): return self.workflow_job_template.notification_templates diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 863352857c..4936ff74e0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -26,6 +26,7 @@ from awx.main.models import ( ProjectUpdate, SystemJob, WorkflowJob, + WorkflowJobTemplate ) from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.utils.pglock import advisory_lock @@ -120,7 +121,25 @@ class TaskManager(): spawn_node.job = job spawn_node.save() logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) - if job._resources_sufficient_for_launch(): + can_start = True + if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): + workflow_ancestors = job.get_ancestor_workflows() + if spawn_node.unified_job_template.id in set(workflow_ancestors): + can_start = False + logger.info('Refusing to start recursive workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.id, workflow_ancestors)) + job.job_explanation = _( + "Workflow Job spawned from workflow could not start because it " + "would result in recursion (template spawn order {})" + ).format([spawn_node.unified_job_template.id] + workflow_ancestors) + else: + logger.debug('Starting workflow-in-workflow id={}, wfjt={}, ancestors={}'.format( + job.id, spawn_node.unified_job_template.id, workflow_ancestors)) + if not job._resources_sufficient_for_launch(): + can_start = False + job.job_explanation = _("Job spawned from workflow could not start because it " + "was missing a related resource such as project or inventory") + if can_start: if workflow_job.start_args: start_args = json.loads(decrypt_field(workflow_job, 'start_args')) else: @@ -129,10 +148,6 @@ class TaskManager(): if not can_start: job.job_explanation = _("Job spawned from workflow could not start because it " "was not in the right state or required manual credentials") - else: - can_start = False - job.job_explanation = _("Job spawned from workflow could not start because it " - "was missing a related resource such as project or inventory") if not can_start: job.status = 'failed' job.save(update_fields=['status', 'job_explanation'])