diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py
index b1d0b72482..bd673f5dc1 100644
--- a/awx/main/scheduler/dag_workflow.py
+++ b/awx/main/scheduler/dag_workflow.py
@@ -115,45 +115,26 @@ class WorkflowDAG(SimpleDAG):
         return cancel_finished
 
     def is_workflow_done(self):
-        root_nodes = self.get_root_nodes()
-        nodes = root_nodes
-        is_failed = False
+        for node in self.nodes:
+            obj = node['node_object']
+            if obj.do_not_run is False and not obj.job:
+                return False
+            elif obj.job and obj.job.status not in ['successful', 'failed', 'canceled', 'error']:
+                return False
+        return True
 
-        for index, n in enumerate(nodes):
-            obj = n['node_object']
-            job = obj.job
-
-            if obj.unified_job_template is None:
-                is_failed = True
-                continue
-            elif obj.do_not_run is False and not job:
-                return False, False
-            elif obj.do_not_run is True:
-                continue
-
-            children_success = self.get_dependencies(obj, 'success_nodes')
-            children_failed = self.get_dependencies(obj, 'failure_nodes')
-            children_always = self.get_dependencies(obj, 'always_nodes')
-            if not is_failed and job.status != 'successful':
-                children_all = children_success + children_failed + children_always
-                for child in children_all:
-                    if child['node_object'].job:
-                        break
-                else:
-                    is_failed = True if children_all else job.status in ['failed', 'canceled', 'error']
-
-            if job.status == 'canceled':
-                continue
-            elif job.status in ['error', 'failed']:
-                nodes.extend(children_failed + children_always)
-            elif job.status == 'successful':
-                nodes.extend(children_success + children_always)
-            else:
-                # Job is about to run or is running. Hold our horses and wait for
-                # the job to finish. We can't proceed down the graph path until we
-                # have the job result.
-                return False, False
-        return True, is_failed
+    def has_workflow_failed(self):
+        failed_nodes = []
+        for node in self.nodes:
+            obj = node['node_object']
+            if obj.job and obj.job.status in ['failed', 'canceled', 'error']:
+                failed_nodes.append(node)
+        for node in failed_nodes:
+            obj = node['node_object']
+            if (len(self.get_dependencies(obj, 'failure_nodes')) +
+                    len(self.get_dependencies(obj, 'always_nodes'))) == 0:
+                return True
+        return False
 
     r'''
     Determine if all nodes have been decided on being marked do_not_run.
@@ -188,7 +169,7 @@ class WorkflowDAG(SimpleDAG):
                     if node in (self.get_dependencies(p, 'success_nodes') +
                                 self.get_dependencies(p, 'always_nodes')):
                         return False
-                elif p.job.status in ['failed', 'error']:
+                elif p.job.status in ['failed', 'error', 'canceled']:
                     if node in (self.get_dependencies(p, 'failure_nodes') +
                                 self.get_dependencies(p, 'always_nodes')):
                         return False
@@ -222,4 +203,3 @@ class WorkflowDAG(SimpleDAG):
                                                self.get_dependencies(obj, 'failure_nodes') +
                                                self.get_dependencies(obj, 'always_nodes'))
         return [n['node_object'] for n in nodes_marked_do_not_run]
-
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index f1d447077e..a60c7f342c 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -163,6 +163,7 @@ class TaskManager():
             dag = WorkflowDAG(workflow_job)
             status_changed = False
             if workflow_job.cancel_flag:
+                workflow_job.workflow_nodes.filter(do_not_run=False, job__isnull=True).update(do_not_run=True)
                 logger.debug('Canceling spawned jobs of %s due to cancel flag.', workflow_job.log_format)
                 cancel_finished = dag.cancel_node_jobs()
                 if cancel_finished:
@@ -172,11 +173,12 @@ class TaskManager():
                     workflow_job.save(update_fields=['status', 'start_args'])
                     status_changed = True
             else:
-                is_done, has_failed = dag.is_workflow_done()
                 workflow_nodes = dag.mark_dnr_nodes()
                 map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes)
+                is_done = dag.is_workflow_done()
                 if not is_done:
                     continue
+                has_failed = dag.has_workflow_failed()
                 logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
                 result.append(workflow_job.id)
                 new_status = 'failed' if has_failed else 'successful'