From 0499d419c374e12db47ae4d13d61618d45afc07d Mon Sep 17 00:00:00 2001 From: chris meyers Date: Mon, 12 Nov 2018 15:06:09 -0500 Subject: [PATCH] more efficient graph processing * Getting parent nodes from child was inefficient. Optimize it with a hash table like we did for the getting of children. * Getting leaf nodes was inefficient. Optimize it like we did getting root nodes. A node is assumed to be a leaf node until it gets a child. --- awx/main/scheduler/dag_simple.py | 132 ++++++++++++++++------------- awx/main/scheduler/dag_workflow.py | 27 +++--- 2 files changed, 91 insertions(+), 68 deletions(-) diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index 1e28615cc5..589a18fecd 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -1,14 +1,13 @@ - class SimpleDAG(object): ''' A simple implementation of a directed acyclic graph ''' def __init__(self): self.nodes = [] - self.edges = [] self.root_nodes = set([]) + self.leaf_nodes = set([]) - ''' + r''' Track node_obj->node index dict where key is a full workflow node object or whatever we are storing in ['node_object'] and value is an index to be used into @@ -16,19 +15,41 @@ class SimpleDAG(object): ''' self.node_obj_to_node_index = dict() - ''' + r''' Track per-node from->to edges - dict where key is the node index in self.nodes and value is a set of - indexes into self.nodes that represent the to edge - [node_from_index] = set([node_to_index,]) + i.e. + { + 'success': { + 1: [2, 3], + 4: [2, 3], + }, + 'failed': { + 1: [5], + } + } ''' - self.node_from_edges = dict() + self.node_from_edges_by_label = dict() + + r''' + Track per-node reverse relationship (child to parent) + + i.e. + { + 'success': { + 2: [1, 4], + 3: [1, 4], + }, + 'failed': { + 5: [1], + } + } + ''' + self.node_to_edges_by_label = dict() def __contains__(self, obj): - for node in self.nodes: - if node['node_object'] == obj: - return True + if self.node['node_object'] in self.node_obj_to_node_index: + return True return False def __len__(self): @@ -83,11 +104,12 @@ class SimpleDAG(object): ''' node_index = len(self.nodes) self.root_nodes.add(node_index) + self.leaf_nodes.add(node_index) self.node_obj_to_node_index[obj] = node_index entry = dict(node_object=obj, metadata=metadata) self.nodes.append(entry) - def add_edge(self, from_obj, to_obj, label=None): + def add_edge(self, from_obj, to_obj, label): from_obj_ord = self.find_ord(from_obj) to_obj_ord = self.find_ord(to_obj) @@ -103,67 +125,61 @@ class SimpleDAG(object): elif to_obj_ord is None: raise LookupError("To object not found {}".format(to_obj)) - if from_obj_ord not in self.node_from_edges: - self.node_from_edges[from_obj_ord] = set([]) + self.node_from_edges_by_label.setdefault(label, dict()) \ + .setdefault(from_obj_ord, []) + self.node_to_edges_by_label.setdefault(label, dict()) \ + .setdefault(to_obj_ord, []) - self.node_from_edges[from_obj_ord].add(to_obj_ord) + self.node_from_edges_by_label[label][from_obj_ord].append(to_obj_ord) + self.node_to_edges_by_label[label][to_obj_ord].append(from_obj_ord) - self.edges.append((from_obj_ord, to_obj_ord, label)) - - def add_edges(self, edgelist): - for edge_pair in edgelist: - self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2]) + ''' + To node is no longer a leaf node + ''' + for l in self.node_to_edges_by_label.keys(): + if len(self.node_to_edges_by_label[l].get(to_obj_ord, [])) != 0: + self.leaf_nodes.discard(to_obj_ord) def find_ord(self, obj): return self.node_obj_to_node_index.get(obj, None) + def _get_dependencies_by_label(self, node_index, label): + return [self.nodes[index] for index in + self.node_from_edges_by_label.get(label, {}) + .get(node_index, [])] + def get_dependencies(self, obj, label=None): - antecedents = [] this_ord = self.find_ord(obj) - for node, dep, lbl in self.edges: - if label: - if node == this_ord and lbl == label: - antecedents.append(self.nodes[dep]) - else: - if node == this_ord: - antecedents.append(self.nodes[dep]) - return antecedents + nodes = [] + if label: + return self._get_dependencies_by_label(this_ord, label) + else: + nodes = [] + map(lambda l: nodes.extend(self._get_dependencies_by_label(this_ord, l)), + self.node_from_edges_by_label.keys()) + return nodes - def get_dependencies_label_oblivious(self, obj): - this_ord = self.find_ord(obj) - - to_node_indexes = self.node_from_edges.get(this_ord, set([])) - return [self.nodes[index] for index in to_node_indexes] + def _get_dependents_by_label(self, node_index, label): + return [self.nodes[index] for index in + self.node_to_edges_by_label.get(label, {}) + .get(node_index, [])] def get_dependents(self, obj, label=None): - decendents = [] this_ord = self.find_ord(obj) - for node, dep, lbl in self.edges: - if label: - if dep == this_ord and lbl == label: - decendents.append(self.nodes[node]) - else: - if dep == this_ord: - decendents.append(self.nodes[node]) - return decendents + nodes = [] + if label: + return self._get_dependents_by_label(this_ord, label) + else: + nodes = [] + map(lambda l: nodes.extend(self._get_dependents_by_label(this_ord, l)), + self.node_to_edges_by_label.keys()) + return nodes def get_leaf_nodes(self): - leafs = [] - for n in self.nodes: - if len(self.get_dependencies(n['node_object'])) < 1: - leafs.append(n) - return leafs + return [self.nodes[index] for index in self.leaf_nodes] def get_root_nodes(self): - roots = [] - for index in self.root_nodes: - roots.append(self.nodes[index]) - return roots - - for n in self.nodes: - if len(self.get_dependents(n['node_object'])) < 1: - roots.append(n) - return roots + return [self.nodes[index] for index in self.root_nodes] def has_cycle(self): node_objs = [node['node_object'] for node in self.get_root_nodes()] @@ -178,7 +194,7 @@ class SimpleDAG(object): while stack: node_obj = stack.pop() - children = [node['node_object'] for node in self.get_dependencies_label_oblivious(node_obj)] + children = [node['node_object'] for node in self.get_dependencies(node_obj)] children_to_add = filter(lambda node_obj: node_obj not in node_objs_visited, children) if children_to_add: diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 1eadac8432..5e8db1a497 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -18,15 +18,23 @@ class WorkflowDAG(SimpleDAG): def _init_graph(self, workflow_job_or_jt): if hasattr(workflow_job_or_jt, 'workflow_job_template_nodes'): + vals = ['from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id'] + filters = { + 'from_workflowjobtemplatenode__workflow_job_template_id': workflow_job_or_jt.id + } workflow_nodes = workflow_job_or_jt.workflow_job_template_nodes - success_nodes = WorkflowJobTemplateNode.success_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id') - failure_nodes = WorkflowJobTemplateNode.failure_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id') - always_nodes = WorkflowJobTemplateNode.always_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id') + success_nodes = WorkflowJobTemplateNode.success_nodes.through.objects.filter(**filters).values_list(*vals) + failure_nodes = WorkflowJobTemplateNode.failure_nodes.through.objects.filter(**filters).values_list(*vals) + always_nodes = WorkflowJobTemplateNode.always_nodes.through.objects.filter(**filters).values_list(*vals) elif hasattr(workflow_job_or_jt, 'workflow_job_nodes'): + vals = ['from_workflowjobnode_id', 'to_workflowjobnode_id'] + filters = { + 'from_workflowjobnode__workflow_job_id': workflow_job_or_jt.id + } workflow_nodes = workflow_job_or_jt.workflow_job_nodes - success_nodes = WorkflowJobNode.success_nodes.through.objects.filter(from_workflowjobnode__workflow_job_id=workflow_job_or_jt.id).values_list('from_workflowjobnode_id', 'to_workflowjobnode_id') - failure_nodes = WorkflowJobNode.failure_nodes.through.objects.filter(from_workflowjobnode__workflow_job_id=workflow_job_or_jt.id).values_list('from_workflowjobnode_id', 'to_workflowjobnode_id') - always_nodes = WorkflowJobNode.always_nodes.through.objects.filter(from_workflowjobnode__workflow_job_id=workflow_job_or_jt.id).values_list('from_workflowjobnode_id', 'to_workflowjobnode_id') + success_nodes = WorkflowJobNode.success_nodes.through.objects.filter(**filters).values_list(*vals) + failure_nodes = WorkflowJobNode.failure_nodes.through.objects.filter(**filters).values_list(*vals) + always_nodes = WorkflowJobNode.always_nodes.through.objects.filter(**filters).values_list(*vals) else: raise RuntimeError("Unexpected object {} {}".format(type(workflow_job_or_jt), workflow_job_or_jt)) @@ -43,7 +51,7 @@ class WorkflowDAG(SimpleDAG): for edge in always_nodes: self.add_edge(wfn_by_id[edge[0]], wfn_by_id[edge[1]], 'always_nodes') - ''' + r''' Determine if all, relevant, parents node are finished. Relevant parents are parents that are marked do_not_run False. @@ -147,7 +155,7 @@ class WorkflowDAG(SimpleDAG): return False, False return True, is_failed - ''' + r''' Determine if all nodes have been decided on being marked do_not_run. Nodes that are do_not_run False may become do_not_run True in the future. We know a do_not_run False node will NOT be marked do_not_run True if there @@ -162,10 +170,9 @@ class WorkflowDAG(SimpleDAG): if n.do_not_run is False and not n.job: return False return True - #return not any((n.do_not_run is False and not n.job) for n in workflow_nodes) - ''' + r''' Determine if a node (1) is ready to be marked do_not_run and (2) should be marked do_not_run.