From 4c9a1d6b909fb57b9dce401d75e0e45fa7b2b6ee Mon Sep 17 00:00:00 2001 From: chris meyers Date: Sun, 18 Nov 2018 14:32:56 -0500 Subject: [PATCH] optimize mark dnr nodes algorithm * Compute largest depth of each node and traverse graph by depth. This allows us to check a node once, and only once, to determine if it needs to be marked for do not run. --- awx/main/scheduler/dag_simple.py | 23 +++++++++++++++++++++++ awx/main/scheduler/dag_workflow.py | 14 ++++---------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index fd54b6a34f..ecd4da28ac 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -1,8 +1,11 @@ + class SimpleDAG(object): ''' A simple implementation of a directed acyclic graph ''' def __init__(self): + # Depth + self.depth = [set([])] self.nodes = [] self.root_nodes = set([]) @@ -99,6 +102,8 @@ class SimpleDAG(object): gv_file.close() def add_node(self, obj, metadata=None): + if not metadata: + metadata = dict() if self.find_ord(obj) is None: ''' Assume node is a root node until a child is added @@ -109,6 +114,11 @@ class SimpleDAG(object): entry = dict(node_object=obj, metadata=metadata) self.nodes.append(entry) + # Depth + metadata['depth'] = 0 + self.depth[0].add(node_index) + return node_index + def add_edge(self, from_obj, to_obj, label): from_obj_ord = self.find_ord(from_obj) to_obj_ord = self.find_ord(to_obj) @@ -133,6 +143,19 @@ class SimpleDAG(object): self.node_from_edges_by_label[label][from_obj_ord].append(to_obj_ord) self.node_to_edges_by_label[label][to_obj_ord].append(from_obj_ord) + # Depth + parent_depth = self.nodes[from_obj_ord]['metadata']['depth'] + current_depth = self.nodes[to_obj_ord]['metadata']['depth'] + if parent_depth >= current_depth: + if len(self.depth) <= parent_depth + 1: + self.depth.append(set([])) + + self.nodes[to_obj_ord]['metadata']['depth'] = parent_depth + 1 + + self.depth[current_depth].remove(to_obj_ord) + self.depth[parent_depth + 1].add(to_obj_ord) + + def find_ord(self, obj): return self.node_obj_to_node_index.get(obj, None) diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index b405262218..315fbde5ec 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -1,6 +1,5 @@ # Python -import copy from awx.main.models import ( WorkflowJobTemplateNode, WorkflowJobNode, @@ -201,13 +200,12 @@ class WorkflowDAG(SimpleDAG): def mark_dnr_nodes(self): root_nodes = self.get_root_nodes() - nodes = copy.copy(root_nodes) nodes_marked_do_not_run = [] - for index, node in enumerate(nodes): - obj = node['node_object'] - - if obj.do_not_run is False and not obj.job: + for node_indexes in self.depth: + for node_index in node_indexes: + node = self.nodes[node_index] + obj = node['node_object'] parent_nodes = [p['node_object'] for p in self.get_dependents(obj)] if self._are_all_nodes_dnr_decided(parent_nodes): if obj.unified_job_template is None: @@ -216,8 +214,4 @@ class WorkflowDAG(SimpleDAG): elif node not in root_nodes and self._should_mark_node_dnr(node, parent_nodes): obj.do_not_run = True nodes_marked_do_not_run.append(node) - - nodes.extend(self.get_dependencies(obj, 'success_nodes') + - self.get_dependencies(obj, 'failure_nodes') + - self.get_dependencies(obj, 'always_nodes')) return [n['node_object'] for n in nodes_marked_do_not_run]