optimize cycle detection

This commit is contained in:
chris meyers 2018-11-08 11:22:48 -05:00 committed by mabashian
parent b84fc3b111
commit 9f3e272665
3 changed files with 73 additions and 23 deletions

View File

@ -6,6 +6,24 @@ class SimpleDAG(object):
def __init__(self):
    """Set up an empty graph together with its lookup indexes."""
    self.nodes = list()
    self.edges = list()
    # Indexes into self.nodes of the nodes currently treated as roots.
    self.root_nodes = set()
    # Track node_obj -> node index.
    # Key is a full workflow node object (or whatever is stored under
    # ['node_object']); value is the index to use into self.nodes.
    self.node_obj_to_node_index = {}
    # Track per-node from -> to edges.
    # Key is a node index into self.nodes; value is the set of indexes
    # into self.nodes that represent the "to" side of its edges:
    #   [node_from_index] = set([node_to_index,])
    self.node_from_edges = {}
def __contains__(self, obj):
for node in self.nodes:
@ -60,17 +78,36 @@ class SimpleDAG(object):
def add_node(self, obj, metadata=None):
    """Register obj as a node of the graph unless it is already present.

    Args:
        obj: the object to store under the entry's 'node_object' key.
        metadata: optional value stored under the entry's 'metadata' key.

    The node is appended exactly once, and the index recorded in
    self.node_obj_to_node_index and self.root_nodes is the position the
    entry actually occupies in self.nodes.
    """
    if self.find_ord(obj) is not None:
        return

    # The new entry lands at the current end of self.nodes, so the index
    # has to be read before appending.
    node_index = len(self.nodes)
    self.nodes.append(dict(node_object=obj, metadata=metadata))
    self.node_obj_to_node_index[obj] = node_index

    # Assume the node is a root node until an edge pointing at it is added.
    self.root_nodes.add(node_index)
def add_edge(self, from_obj, to_obj, label=None):
    """Add a directed edge from from_obj to to_obj, optionally labelled.

    Args:
        from_obj: node object the edge starts at.
        to_obj: node object the edge points to.
        label: optional label stored with the edge tuple.

    Raises:
        LookupError: if from_obj, to_obj, or both are not nodes of the
            graph. Nothing is modified in that case.
    """
    from_obj_ord = self.find_ord(from_obj)
    to_obj_ord = self.find_ord(to_obj)

    # Validate both endpoints before touching any state, so a rejected
    # edge cannot strip the target of its root status.
    if from_obj_ord is None and to_obj_ord is None:
        raise LookupError("From object {} and to object {} not found".format(from_obj, to_obj))
    elif from_obj_ord is None:
        raise LookupError("From object not found {}".format(from_obj))
    elif to_obj_ord is None:
        raise LookupError("To object not found {}".format(to_obj))

    # To node is no longer a root node
    self.root_nodes.discard(to_obj_ord)

    self.node_from_edges.setdefault(from_obj_ord, set()).add(to_obj_ord)
    self.edges.append((from_obj_ord, to_obj_ord, label))
def add_edges(self, edgelist):
@ -78,10 +115,7 @@ class SimpleDAG(object):
self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2])
def find_ord(self, obj):
    """Return the index recorded for obj, or None if obj is not a known node.

    The lookup goes through self.node_obj_to_node_index (node object ->
    index into self.nodes) instead of scanning self.nodes linearly.
    """
    return self.node_obj_to_node_index.get(obj)
def get_dependencies(self, obj, label=None):
antecedents = []
@ -95,6 +129,12 @@ class SimpleDAG(object):
antecedents.append(self.nodes[dep])
return antecedents
def get_dependencies_label_oblivious(self, obj):
    """Return the node entries obj has an outgoing edge to, whatever the label.

    An object without recorded outgoing edges yields an empty list.
    """
    source_index = self.find_ord(obj)
    target_indexes = self.node_from_edges.get(source_index, set([]))
    found = []
    for target_index in target_indexes:
        found.append(self.nodes[target_index])
    return found
def get_dependents(self, obj, label=None):
decendents = []
this_ord = self.find_ord(obj)
@ -116,6 +156,10 @@ class SimpleDAG(object):
def get_root_nodes(self):
roots = []
for index in self.root_nodes:
roots.append(self.nodes[index])
return roots
for n in self.nodes:
if len(self.get_dependents(n['node_object'])) < 1:
roots.append(n)
@ -126,6 +170,7 @@ class SimpleDAG(object):
node_objs_visited = set([])
path = set([])
stack = node_objs
res = False
if len(self.nodes) != 0 and len(node_objs) == 0:
return True
@ -133,17 +178,17 @@ class SimpleDAG(object):
while stack:
node_obj = stack.pop()
children = [node['node_object'] for node in self.get_dependencies(node_obj)]
children = [node['node_object'] for node in self.get_dependencies_label_oblivious(node_obj)]
children_to_add = filter(lambda node_obj: node_obj not in node_objs_visited, children)
if children_to_add:
if node_obj in path:
return True
res = True
break
path.add(node_obj)
stack.append(node_obj)
stack.extend(children_to_add)
else:
node_objs_visited.add(node_obj)
path.discard(node_obj)
return False
return res

View File

@ -1,6 +1,7 @@
# Python
import copy
from awx.main.models import WorkflowJobTemplateNode
# AWX
from awx.main.scheduler.dag_simple import SimpleDAG
@ -14,21 +15,28 @@ class WorkflowDAG(SimpleDAG):
def _init_graph(self, workflow_job_or_jt):
# Populate the graph from workflow_job_or_jt: every workflow node is passed to
# add_node(), and every success/failure/always relation is passed to add_edge()
# with the relation name as the edge label.
# Raises RuntimeError when the argument has neither a
# 'workflow_job_template_nodes' nor a 'workflow_job_nodes' attribute.
# NOTE(review): this span appears to interleave the pre-change and post-change
# lines of a diff (node_qs/prefetch_related vs. workflow_nodes/through-table
# queries) -- confirm which lines are live before relying on it.
if hasattr(workflow_job_or_jt, 'workflow_job_template_nodes'):
node_qs = workflow_job_or_jt.workflow_job_template_nodes
workflow_nodes = workflow_job_or_jt.workflow_job_template_nodes
elif hasattr(workflow_job_or_jt, 'workflow_job_nodes'):
node_qs = workflow_job_or_jt.workflow_job_nodes
workflow_nodes = workflow_job_or_jt.workflow_job_nodes
else:
raise RuntimeError("Unexpected object {} {}".format(type(workflow_job_or_jt), workflow_job_or_jt))
workflow_nodes = node_qs.prefetch_related('success_nodes', 'failure_nodes', 'always_nodes').all()
for workflow_node in workflow_nodes:
# Fetch each relation as (from_node_id, to_node_id) pairs straight from the
# many-to-many through tables.
# NOTE(review): these queries always go through WorkflowJobTemplateNode and
# filter on workflow_job_template_id, even when the 'workflow_job_nodes' branch
# above was taken -- confirm this is intended for workflow jobs.
success_nodes = WorkflowJobTemplateNode.success_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id')
failure_nodes = WorkflowJobTemplateNode.failure_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id')
always_nodes = WorkflowJobTemplateNode.always_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id')
# Map node id -> node object so the id pairs above can be turned back into the
# objects that add_edge() expects.
wfn_by_id = dict()
for workflow_node in workflow_nodes.all():
wfn_by_id[workflow_node.id] = workflow_node
self.add_node(workflow_node)
for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']:
for workflow_node in workflow_nodes:
related_nodes = getattr(workflow_node, node_type).all()
for related_node in related_nodes:
self.add_edge(workflow_node, related_node, node_type)
# edge[0] is the "from" node id and edge[1] the "to" node id (values_list order).
for edge in success_nodes:
self.add_edge(wfn_by_id[edge[0]], wfn_by_id[edge[1]], 'success_nodes')
for edge in failure_nodes:
self.add_edge(wfn_by_id[edge[0]], wfn_by_id[edge[1]], 'failure_nodes')
for edge in always_nodes:
self.add_edge(wfn_by_id[edge[0]], wfn_by_id[edge[1]], 'always_nodes')
'''
Determine if all, relevant, parents node are finished.

View File

@ -54,11 +54,8 @@ In the event that spawning the workflow would result in recursion, the child wor
will be marked as failed with a message explaining that recursion was detected.
This is to prevent saturation of the task system with an infinite chain of workflows.
### Tree-Graph Formation and Restrictions
The tree-graph structure of a workflow is enforced by associating workflow job template nodes via endpoints `/workflow_job_template_nodes/\d+/*_nodes/`, where `*` has options `success`, `failure` and `always`. However, there are restrictions that must be enforced when setting up new connections. Here are the three restrictions that raise a validation error when broken:
* Cycle restriction: According to tree definition, no cycle is allowed.
> Note: A node can now have all three types of child nodes.
### DAG Formation and Restrictions
The DAG structure of a workflow is enforced by associating workflow job template nodes via endpoints `/workflow_job_template_nodes/\d+/*_nodes/`, where `*` has options `success`, `failure` and `always`. One restriction is enforced when setting up new connections: the cycle restriction, because a DAG must not contain cycles.
### Workflow Run Details
A typical workflow run starts either by POSTing to the endpoint `/workflow_job_templates/\d+/launch/` or by being triggered automatically by a related schedule. First, the workflow job template creates a workflow job, and all related workflow job template nodes create workflow job nodes. Right after that, all root nodes are populated with corresponding job resources and start running. If nothing goes wrong, each decision tree will follow its own route to completion. The entire workflow finishes running when all its decision trees complete.