Optimize the mark_dnr_nodes algorithm

* Compute the largest depth of each node and traverse the graph by depth.
This allows us to check each node once, and only once, to determine whether
it needs to be marked as do-not-run (DNR).
This commit is contained in:
chris meyers 2018-11-18 14:32:56 -05:00 committed by mabashian
parent d1aa52a2a6
commit 4c9a1d6b90
2 changed files with 27 additions and 10 deletions

View File

@ -1,8 +1,11 @@
class SimpleDAG(object):
''' A simple implementation of a directed acyclic graph '''
def __init__(self):
# Depth: self.depth[d] holds the set of node indexes currently at depth d
self.depth = [set([])]
self.nodes = []
self.root_nodes = set([])
@ -99,6 +102,8 @@ class SimpleDAG(object):
gv_file.close()
def add_node(self, obj, metadata=None):
if not metadata:
metadata = dict()
if self.find_ord(obj) is None:
'''
Assume node is a root node until a child is added
@ -109,6 +114,11 @@ class SimpleDAG(object):
entry = dict(node_object=obj, metadata=metadata)
self.nodes.append(entry)
# Depth: a newly added node starts at depth 0
metadata['depth'] = 0
self.depth[0].add(node_index)
return node_index
def add_edge(self, from_obj, to_obj, label):
from_obj_ord = self.find_ord(from_obj)
to_obj_ord = self.find_ord(to_obj)
@ -133,6 +143,19 @@ class SimpleDAG(object):
self.node_from_edges_by_label[label][from_obj_ord].append(to_obj_ord)
self.node_to_edges_by_label[label][to_obj_ord].append(from_obj_ord)
# Depth: if the child is not already deeper than this parent, move it to parent depth + 1
parent_depth = self.nodes[from_obj_ord]['metadata']['depth']
current_depth = self.nodes[to_obj_ord]['metadata']['depth']
if parent_depth >= current_depth:
if len(self.depth) <= parent_depth + 1:
self.depth.append(set([]))
self.nodes[to_obj_ord]['metadata']['depth'] = parent_depth + 1
self.depth[current_depth].remove(to_obj_ord)
self.depth[parent_depth + 1].add(to_obj_ord)
def find_ord(self, obj):
    """Look up ``obj`` in ``self.node_obj_to_node_index``.

    Returns the value mapped to ``obj``, or ``None`` when ``obj`` is not
    present in the mapping.
    """
    index_by_obj = self.node_obj_to_node_index
    return index_by_obj.get(obj)

View File

@ -1,6 +1,5 @@
# Python
import copy
from awx.main.models import (
WorkflowJobTemplateNode,
WorkflowJobNode,
@ -201,13 +200,12 @@ class WorkflowDAG(SimpleDAG):
def mark_dnr_nodes(self):
root_nodes = self.get_root_nodes()
nodes = copy.copy(root_nodes)
nodes_marked_do_not_run = []
for index, node in enumerate(nodes):
obj = node['node_object']
if obj.do_not_run is False and not obj.job:
for node_indexes in self.depth:
for node_index in node_indexes:
node = self.nodes[node_index]
obj = node['node_object']
parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
if self._are_all_nodes_dnr_decided(parent_nodes):
if obj.unified_job_template is None:
@ -216,8 +214,4 @@ class WorkflowDAG(SimpleDAG):
elif node not in root_nodes and self._should_mark_node_dnr(node, parent_nodes):
obj.do_not_run = True
nodes_marked_do_not_run.append(node)
nodes.extend(self.get_dependencies(obj, 'success_nodes') +
self.get_dependencies(obj, 'failure_nodes') +
self.get_dependencies(obj, 'always_nodes'))
return [n['node_object'] for n in nodes_marked_do_not_run]