diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py
index 3f7657f571..924e459929 100644
--- a/awx/main/scheduler/dag_workflow.py
+++ b/awx/main/scheduler/dag_workflow.py
@@ -50,6 +50,7 @@ class WorkflowDAG(SimpleDAG):
         return [n['node_object'] for n in nodes_found]
 
     def cancel_node_jobs(self):
+        cancel_finished = True
         for n in self.nodes:
             obj = n['node_object']
             job = obj.job
@@ -57,7 +58,9 @@ class WorkflowDAG(SimpleDAG):
             if not job:
                 continue
             elif job.can_cancel:
+                cancel_finished = False
                 job.cancel()
+        return cancel_finished
 
     def is_workflow_done(self):
         root_nodes = self.get_root_nodes()
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 51c807047a..08cb6cd247 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -103,8 +103,15 @@ class TaskManager():
 
     def spawn_workflow_graph_jobs(self, workflow_jobs):
         for workflow_job in workflow_jobs:
+            if workflow_job.cancel_flag:
+                logger.debug('Not spawning jobs for %s because it is pending cancelation.', workflow_job.log_format)
+                continue
             dag = WorkflowDAG(workflow_job)
             spawn_nodes = dag.bfs_nodes_to_run()
+            if spawn_nodes:
+                logger.info('Spawning jobs for %s', workflow_job.log_format)
+            else:
+                logger.debug('No nodes to spawn for %s', workflow_job.log_format)
             for spawn_node in spawn_nodes:
                 if spawn_node.unified_job_template is None:
                     continue
@@ -112,6 +119,7 @@ class TaskManager():
                 job = spawn_node.unified_job_template.create_unified_job(**kv)
                 spawn_node.job = job
                 spawn_node.save()
+                logger.info('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
                 if job._resources_sufficient_for_launch():
                     can_start = job.signal_start()
                     if not can_start:
@@ -129,20 +137,23 @@ class TaskManager():
             # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
             #emit_websocket_notification('/socket.io/jobs', '', dict(id=))
 
-    # See comment in tasks.py::RunWorkflowJob::run()
     def process_finished_workflow_jobs(self, workflow_jobs):
         result = []
         for workflow_job in workflow_jobs:
             dag = WorkflowDAG(workflow_job)
             if workflow_job.cancel_flag:
-                workflow_job.status = 'canceled'
-                workflow_job.save()
-                dag.cancel_node_jobs()
-                connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
+                logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
+                cancel_finished = dag.cancel_node_jobs()
+                if cancel_finished:
+                    logger.info('Marking %s as canceled, all spawned jobs have concluded.', workflow_job.log_format)
+                    workflow_job.status = 'canceled'
+                    workflow_job.save()
+                    connection.on_commit(lambda: workflow_job.websocket_emit_status(workflow_job.status))
             else:
                 is_done, has_failed = dag.is_workflow_done()
                 if not is_done:
                     continue
+                logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
                 result.append(workflow_job.id)
                 workflow_job.status = 'failed' if has_failed else 'successful'
                 workflow_job.save()
@@ -499,6 +510,14 @@ class TaskManager():
 
         running_workflow_tasks = self.get_running_workflow_jobs()
         finished_wfjs = self.process_finished_workflow_jobs(running_workflow_tasks)
 
+        previously_running_workflow_tasks = running_workflow_tasks
+        running_workflow_tasks = []
+        for workflow_job in previously_running_workflow_tasks:
+            if workflow_job.status == 'running':
+                running_workflow_tasks.append(workflow_job)
+            else:
+                logger.debug('Removed %s from job spawning consideration.', workflow_job.log_format)
+
         self.spawn_workflow_graph_jobs(running_workflow_tasks)
         self.process_tasks(all_sorted_tasks)
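
The behavioral core of this patch is that WorkflowDAG.cancel_node_jobs() no longer fires and forgets: it now reports whether cancelation has fully settled, and the task manager leaves a canceled workflow in 'running' state until every spawned job has concluded, re-checking on each scheduler pass. Below is a minimal, self-contained sketch of that contract; the Job stub and the free-standing cancel_node_jobs() are hypothetical stand-ins for the real AWX models, kept only to show the control flow.

class Job:
    """Hypothetical stand-in for a spawned unified job."""
    def __init__(self, can_cancel):
        self.can_cancel = can_cancel

    def cancel(self):
        # In AWX this only signals the job; it concludes asynchronously.
        # The stub flips the flag immediately to simulate a later pass.
        self.can_cancel = False


def cancel_node_jobs(jobs):
    # Mirrors the patched WorkflowDAG.cancel_node_jobs(): returns True only
    # when no spawned job is still cancelable, i.e. all have concluded.
    cancel_finished = True
    for job in jobs:
        if not job:
            continue
        elif job.can_cancel:
            cancel_finished = False
            job.cancel()
    return cancel_finished


jobs = [Job(can_cancel=True), None, Job(can_cancel=False)]

# First scheduler pass: one job was still cancelable, so the workflow is
# NOT marked 'canceled' yet; it stays 'running' and is re-examined later.
assert cancel_node_jobs(jobs) is False

# A later pass, once the spawned jobs have concluded, lets
# process_finished_workflow_jobs() transition the workflow to 'canceled'.
assert cancel_node_jobs(jobs) is True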
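
The final hunk then keeps concluded workflows out of the spawning pass: after process_finished_workflow_jobs() runs, only workflows still in 'running' status are handed to spawn_workflow_graph_jobs(), and that method additionally skips any workflow whose cancel_flag is set. A condensed, hypothetical view of the status filtering (WF is a stand-in, not the real model):

class WF:
    """Hypothetical stand-in for a workflow job row."""
    def __init__(self, pk, status):
        self.pk, self.status = pk, status

previously_running_workflow_tasks = [WF(1, 'running'), WF(2, 'canceled'), WF(3, 'successful')]

running_workflow_tasks = []
for workflow_job in previously_running_workflow_tasks:
    if workflow_job.status == 'running':
        running_workflow_tasks.append(workflow_job)
    # else: logged and removed from job spawning consideration

assert [wf.pk for wf in running_workflow_tasks] == [1]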