more efficient graph processing

* Getting parent nodes from child was inefficient. Optimize it with a
hash table like we did for the getting of children.
* Getting leaf nodes was inefficient. Optimize it like we did getting
root nodes. A node is assumed to be a leaf node until it gets a child.
This commit is contained in:
chris meyers
2018-11-12 15:06:09 -05:00
committed by mabashian
parent 700860e040
commit 0499d419c3
2 changed files with 91 additions and 68 deletions

View File

@@ -18,15 +18,23 @@ class WorkflowDAG(SimpleDAG):
def _init_graph(self, workflow_job_or_jt):
if hasattr(workflow_job_or_jt, 'workflow_job_template_nodes'):
vals = ['from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id']
filters = {
'from_workflowjobtemplatenode__workflow_job_template_id': workflow_job_or_jt.id
}
workflow_nodes = workflow_job_or_jt.workflow_job_template_nodes
success_nodes = WorkflowJobTemplateNode.success_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id')
failure_nodes = WorkflowJobTemplateNode.failure_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id')
always_nodes = WorkflowJobTemplateNode.always_nodes.through.objects.filter(from_workflowjobtemplatenode__workflow_job_template_id=workflow_job_or_jt.id).values_list('from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id')
success_nodes = WorkflowJobTemplateNode.success_nodes.through.objects.filter(**filters).values_list(*vals)
failure_nodes = WorkflowJobTemplateNode.failure_nodes.through.objects.filter(**filters).values_list(*vals)
always_nodes = WorkflowJobTemplateNode.always_nodes.through.objects.filter(**filters).values_list(*vals)
elif hasattr(workflow_job_or_jt, 'workflow_job_nodes'):
vals = ['from_workflowjobnode_id', 'to_workflowjobnode_id']
filters = {
'from_workflowjobnode__workflow_job_id': workflow_job_or_jt.id
}
workflow_nodes = workflow_job_or_jt.workflow_job_nodes
success_nodes = WorkflowJobNode.success_nodes.through.objects.filter(from_workflowjobnode__workflow_job_id=workflow_job_or_jt.id).values_list('from_workflowjobnode_id', 'to_workflowjobnode_id')
failure_nodes = WorkflowJobNode.failure_nodes.through.objects.filter(from_workflowjobnode__workflow_job_id=workflow_job_or_jt.id).values_list('from_workflowjobnode_id', 'to_workflowjobnode_id')
always_nodes = WorkflowJobNode.always_nodes.through.objects.filter(from_workflowjobnode__workflow_job_id=workflow_job_or_jt.id).values_list('from_workflowjobnode_id', 'to_workflowjobnode_id')
success_nodes = WorkflowJobNode.success_nodes.through.objects.filter(**filters).values_list(*vals)
failure_nodes = WorkflowJobNode.failure_nodes.through.objects.filter(**filters).values_list(*vals)
always_nodes = WorkflowJobNode.always_nodes.through.objects.filter(**filters).values_list(*vals)
else:
raise RuntimeError("Unexpected object {} {}".format(type(workflow_job_or_jt), workflow_job_or_jt))
@@ -43,7 +51,7 @@ class WorkflowDAG(SimpleDAG):
for edge in always_nodes:
self.add_edge(wfn_by_id[edge[0]], wfn_by_id[edge[1]], 'always_nodes')
'''
r'''
Determine if all, relevant, parents node are finished.
Relevant parents are parents that are marked do_not_run False.
@@ -147,7 +155,7 @@ class WorkflowDAG(SimpleDAG):
return False, False
return True, is_failed
'''
r'''
Determine if all nodes have been decided on being marked do_not_run.
Nodes that are do_not_run False may become do_not_run True in the future.
We know a do_not_run False node will NOT be marked do_not_run True if there
@@ -162,10 +170,9 @@ class WorkflowDAG(SimpleDAG):
if n.do_not_run is False and not n.job:
return False
return True
#return not any((n.do_not_run is False and not n.job) for n in workflow_nodes)
'''
r'''
Determine if a node (1) is ready to be marked do_not_run and (2) should
be marked do_not_run.