From c96cf3cf5db7e38e5a01552f9389347f14802f14 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 1 Aug 2018 14:52:43 -0400 Subject: [PATCH 1/3] unless allow_simultaneous, do not start workflow jobs that are already running --- awx/main/scheduler/task_manager.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 198f0e3652..24ffab5103 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -482,6 +482,12 @@ class TaskManager(): found_acceptable_queue = False idle_instance_that_fits = None if isinstance(task, WorkflowJob): + running_workflow_templates = [wf.workflow_job_template.pk for wf in self.get_running_workflow_jobs()] + running = task.workflow_job_template.pk in running_workflow_templates + if running: + if not task.allow_simultaneous: + logger.debug(six.text_type("{} is blocked from running, workflow already running").format(task.log_format, task.workflow_job_template.pk)) + continue self.start_task(task, None, task.get_jobs_fail_chain(), None) continue for rampart_group in preferred_instance_groups: From da603dd3ade33cb81949d7b9b0fc4ec7f8bf8a18 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 1 Aug 2018 15:55:19 -0400 Subject: [PATCH 2/3] use workflow_template_id --- awx/main/scheduler/task_manager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 24ffab5103..748f269dfd 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -482,11 +482,11 @@ class TaskManager(): found_acceptable_queue = False idle_instance_that_fits = None if isinstance(task, WorkflowJob): - running_workflow_templates = [wf.workflow_job_template.pk for wf in self.get_running_workflow_jobs()] - running = task.workflow_job_template.pk in running_workflow_templates + running_workflow_templates = [wf.workflow_job_template_id for wf in self.get_running_workflow_jobs()] + running = task.workflow_job_template_id in running_workflow_templates if running: if not task.allow_simultaneous: - logger.debug(six.text_type("{} is blocked from running, workflow already running").format(task.log_format, task.workflow_job_template.pk)) + logger.debug(six.text_type("{} is blocked from running, workflow already running").format(task.log_format)) continue self.start_task(task, None, task.get_jobs_fail_chain(), None) continue From 74ed1c2c40cb4177f5d6b1fe297b32fd3999d6f2 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Thu, 2 Aug 2018 11:29:30 -0400 Subject: [PATCH 3/3] fetch running workflow jobs once per process --- awx/main/scheduler/task_manager.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 748f269dfd..f4e92c7dea 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -473,6 +473,7 @@ class TaskManager(): logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format)) def process_pending_tasks(self, pending_tasks): + running_workflow_templates = set([wf.workflow_job_template_id for wf in self.get_running_workflow_jobs()]) for task in pending_tasks: self.process_dependencies(task, self.generate_dependencies(task)) if self.is_job_blocked(task): @@ -482,9 +483,7 @@ class TaskManager(): found_acceptable_queue = False idle_instance_that_fits = None if isinstance(task, WorkflowJob): - running_workflow_templates = [wf.workflow_job_template_id for wf in self.get_running_workflow_jobs()] - running = task.workflow_job_template_id in running_workflow_templates - if running: + if task.workflow_job_template_id in running_workflow_templates: if not task.allow_simultaneous: logger.debug(six.text_type("{} is blocked from running, workflow already running").format(task.log_format)) continue