Files
awx/awx/main/scheduler/dag_workflow.py
chris meyers febf051748 do not mark ujt None nodes dnr
* Leave workflow nodes that have no related unified job template with
do_not_run = False. If we marked them True, we could not differentiate between
a node whose path was deliberately not taken and a node that cannot run
because it does not have a valid related unified job template.
2018-11-27 16:12:41 -05:00

213 lines
8.4 KiB
Python

from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import smart_text
# Python
from awx.main.models import (
WorkflowJobTemplateNode,
WorkflowJobNode,
)
# AWX
from awx.main.scheduler.dag_simple import SimpleDAG
class WorkflowDAG(SimpleDAG):
    """Graph of workflow nodes joined by success, failure and always edges.

    The graph is built either from an object exposing
    ``workflow_job_template_nodes`` (a workflow job template) or from one
    exposing ``workflow_job_nodes`` (a workflow job). Every method below
    reads the wrapped model object through ``node['node_object']``.
    """

    # Job statuses this class treats as "the job is done".
    _FINISHED_STATUSES = ('successful', 'failed', 'error', 'canceled')
    # Subset of finished statuses that send execution down the failure path.
    _FAILED_STATUSES = ('failed', 'error', 'canceled')
    # Edge labels, which are also the names of the many-to-many relations on
    # the node models.
    _EDGE_LABELS = ('success_nodes', 'failure_nodes', 'always_nodes')

    def __init__(self, workflow_job=None):
        """Create the graph, populating it when ``workflow_job`` is truthy."""
        super(WorkflowDAG, self).__init__()
        if workflow_job:
            self._init_graph(workflow_job)

    def _init_graph(self, workflow_job_or_jt):
        """Add every node and labelled edge of the given workflow object.

        :param workflow_job_or_jt: object with either a
            ``workflow_job_template_nodes`` or a ``workflow_job_nodes``
            related manager.
        :raises RuntimeError: if the object has neither attribute.
        """
        if hasattr(workflow_job_or_jt, 'workflow_job_template_nodes'):
            node_model = WorkflowJobTemplateNode
            vals = ['from_workflowjobtemplatenode_id', 'to_workflowjobtemplatenode_id']
            filters = {
                'from_workflowjobtemplatenode__workflow_job_template_id': workflow_job_or_jt.id
            }
            workflow_nodes = workflow_job_or_jt.workflow_job_template_nodes
        elif hasattr(workflow_job_or_jt, 'workflow_job_nodes'):
            node_model = WorkflowJobNode
            vals = ['from_workflowjobnode_id', 'to_workflowjobnode_id']
            filters = {
                'from_workflowjobnode__workflow_job_id': workflow_job_or_jt.id
            }
            workflow_nodes = workflow_job_or_jt.workflow_job_nodes
        else:
            raise RuntimeError("Unexpected object {} {}".format(type(workflow_job_or_jt), workflow_job_or_jt))

        wfn_by_id = {}
        for workflow_node in workflow_nodes.all():
            wfn_by_id[workflow_node.id] = workflow_node
            self.add_node(workflow_node)

        # Edges are read as (from_id, to_id) pairs straight from each
        # relation's through table, restricted to this workflow object.
        for label in self._EDGE_LABELS:
            through = getattr(node_model, label).through
            edges = through.objects.filter(**filters).values_list(*vals)
            for from_id, to_id in edges:
                self.add_edge(wfn_by_id[from_id], wfn_by_id[to_id], label)

    def _are_relevant_parents_finished(self, node):
        """Return True when no parent of ``node`` can still be running a job.

        Parents marked do_not_run, or without a unified job template, are
        ignored. Any other parent must have a job in a finished status.

        :param node: SimpleDAG internal node
        """
        obj = node['node_object']
        parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
        for p in parent_nodes:
            if p.do_not_run is True or p.unified_job_template is None:
                continue
            # Node might run a job, or has one that is not done yet.
            if not p.job or p.job.status not in self._FINISHED_STATUSES:
                return False
        return True

    def bfs_nodes_to_run(self):
        """Return the node objects that are ready to have a job started.

        A node qualifies when it is not marked do_not_run, has no job yet,
        has a unified job template, and all of its relevant parents are
        finished. Nodes are visited in ``sort_nodes_topological()`` order.
        """
        nodes_found = []
        for node in self.sort_nodes_topological():
            obj = node['node_object']
            if obj.do_not_run is True or obj.job or obj.unified_job_template is None:
                continue
            if self._are_relevant_parents_finished(node):
                nodes_found.append(obj)
        return nodes_found

    def cancel_node_jobs(self):
        """Cancel every node job that reports ``can_cancel``.

        Return True when no job needed cancelling, False if at least one
        ``cancel()`` call was issued.
        """
        cancel_finished = True
        for n in self.nodes:
            job = n['node_object'].job
            if job and job.can_cancel:
                cancel_finished = False
                job.cancel()
        return cancel_finished

    def is_workflow_done(self):
        """Return True when no node can still start or is still running a job."""
        for node in self.nodes:
            obj = node['node_object']
            # Runnable node that has not been given a job yet.
            if obj.do_not_run is False and not obj.job and obj.unified_job_template:
                return False
            # Node has a job that has not reached a finished status.
            if obj.job and obj.job.status not in self._FINISHED_STATUSES:
                return False
        return True

    def has_workflow_failed(self):
        """Determine whether a failed node has no error-handling path.

        A node counts as failed when it is not marked do_not_run and has no
        unified job template, or when its job ended in a failed status. The
        workflow has failed if any such node has neither failure nor always
        children.

        Return a tuple ``(failed, message)``; ``message`` is None when
        ``failed`` is False.
        """
        failed_nodes = []
        for node in self.nodes:
            obj = node['node_object']
            if obj.do_not_run is False and obj.unified_job_template is None:
                failed_nodes.append(node)
            elif obj.job and obj.job.status in self._FAILED_STATUSES:
                failed_nodes.append(node)

        failed_path_nodes_id_status = []
        failed_unified_job_template_node_ids = []
        for node in failed_nodes:
            obj = node['node_object']
            error_handlers = (self.get_dependencies(obj, 'failure_nodes') +
                              self.get_dependencies(obj, 'always_nodes'))
            if error_handlers:
                continue
            if obj.unified_job_template is None:
                failed_unified_job_template_node_ids.append(str(obj.id))
            else:
                failed_path_nodes_id_status.append((str(obj.id), obj.job.status))

        if not (failed_path_nodes_id_status or failed_unified_job_template_node_ids):
            return False, None

        s = _("No error handle path for workflow job node(s) [{node_status}] workflow job "
              "node(s) missing unified job template and error handle path [{no_ufjt}].")
        # Joining an empty list yields '', so no emptiness guards are needed.
        parms = {
            'node_status': ",".join(
                ["({},{})".format(node_id, status) for node_id, status in failed_path_nodes_id_status]
            ),
            'no_ufjt': ",".join(failed_unified_job_template_node_ids),
        }
        return True, smart_text(s.format(**parms))

    def _are_all_nodes_dnr_decided(self, workflow_nodes):
        """
        Determine if all nodes have been decided on being marked do_not_run.
        Nodes that are do_not_run False may become do_not_run True in the future.
        We know a do_not_run False node will NOT be marked do_not_run True if there
        is a job run for that node.

        :param workflow_nodes: list of workflow_nodes

        Return a boolean
        """
        for n in workflow_nodes:
            if n.do_not_run is False and not n.job and n.unified_job_template:
                return False
        return True

    def _should_mark_node_dnr(self, node, parent_nodes):
        """
        Determine if a node (1) is ready to be marked do_not_run and (2) should
        be marked do_not_run.

        :param node: SimpleDAG internal node
        :param parent_nodes: list of workflow_nodes

        Return a boolean
        """
        for p in parent_nodes:
            if p.do_not_run is True:
                continue
            if p.job:
                status = p.job.status
                if status == 'successful':
                    followed_label = 'success_nodes'
                elif status in self._FAILED_STATUSES:
                    followed_label = 'failure_nodes'
                else:
                    # Parent job is not finished; too early to decide.
                    return False
            elif p.do_not_run is False and p.unified_job_template is None:
                # A parent without a unified job template is treated like a
                # failed parent.
                followed_label = 'failure_nodes'
            else:
                return False
            # The node stays runnable if this parent's outcome leads to it.
            if node in (self.get_dependencies(p, followed_label) +
                        self.get_dependencies(p, 'always_nodes')):
                return False
        return True

    def mark_dnr_nodes(self):
        """Set ``do_not_run = True`` on nodes that no parent outcome leads to.

        Root nodes, nodes that already have a job and nodes already decided
        are skipped. Nodes are visited in ``sort_nodes_topological()`` order.

        Return the list of node objects that were marked by this call.
        """
        root_nodes = self.get_root_nodes()
        nodes_marked_do_not_run = []
        for node in self.sort_nodes_topological():
            obj = node['node_object']
            if obj.do_not_run is not False or obj.job or node in root_nodes:
                continue
            parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
            if (self._are_all_nodes_dnr_decided(parent_nodes) and
                    self._should_mark_node_dnr(node, parent_nodes)):
                obj.do_not_run = True
                nodes_marked_do_not_run.append(obj)
        return nodes_marked_do_not_run