Mirror of https://github.com/ansible/awx.git (synced 2026-01-21 22:48:02 -03:30)

Merge pull request #5621 from rebeccahhh/workflow-convergence

Any/All boolean added in workflow convergence

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul

Commit 30354dbcd0
@@ -3677,7 +3677,7 @@ class WorkflowJobTemplateNodeSerializer(LaunchConfigurationBaseSerializer):
     class Meta:
         model = WorkflowJobTemplateNode
         fields = ('*', 'workflow_job_template', '-name', '-description', 'id', 'url', 'related',
-                  'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes',)
+                  'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes', 'all_parents_must_converge',)

     def get_related(self, obj):
         res = super(WorkflowJobTemplateNodeSerializer, self).get_related(obj)
@@ -3716,8 +3716,8 @@ class WorkflowJobNodeSerializer(LaunchConfigurationBaseSerializer):
     class Meta:
         model = WorkflowJobNode
         fields = ('*', 'job', 'workflow_job', '-name', '-description', 'id', 'url', 'related',
-                  'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes',
-                  'do_not_run',)
+                  'unified_job_template', 'success_nodes', 'failure_nodes', 'always_nodes',
+                  'all_parents_must_converge', 'do_not_run',)

     def get_related(self, obj):
         res = super(WorkflowJobNodeSerializer, self).get_related(obj)
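The serializer changes above expose `all_parents_must_converge` as an ordinary read/write field on the workflow node endpoints. A minimal client-side sketch of setting it at node creation time; the host, token, and object IDs are placeholders, and the URL assumes AWX's standard /api/v2/ layout:

```python
# Hypothetical client sketch; host, token, and object IDs are placeholders.
import json
from urllib.request import Request, urlopen

payload = {
    "unified_job_template": 42,         # template to run at this node (assumed ID)
    "all_parents_must_converge": True,  # the new ALL-convergence toggle
}
req = Request(
    "https://awx.example.com/api/v2/workflow_job_templates/7/workflow_nodes/",
    data=json.dumps(payload).encode(),
    headers={"Authorization": "Bearer <token>", "Content-Type": "application/json"},
)
print(urlopen(req).status)  # expect 201 Created
```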
@@ -0,0 +1,23 @@
+# Generated by Django 2.2.4 on 2020-01-08 22:11
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0106_v370_remove_inventory_groups_with_active_failures'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='workflowjobnode',
+            name='all_parents_must_converge',
+            field=models.BooleanField(default=False, help_text='If enabled then the node will only run if all of the parent nodes have met the criteria to reach this node'),
+        ),
+        migrations.AddField(
+            model_name='workflowjobtemplatenode',
+            name='all_parents_must_converge',
+            field=models.BooleanField(default=False, help_text='If enabled then the node will only run if all of the parent nodes have met the criteria to reach this node'),
+        ),
+    ]
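Because both `AddField` operations use `default=False`, every pre-existing node comes out of the upgrade with ANY semantics, so workflow behavior does not change until someone opts a node into ALL. A quick sanity check from a Django shell (a sketch; assumes a configured AWX environment):

```python
# Sketch for a configured Django/AWX shell (e.g. awx-manage shell).
from awx.main.models import WorkflowJobTemplateNode

node = WorkflowJobTemplateNode.objects.first()
if node is not None:
    # Nodes created before the migration report False: the original ANY behavior.
    print(node.all_parents_must_converge)
```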
@@ -79,6 +79,11 @@ class WorkflowNodeBase(CreatedModifiedModel, LaunchTimeConfig):
         symmetrical=False,
         related_name='%(class)ss_always',
     )
+    all_parents_must_converge = models.BooleanField(
+        default=False,
+        help_text=_("If enabled then the node will only run if all of the parent nodes "
+                    "have met the criteria to reach this node")
+    )
     unified_job_template = models.ForeignKey(
         'UnifiedJobTemplate',
         related_name='%(class)ss',
@@ -102,7 +107,7 @@ class WorkflowNodeBase(CreatedModifiedModel, LaunchTimeConfig):
         '''
         return ['workflow_job', 'unified_job_template',
                 'extra_data', 'survey_passwords',
-                'inventory', 'credentials', 'char_prompts']
+                'inventory', 'credentials', 'char_prompts', 'all_parents_must_converge']

     def create_workflow_job_node(self, **kwargs):
         '''
@@ -130,7 +135,7 @@ class WorkflowJobTemplateNode(WorkflowNodeBase):
     FIELDS_TO_PRESERVE_AT_COPY = [
         'unified_job_template', 'workflow_job_template', 'success_nodes', 'failure_nodes',
         'always_nodes', 'credentials', 'inventory', 'extra_data', 'survey_passwords',
-        'char_prompts'
+        'char_prompts', 'all_parents_must_converge'
     ]
     REENCRYPTION_BLACKLIST_AT_COPY = ['extra_data', 'survey_passwords']

@@ -89,8 +89,8 @@ class SimpleDAG(object):
                 run_status(n['node_object']),
                 color
             )
-        for label, edges in self.node_from_edges_by_label.iteritems():
-            for from_node, to_nodes in edges.iteritems():
+        for label, edges in self.node_from_edges_by_label.items():
+            for from_node, to_nodes in edges.items():
                 for to_node in to_nodes:
                     doc += "%s -> %s [ label=\"%s\" ];\n" % (
                         run_status(self.nodes[from_node]['node_object']),
@@ -140,36 +140,36 @@ class SimpleDAG(object):
     def find_ord(self, obj):
         return self.node_obj_to_node_index.get(obj, None)

-    def _get_dependencies_by_label(self, node_index, label):
+    def _get_children_by_label(self, node_index, label):
         return [self.nodes[index] for index in
                 self.node_from_edges_by_label.get(label, {})
                                              .get(node_index, [])]

-    def get_dependencies(self, obj, label=None):
+    def get_children(self, obj, label=None):
         this_ord = self.find_ord(obj)
         nodes = []
         if label:
-            return self._get_dependencies_by_label(this_ord, label)
+            return self._get_children_by_label(this_ord, label)
         else:
             nodes = []
             for l in self.node_from_edges_by_label.keys():
-                nodes.extend(self._get_dependencies_by_label(this_ord, l))
+                nodes.extend(self._get_children_by_label(this_ord, l))
             return nodes

-    def _get_dependents_by_label(self, node_index, label):
+    def _get_parents_by_label(self, node_index, label):
         return [self.nodes[index] for index in
                 self.node_to_edges_by_label.get(label, {})
                                            .get(node_index, [])]

-    def get_dependents(self, obj, label=None):
+    def get_parents(self, obj, label=None):
         this_ord = self.find_ord(obj)
         nodes = []
         if label:
-            return self._get_dependents_by_label(this_ord, label)
+            return self._get_parents_by_label(this_ord, label)
         else:
             nodes = []
             for l in self.node_to_edges_by_label.keys():
-                nodes.extend(self._get_dependents_by_label(this_ord, l))
+                nodes.extend(self._get_parents_by_label(this_ord, l))
             return nodes

     def get_root_nodes(self):
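The renames above are purely semantic: `get_dependencies` always walked outgoing edges (children) and `get_dependents` walked incoming edges (parents), which read backwards for a workflow graph. A self-contained sketch of the same label-keyed lookup, mirroring the `node_from_edges_by_label` / `node_to_edges_by_label` shapes used above:

```python
# Standalone sketch of the children/parents lookup; the two edge maps mirror
# SimpleDAG's node_from_edges_by_label / node_to_edges_by_label shown above.
from collections import defaultdict

node_from_edges_by_label = {}
node_to_edges_by_label = {}

def add_edge(from_index, to_index, label):
    node_from_edges_by_label.setdefault(label, defaultdict(list))[from_index].append(to_index)
    node_to_edges_by_label.setdefault(label, defaultdict(list))[to_index].append(from_index)

add_edge(0, 1, "success_nodes")  # node 0 --success--> node 1

def get_children(index, label):
    return node_from_edges_by_label.get(label, {}).get(index, [])

def get_parents(index, label):
    return node_to_edges_by_label.get(label, {}).get(index, [])

print(get_children(0, "success_nodes"))  # [1]
print(get_parents(1, "success_nodes"))   # [0]
```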
@@ -188,7 +188,7 @@ class SimpleDAG(object):
         while stack:
             node_obj = stack.pop()

-            children = [node['node_object'] for node in self.get_dependencies(node_obj)]
+            children = [node['node_object'] for node in self.get_children(node_obj)]
             children_to_add = list(filter(lambda node_obj: node_obj not in node_objs_visited, children))

             if children_to_add:
@@ -212,7 +212,7 @@ class SimpleDAG(object):
             if obj.id in obj_ids_processed:
                 return

-            for child in self.get_dependencies(obj):
+            for child in self.get_children(obj):
                 visit(child)
             obj_ids_processed.add(obj.id)
             nodes_sorted.appendleft(node)
@@ -55,7 +55,7 @@ class WorkflowDAG(SimpleDAG):

     def _are_relevant_parents_finished(self, node):
         obj = node['node_object']
-        parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
+        parent_nodes = [p['node_object'] for p in self.get_parents(obj)]
         for p in parent_nodes:
             if p.do_not_run is True:
                 continue
@ -69,33 +69,55 @@ class WorkflowDAG(SimpleDAG):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _all_parents_met_convergence_criteria(self, node):
|
||||
# This function takes any node and checks that all it's parents have met their criteria to run the child.
|
||||
# This returns a boolean and is really only useful if the node is an ALL convergence node and is
|
||||
# intended to be used in conjuction with the node property `all_parents_must_converge`
|
||||
obj = node['node_object']
|
||||
parent_nodes = [p['node_object'] for p in self.get_parents(obj)]
|
||||
for p in parent_nodes:
|
||||
#node has a status
|
||||
if p.job and p.job.status in ["successful", "failed"]:
|
||||
if p.job and p.job.status == "successful":
|
||||
status = "success_nodes"
|
||||
elif p.job and p.job.status == "failed":
|
||||
status = "failure_nodes"
|
||||
#check that the nodes status matches either a pathway of the same status or is an always path.
|
||||
if (p not in [node['node_object'] for node in self.get_parents(obj, status)]
|
||||
and p not in [node['node_object'] for node in self.get_parents(obj, "always_nodes")]):
|
||||
return False
|
||||
return True
|
||||
|
||||
def bfs_nodes_to_run(self):
|
||||
nodes = self.get_root_nodes()
|
||||
nodes_found = []
|
||||
node_ids_visited = set()
|
||||
|
||||
for index, n in enumerate(nodes):
|
||||
obj = n['node_object']
|
||||
if obj.id in node_ids_visited:
|
||||
continue
|
||||
node_ids_visited.add(obj.id)
|
||||
|
||||
if obj.do_not_run is True:
|
||||
continue
|
||||
|
||||
if obj.job:
|
||||
elif obj.job:
|
||||
if obj.job.status in ['failed', 'error', 'canceled']:
|
||||
nodes.extend(self.get_dependencies(obj, 'failure_nodes') +
|
||||
self.get_dependencies(obj, 'always_nodes'))
|
||||
nodes.extend(self.get_children(obj, 'failure_nodes') +
|
||||
self.get_children(obj, 'always_nodes'))
|
||||
elif obj.job.status == 'successful':
|
||||
nodes.extend(self.get_dependencies(obj, 'success_nodes') +
|
||||
self.get_dependencies(obj, 'always_nodes'))
|
||||
nodes.extend(self.get_children(obj, 'success_nodes') +
|
||||
self.get_children(obj, 'always_nodes'))
|
||||
elif obj.unified_job_template is None:
|
||||
nodes.extend(self.get_dependencies(obj, 'failure_nodes') +
|
||||
self.get_dependencies(obj, 'always_nodes'))
|
||||
nodes.extend(self.get_children(obj, 'failure_nodes') +
|
||||
self.get_children(obj, 'always_nodes'))
|
||||
else:
|
||||
if self._are_relevant_parents_finished(n):
|
||||
# This catches root nodes or ANY convergence nodes
|
||||
if not obj.all_parents_must_converge and self._are_relevant_parents_finished(n):
|
||||
nodes_found.append(n)
|
||||
# This catches ALL convergence nodes
|
||||
elif obj.all_parents_must_converge and self._are_relevant_parents_finished(n):
|
||||
if self._all_parents_met_convergence_criteria(n):
|
||||
nodes_found.append(n)
|
||||
|
||||
return [n['node_object'] for n in nodes_found]
|
||||
|
||||
def cancel_node_jobs(self):
|
||||
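Stripped of the class plumbing, `_all_parents_met_convergence_criteria` reduces to: every finished parent must reach this child through an edge matching its own outcome, or through an `always` edge. A pure-Python restatement (a sketch, not the method itself; each entry pairs a parent's terminal job status with the labels of its edges into the child):

```python
# Pure-Python restatement of the ALL-convergence check above (a sketch).
def all_parents_met_convergence_criteria(parents):
    status_to_label = {"successful": "success_nodes", "failed": "failure_nodes"}
    for job_status, edge_labels in parents:
        label = status_to_label.get(job_status)
        if label is None:
            continue  # parent not finished yet; not a blocker for this check
        if label not in edge_labels and "always_nodes" not in edge_labels:
            return False  # parent finished down a path that does not reach the child
    return True

# Parent 1 failed into a failure edge, parent 2 succeeded into a success edge: runs.
print(all_parents_met_convergence_criteria(
    [("failed", {"failure_nodes"}), ("successful", {"success_nodes"})]))  # True
# Parent succeeded but only a failure edge reaches the child: blocked.
print(all_parents_met_convergence_criteria(
    [("successful", {"failure_nodes"})]))  # False
```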
@@ -135,8 +157,8 @@ class WorkflowDAG(SimpleDAG):

         for node in failed_nodes:
             obj = node['node_object']
-            if (len(self.get_dependencies(obj, 'failure_nodes')) +
-                    len(self.get_dependencies(obj, 'always_nodes'))) == 0:
+            if (len(self.get_children(obj, 'failure_nodes')) +
+                    len(self.get_children(obj, 'always_nodes'))) == 0:
                 if obj.unified_job_template is None:
                     res = True
                     failed_unified_job_template_node_ids.append(str(obj.id))
@@ -190,35 +212,48 @@ class WorkflowDAG(SimpleDAG):
                 pass
             elif p.job:
                 if p.job.status == 'successful':
-                    if node in (self.get_dependencies(p, 'success_nodes') +
-                                self.get_dependencies(p, 'always_nodes')):
+                    if node in (self.get_children(p, 'success_nodes') +
+                                self.get_children(p, 'always_nodes')):
                         return False
                 elif p.job.status in ['failed', 'error', 'canceled']:
-                    if node in (self.get_dependencies(p, 'failure_nodes') +
-                                self.get_dependencies(p, 'always_nodes')):
+                    if node in (self.get_children(p, 'failure_nodes') +
+                                self.get_children(p, 'always_nodes')):
                         return False
                 else:
                     return False
-            elif p.do_not_run is False and p.unified_job_template is None:
-                if node in (self.get_dependencies(p, 'failure_nodes') +
-                            self.get_dependencies(p, 'always_nodes')):
+            elif not p.do_not_run and p.unified_job_template is None:
+                if node in (self.get_children(p, 'failure_nodes') +
+                            self.get_children(p, 'always_nodes')):
                     return False
             else:
                 return False
         return True


+    r'''
+    determine if the current node is a convergence node by checking if all the
+    parents are finished then checking to see if all parents meet the needed
+    path criteria to run the convergence child.
+    (i.e. parent must fail, parent must succeed, etc. to proceed)
+
+    Return a list object
+    '''
     def mark_dnr_nodes(self):
         root_nodes = self.get_root_nodes()
         nodes_marked_do_not_run = []

         for node in self.sort_nodes_topological():
             obj = node['node_object']

-            if obj.do_not_run is False and not obj.job and node not in root_nodes:
-                parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
-                if self._are_all_nodes_dnr_decided(parent_nodes):
-                    if self._should_mark_node_dnr(node, parent_nodes):
+            parent_nodes = [p['node_object'] for p in self.get_parents(obj)]
+            if not obj.do_not_run and not obj.job and node not in root_nodes:
+                if obj.all_parents_must_converge:
+                    if any(p.do_not_run for p in parent_nodes) or not self._all_parents_met_convergence_criteria(node):
                         obj.do_not_run = True
                         nodes_marked_do_not_run.append(node)
+                else:
+                    if self._are_all_nodes_dnr_decided(parent_nodes):
+                        if self._should_mark_node_dnr(node, parent_nodes):
+                            obj.do_not_run = True
+                            nodes_marked_do_not_run.append(node)

         return [n['node_object'] for n in nodes_marked_do_not_run]
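For ALL-convergence nodes, `mark_dnr_nodes` applies a stricter rule than the ANY path: a single DNR parent, or any parent that finished down a path that does not reach the child, marks the child DNR. Distilled into a sketch:

```python
# Sketch of the ALL-node branch in mark_dnr_nodes above.
def should_mark_all_node_dnr(parent_dnr_flags, convergence_criteria_met):
    # convergence_criteria_met stands in for _all_parents_met_convergence_criteria(node).
    return any(parent_dnr_flags) or not convergence_criteria_met

print(should_mark_all_node_dnr([False, True], convergence_criteria_met=True))   # True: one parent is DNR
print(should_mark_all_node_dnr([False, False], convergence_criteria_met=True))  # False: child can still run
```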
@@ -171,6 +171,7 @@ class TestWorkflowJobCreate:
         with mocker.patch('awx.main.models.WorkflowJobNode.objects.create', mock_create):
             wfjt_node_no_prompts.create_workflow_job_node(workflow_job=workflow_job_unit)
             mock_create.assert_called_once_with(
+                all_parents_must_converge=False,
                 extra_data={},
                 survey_passwords={},
                 char_prompts=wfjt_node_no_prompts.char_prompts,
@@ -185,6 +186,7 @@ class TestWorkflowJobCreate:
                 workflow_job=workflow_job_unit
             )
             mock_create.assert_called_once_with(
+                all_parents_must_converge=False,
                 extra_data={},
                 survey_passwords={},
                 char_prompts=wfjt_node_with_prompts.char_prompts,
@@ -19,6 +19,7 @@ class WorkflowNode(object):
         self.job = job
         self.do_not_run = do_not_run
         self.unified_job_template = unified_job_template
+        self.all_parents_must_converge = False


 @pytest.fixture
@@ -94,7 +95,7 @@ class TestDNR():
         (g, nodes) = workflow_dag_1

         r'''
-               S0
+                0
                /\
             S /  \
              /    \
@@ -113,7 +114,7 @@ class TestDNR():
         assert 0 == len(do_not_run_nodes)

         r'''
-               S0
+                0
                /\
             S /  \
              /    \
@@ -132,6 +133,259 @@ class TestDNR():
         assert 1 == len(do_not_run_nodes)
         assert nodes[3] == do_not_run_nodes[0]

+
+class TestAllWorkflowNodes():
+    # test workflow convergence is functioning as expected
+    @pytest.fixture
+    def simple_all_convergence(self, wf_node_generator):
+        g = WorkflowDAG()
+        nodes = [wf_node_generator() for i in range(4)]
+        for n in nodes:
+            g.add_node(n)
+
+        r'''
+               0
+              /\
+           S /  \ S
+            /    \
+           1      2
+            \    /
+           F \  / S
+              \/
+              3
+
+        '''
+        g.add_edge(nodes[0], nodes[1], "success_nodes")
+        g.add_edge(nodes[0], nodes[2], "success_nodes")
+        g.add_edge(nodes[1], nodes[3], "failure_nodes")
+        g.add_edge(nodes[2], nodes[3], "success_nodes")
+        nodes[3].all_parents_must_converge = True
+        nodes[0].job = Job(status='successful')
+        nodes[1].job = Job(status='failed')
+        nodes[2].job = Job(status='successful')
+        return (g, nodes)
+
+    def test_simple_all_convergence(self, simple_all_convergence):
+        (g, nodes) = simple_all_convergence
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 0 == len(dnr_nodes), "no nodes should be marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 1 == len(nodes_to_run), "Node 3, and only node 3, should be chosen to run"
+        assert nodes[3] == nodes_to_run[0], "Only node 3 should be chosen to run"
+
+    @pytest.fixture
+    def workflow_all_converge_1(self, wf_node_generator):
+        g = WorkflowDAG()
+        nodes = [wf_node_generator() for i in range(3)]
+        for n in nodes:
+            g.add_node(n)
+        r'''
+            0
+            |\ F
+            | \
+           S|  1
+            | /
+            |/ A
+            2
+        '''
+        g.add_edge(nodes[0], nodes[1], "failure_nodes")
+        g.add_edge(nodes[0], nodes[2], "success_nodes")
+        g.add_edge(nodes[1], nodes[2], "always_nodes")
+        nodes[2].all_parents_must_converge = True
+        nodes[0].job = Job(status='successful')
+        return (g, nodes)
+
+    def test_all_converge_edge_case_1(self, workflow_all_converge_1):
+        (g, nodes) = workflow_all_converge_1
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 2 == len(dnr_nodes), "node[1] and node[2] should be marked DNR"
+        assert nodes[1] == dnr_nodes[0], "Node 1 should be marked DNR"
+        assert nodes[2] == dnr_nodes[1], "Node 2 should be marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "No nodes should be chosen to run"
+
+    @pytest.fixture
+    def workflow_all_converge_2(self, wf_node_generator):
+        """The ordering of _1 and this test, _2, is _slightly_ different.
+        The hope is that topological sorting results in 2 being processed before 3
+        and/or 3 before 2.
+        """
+        g = WorkflowDAG()
+        nodes = [wf_node_generator() for i in range(3)]
+        for n in nodes:
+            g.add_node(n)
+        r'''
+            0
+            |\ S
+            | \
+           F|  1
+            | /
+            |/ A
+            2
+        '''
+        g.add_edge(nodes[0], nodes[1], "success_nodes")
+        g.add_edge(nodes[0], nodes[2], "failure_nodes")
+        g.add_edge(nodes[1], nodes[2], "always_nodes")
+        nodes[2].all_parents_must_converge = True
+        nodes[0].job = Job(status='successful')
+        return (g, nodes)
+
+    def test_all_converge_edge_case_2(self, workflow_all_converge_2):
+        (g, nodes) = workflow_all_converge_2
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 1 == len(dnr_nodes), "1 and only 1 node should be marked DNR"
+        assert nodes[2] == dnr_nodes[0], "Node 3 should be marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 1 == len(nodes_to_run), "Node 2, and only node 2, should be chosen to run"
+        assert nodes[1] == nodes_to_run[0], "Only node 2 should be chosen to run"
+
+    @pytest.fixture
+    def workflow_all_converge_will_run(self, wf_node_generator):
+        g = WorkflowDAG()
+        nodes = [wf_node_generator() for i in range(4)]
+        for n in nodes:
+            g.add_node(n)
+        r'''
+          0   1   2
+        S  \ F|  / S
+            \ | /
+             \|/
+              |
+              3
+        '''
+        g.add_edge(nodes[0], nodes[3], "success_nodes")
+        g.add_edge(nodes[1], nodes[3], "failure_nodes")
+        g.add_edge(nodes[2], nodes[3], "success_nodes")
+        nodes[3].all_parents_must_converge = True
+
+        nodes[0].job = Job(status='successful')
+        nodes[1].job = Job(status='failed')
+        nodes[2].job = Job(status='running')
+        return (g, nodes)
+
+    def test_workflow_all_converge_will_run(self, workflow_all_converge_will_run):
+        (g, nodes) = workflow_all_converge_will_run
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 0 == len(dnr_nodes), "No nodes should get marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "No nodes should run yet"
+
+        nodes[2].job.status = 'successful'
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 1 == len(nodes_to_run), "1 and only 1 node should want to run"
+        assert nodes[3] == nodes_to_run[0], "Convergence node should be chosen to run"
+
+    @pytest.fixture
+    def workflow_all_converge_dnr(self, wf_node_generator):
+        g = WorkflowDAG()
+        nodes = [wf_node_generator() for i in range(4)]
+        for n in nodes:
+            g.add_node(n)
+        r'''
+          0   1   2
+        S  \ F|  / F
+            \ | /
+             \|/
+              |
+              3
+        '''
+        g.add_edge(nodes[0], nodes[3], "success_nodes")
+        g.add_edge(nodes[1], nodes[3], "failure_nodes")
+        g.add_edge(nodes[2], nodes[3], "failure_nodes")
+        nodes[3].all_parents_must_converge = True
+
+        nodes[0].job = Job(status='successful')
+        nodes[1].job = Job(status='running')
+        nodes[2].job = Job(status='failed')
+        return (g, nodes)
+
+    def test_workflow_all_converge_while_parent_runs(self, workflow_all_converge_dnr):
+        (g, nodes) = workflow_all_converge_dnr
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 0 == len(dnr_nodes), "No nodes should get marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "No nodes should run yet"
+
+    def test_workflow_all_converge_with_incorrect_parent(self, workflow_all_converge_dnr):
+        # Another tick of the scheduler
+        (g, nodes) = workflow_all_converge_dnr
+        nodes[1].job.status = 'successful'
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 1 == len(dnr_nodes), "1 and only 1 node should be marked DNR"
+        assert nodes[3] == dnr_nodes[0], "Convergence node should be marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "Convergence node should NOT be chosen to run because it is DNR"
+
+    def test_workflow_all_converge_runs(self, workflow_all_converge_dnr):
+        # Trick the scheduler again to make sure the convergence node actually runs
+        (g, nodes) = workflow_all_converge_dnr
+        nodes[1].job.status = 'failed'
+        dnr_nodes = g.mark_dnr_nodes()
+        assert 0 == len(dnr_nodes), "No nodes should be marked DNR"
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 1 == len(nodes_to_run), "Convergence node should be chosen to run"
+
+    @pytest.fixture
+    def workflow_all_converge_deep_dnr_tree(self, wf_node_generator):
+        g = WorkflowDAG()
+        nodes = [wf_node_generator() for i in range(7)]
+        for n in nodes:
+            g.add_node(n)
+        r'''
+          0   1   2
+           \  |  /
+         S  \ S| / F
+             \ | /
+              \|/
+               |
+               3
+              /\
+           S /  \ S
+            /    \
+           4      5
+            \    /
+           S \  / S
+              \/
+              6
+        '''
+        g.add_edge(nodes[0], nodes[3], "success_nodes")
+        g.add_edge(nodes[1], nodes[3], "success_nodes")
+        g.add_edge(nodes[2], nodes[3], "failure_nodes")
+        g.add_edge(nodes[3], nodes[4], "success_nodes")
+        g.add_edge(nodes[3], nodes[5], "success_nodes")
+        g.add_edge(nodes[4], nodes[6], "success_nodes")
+        g.add_edge(nodes[5], nodes[6], "success_nodes")
+        nodes[3].all_parents_must_converge = True
+        nodes[4].all_parents_must_converge = True
+        nodes[5].all_parents_must_converge = True
+        nodes[6].all_parents_must_converge = True
+
+        nodes[0].job = Job(status='successful')
+        nodes[1].job = Job(status='successful')
+        nodes[2].job = Job(status='successful')
+        return (g, nodes)
+
+    def test_workflow_all_converge_deep_dnr_tree(self, workflow_all_converge_deep_dnr_tree):
+        (g, nodes) = workflow_all_converge_deep_dnr_tree
+        dnr_nodes = g.mark_dnr_nodes()
+
+        assert 4 == len(dnr_nodes), "All nodes w/ no jobs should be marked DNR"
+        assert nodes[3] in dnr_nodes
+        assert nodes[4] in dnr_nodes
+        assert nodes[5] in dnr_nodes
+        assert nodes[6] in dnr_nodes
+
+        nodes_to_run = g.bfs_nodes_to_run()
+        assert 0 == len(nodes_to_run), "All non-run nodes should be DNR and NOT candidates to run"
+
+
 class TestIsWorkflowDone():
     @pytest.fixture
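The new tests above all follow the scheduler's real rhythm: each "tick" first settles which nodes can never run, then asks which should run now. In miniature (a sketch against the `WorkflowDAG` API exercised by these tests):

```python
# One scheduler "tick" as the tests above exercise it (a sketch).
def tick(g):
    dnr = g.mark_dnr_nodes()         # settle which nodes can never run
    runnable = g.bfs_nodes_to_run()  # then ask which nodes should start now
    return dnr, runnable
```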
@@ -153,7 +153,10 @@ function TemplatesStrings (BaseString) {
             TIMED_OUT: t.s('APPROVAL TIMED OUT'),
             TIMEOUT: t.s('Timeout'),
             APPROVED: t.s('APPROVED'),
-            DENIED: t.s('DENIED')
+            DENIED: t.s('DENIED'),
+            CONVERGENCE: t.s('Convergence'),
+            ALL: t.s('All'),
+            ANY: t.s('Any'),
         };
     }

@@ -115,11 +115,20 @@
     fill: @default-icon;
 }

+.WorkflowChart-convergenceTypeRectangle {
+    fill: @default-icon;
+}
+
 .WorkflowChart-nodeTypeLetter {
     fill: @default-bg;
     font-size: 10px;
 }

+.WorkflowChart-convergenceTypeLetter {
+    fill: @default-bg;
+    font-size: 10px;
+}
+
 .WorkflowChart-nodeStatus--running {
     fill: @default-icon;
 }
@@ -34,6 +34,7 @@ export default ['moment', '$timeout', '$window', '$filter', 'TemplatesStrings',
         nodeH = 60,
         rootW = startNodeTextWidth + 25,
         rootH = 40,
+        strokeW = 2, // px
         startNodeOffsetY = scope.mode === 'details' ? 17 : 10,
         maxNodeTextLength = 27,
         windowHeight,
@@ -118,6 +119,14 @@ export default ['moment', '$timeout', '$window', '$filter', 'TemplatesStrings',
     };

+    const rounded_rect = (x, y, w, h, r, tl, tr, bl, br) => {
+        // x, y - position coordinates
+        // w - width
+        // h - height
+        // r - border radius
+        // tl - round the top-left corner (bool)
+        // tr - round the top-right corner (bool)
+        // bl - round the bottom-left corner (bool)
+        // br - round the bottom-right corner (bool)
+        let retval;
+        retval = "M" + (x + r) + "," + y;
+        retval += "h" + (w - 2*r);
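The JS helper is truncated in this hunk, but the idea is the standard one: emit an SVG path that rounds or squares each corner independently. A Python sketch of the same path construction (an approximation, not the exact JS implementation):

```python
# Approximate Python port of a rounded_rect SVG path builder; per-corner
# booleans choose an arc (rounded) or two straight segments (square).
def rounded_rect(x, y, w, h, r, tl, tr, bl, br):
    d = f"M{x + r},{y}"
    d += f"h{w - 2 * r}"                                      # top edge
    d += f"a{r},{r} 0 0 1 {r},{r}" if tr else f"h{r}v{r}"     # top-right corner
    d += f"v{h - 2 * r}"                                      # right edge
    d += f"a{r},{r} 0 0 1 {-r},{r}" if br else f"v{r}h{-r}"   # bottom-right corner
    d += f"h{-(w - 2 * r)}"                                   # bottom edge
    d += f"a{r},{r} 0 0 1 {-r},{-r}" if bl else f"h{-r}v{-r}" # bottom-left corner
    d += f"v{-(h - 2 * r)}"                                   # left edge
    d += f"a{r},{r} 0 0 1 {r},{-r}" if tl else f"v{-r}h{r}"   # top-left corner
    return d + "z"

print(rounded_rect(0, 0, 12, 12, 3, True, True, False, False))
```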
@@ -855,6 +864,9 @@ export default ['moment', '$timeout', '$window', '$filter', 'TemplatesStrings',
         .attr("fill", (d) => { return scope.graphState.addLinkSource === d.id ? "#337AB7" : "#D7D7D7"; })
         .style("display", (d) => { return scope.graphState.isLinkMode && !d.isInvalidLinkTarget ? null : "none"; });

+    baseSvg.selectAll(".WorkflowChart-convergenceTypeRectangle")
+        .style("display", (d) => d.all_parents_must_converge ? null : "none");
+
     // Add new nodes
     const nodeEnter = nodes
         .enter()
@@ -924,7 +936,7 @@ export default ['moment', '$timeout', '$window', '$filter', 'TemplatesStrings',
                     return "#D7D7D7";
                 }
             })
-            .attr('stroke-width', "2px")
+            .attr('stroke-width', `${strokeW}px`)
             .attr("class", (d) => {
                 let classString = d.id === scope.graphState.nodeBeingAdded ? "WorkflowChart-rect WorkflowChart-isNodeBeingAdded" : "WorkflowChart-rect";
                 classString += !_.get(d, 'unifiedJobTemplate.name') ? " WorkflowChart-dashedNode" : "";
@@ -980,6 +992,34 @@ export default ['moment', '$timeout', '$window', '$filter', 'TemplatesStrings',
             .html(`<span>${TemplatesStrings.get('workflow_maker.APPROVED')}</span>`)
             .style("display", (d) => { return d.job && d.job.type === "workflow_approval" && d.job.status === "successful" && !d.job.timed_out ? null : "none"; });

+        // Build the 'ALL' symbol for all-convergence nodes
+        const convergenceTypeHeight = nodeH / 5;
+        const convergenceTypeWidth = nodeW / 5;
+        const convergenceTypeXCoord = nodeW / 2 - convergenceTypeWidth / 2;
+        const convergenceTypeYCoord = -convergenceTypeHeight + (strokeW / 2);
+        const convergenceTypeBorderRadius = 3;
+
+        const convergenceRectangle = rounded_rect(
+            convergenceTypeXCoord,
+            convergenceTypeYCoord,
+            convergenceTypeWidth,
+            convergenceTypeHeight,
+            convergenceTypeBorderRadius,
+            true,  // round top-left
+            true,  // round top-right
+            false, // round bottom-left
+            false  // round bottom-right
+        );
+        thisNode.append("path")
+            .attr("d", convergenceRectangle)
+            .attr("class", "WorkflowChart-convergenceTypeRectangle")
+            .style("display", (d) => d.all_parents_must_converge ? null : "none");
+        thisNode.append("text")
+            .attr("y", ((convergenceTypeYCoord + convergenceTypeHeight) / 2) - Math.min(strokeW, 2))
+            .attr("x", convergenceTypeXCoord + (convergenceTypeWidth / 4))
+            .attr("class", "WorkflowChart-convergenceTypeLetter")
+            .text("ALL");
+
         thisNode.append("circle")
             .attr("cy", nodeH)
             .attr("r", 10)
@@ -28,7 +28,8 @@ export default [function(){

         const nodeObj = {
             index: nodeIdCounter-1,
-            id: nodeIdCounter
+            id: nodeIdCounter,
+            all_parents_must_converge: node.all_parents_must_converge,
         };

         if(node.summary_fields.job) {
@@ -106,6 +106,10 @@ export default ['$scope', 'TemplatesService', 'JobTemplateModel', 'PromptService
             element: '#workflow_node_edge',
             multiple: false
         });
+        CreateSelect2({
+            element: '#workflow_node_convergence',
+            multiple: false
+        });
     };

     const formatPopOverDetails = (model) => {
@@ -500,6 +504,22 @@ export default ['$scope', 'TemplatesService', 'JobTemplateModel', 'PromptService
             type: 'workflow_job_template,job_template'
         };

+        const all_parents_must_converge = _.get(
+            $scope, ['nodeConfig', 'node', 'all_parents_must_converge'],
+            _.get($scope, ['nodeConfig', 'node', 'originalNodeObject', 'all_parents_must_converge'], false)
+        );
+        $scope.convergenceOptions = [
+            {
+                label: $scope.strings.get('workflow_maker.ALL'),
+                value: true,
+            },
+            {
+                label: $scope.strings.get('workflow_maker.ANY'),
+                value: false,
+            },
+        ];
+        $scope.convergenceChoice = $scope.convergenceOptions.find(({ value }) => value === all_parents_must_converge);
+
         $scope.wf_maker_templates = [];
         $scope.wf_maker_template_dataset = {};
@@ -617,7 +637,8 @@ export default ['$scope', 'TemplatesService', 'JobTemplateModel', 'PromptService

     $scope.confirmNodeForm = () => {
         const nodeFormData = {
-            edgeType: $scope.edgeType
+            edgeType: $scope.edgeType,
+            all_parents_must_converge: $scope.convergenceChoice.value,
         };

         if ($scope.activeTab === "approval") {
@@ -183,6 +183,24 @@
             </select>
         </div>
     </div>
+    <div id="workflow_node_checkbox_group" class="form-group Form-formGroup Form-formGroup--singleColumn" >
+        <label for="edgeType" class="Form-inputLabelContainer">
+            <span class="Form-requiredAsterisk">*</span>
+            <span class="Form-inputLabel">{{:: strings.get('workflow_maker.CONVERGENCE') }}</span>
+        </label>
+        <div>
+            <select
+                id="workflow_node_convergence"
+                ng-options="v as v.label for v in convergenceOptions track by v.value"
+                ng-model="convergenceChoice"
+                class="form-control Form-dropDown"
+                name="convergenceChoice"
+                tabindex="-1"
+                ng-disabled="readOnly"
+                aria-hidden="true">
+            </select>
+        </div>
+    </div>
     <div ng-show="readOnly">
         <div
             class="WorkflowMaker-readOnlyPromptText"
@@ -92,7 +92,8 @@ export default ['$scope', 'TemplatesService',
             limit: null,
             diff_mode: null,
             verbosity: null,
-            credential: null
+            credential: null,
+            all_parents_must_converge: _.get(node, 'all_parents_must_converge', false)
         };

         if (_.has(node, 'fullUnifiedJobTemplateObject')) {
@@ -637,9 +638,11 @@ export default ['$scope', 'TemplatesService',
                     });
                 }
             }
+            nodeRef[$scope.nodeConfig.nodeId].all_parents_must_converge = nodeFormData.all_parents_must_converge;

             $scope.graphState.arrayOfNodesForChart.map( (node) => {
                 if (node.id === nodeId) {
+                    node.all_parents_must_converge = nodeFormData.all_parents_must_converge;
                     if (isPauseNode) {
                         node.unifiedJobTemplate = {
                             unified_job_type: 'workflow_approval',
@@ -650,7 +653,6 @@ export default ['$scope', 'TemplatesService',
                     } else {
                         node.unifiedJobTemplate = selectedTemplate;
                     }
-
                 }
             });

@@ -30,7 +30,8 @@ class WorkflowJobTemplateNode(HasCreate, base.Base):
             'job_type',
             'skip_tags',
             'verbosity',
-            'extra_data')
+            'extra_data',
+            'all_parents_must_converge')

         update_payload(payload, optional_fields, kwargs)
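For completeness, the awxkit change just threads the new field through the optional-fields payload helper. A self-contained sketch of that plumbing; `update_payload` here is a stand-in with an assumed signature, not the real awxkit helper:

```python
# Stand-in for awxkit's update_payload with an assumed signature: copy any
# recognized optional field from kwargs into the request payload.
def update_payload(payload, optional_fields, kwargs):
    for field in optional_fields:
        if field in kwargs:
            payload[field] = kwargs[field]
    return payload

payload = update_payload({}, ('extra_data', 'all_parents_must_converge'),
                         {'all_parents_must_converge': True})
print(payload)  # {'all_parents_must_converge': True}
```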