support workflow convergence nodes

* remove convergence restriction in API
* change task manager logic to be aware of and support convergence nodes
This commit is contained in:
chris meyers 2018-09-27 15:47:51 -04:00 committed by mabashian
parent c53ccc8d4a
commit f5c10f99b0
7 changed files with 123 additions and 9 deletions

View File

@ -2969,8 +2969,6 @@ class WorkflowJobTemplateNodeChildrenBaseList(WorkflowsEnforcementMixin, Enforce
if not find:
sub_node = graph[sub.pk]
parent_node = graph[parent.pk]
if sub_node['metadata']['parent'] is not None:
return {"Error": _("Multiple parent relationship not allowed.")}
sub_node['metadata']['parent'] = parent_node
iter_node = sub_node
while iter_node is not None:

View File

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-09-28 14:23
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0049_v330_validate_instance_capacity_adjustment'),
]
operations = [
migrations.AddField(
model_name='workflowjobnode',
name='do_not_run',
field=models.BooleanField(default=False),
),
]

View File

@ -184,6 +184,9 @@ class WorkflowJobNode(WorkflowNodeBase):
default={},
editable=False,
)
do_not_run = models.BooleanField(
default=False
)
def get_absolute_url(self, request=None):
return reverse('api:workflow_job_node_detail', kwargs={'pk': self.pk}, request=request)

View File

@ -51,7 +51,7 @@ class SimpleDAG(object):
for n in self.nodes:
doc += "%s [color = %s]\n" % (
short_string_obj(n['node_object']),
"red" if n['node_object'].status == 'running' else "black",
"red" if getattr(n['node_object'], 'status', 'N/A') == 'running' else "black",
)
for from_node, to_node, label in self.edges:
doc += "%s -> %s [ label=\"%s\" ];\n" % (

View File

@ -1,4 +1,7 @@
# Python
import copy
# AWX
from awx.main.scheduler.dag_simple import SimpleDAG
@ -30,19 +33,19 @@ class WorkflowDAG(SimpleDAG):
obj = n['node_object']
job = obj.job
if not job:
if not job and obj.do_not_run is False:
nodes_found.append(n)
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job.status not in ['failed', 'successful']:
elif job and job.status not in ['failed', 'successful']:
continue
elif job.status == 'failed':
elif job and job.status == 'failed':
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status == 'successful':
elif job and job.status == 'successful':
children_success = self.get_dependencies(obj, 'success_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_success + children_always
@ -100,3 +103,40 @@ class WorkflowDAG(SimpleDAG):
# have the job result.
return False, False
return True, is_failed
def mark_dnr_nodes(self):
root_nodes = self.get_root_nodes()
nodes = copy.copy(root_nodes)
nodes_marked_do_not_run = []
for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job
if not job and obj.do_not_run is False and n not in root_nodes:
parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
all_parents_dnr = True
for p in parent_nodes:
if not p.job and p.do_not_run is False:
all_parents_dnr = False
break
#all_parents_dnr = reduce(lambda p: bool(p.do_not_run == True), parent_nodes)
if all_parents_dnr:
obj.do_not_run = True
nodes_marked_do_not_run.append(n)
if obj.do_not_run:
children_success = self.get_dependencies(obj, 'success_nodes')
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job and job.status == 'failed':
children_failed = self.get_dependencies(obj, 'success_nodes')
children_all = children_failed
nodes.extend(children_all)
elif job and job.status == 'successful':
children_success = self.get_dependencies(obj, 'failure_nodes')
children_all = children_success
nodes.extend(children_all)
return [n['node_object'] for n in nodes_marked_do_not_run]

View File

@ -174,6 +174,8 @@ class TaskManager():
else:
is_done, has_failed = dag.is_workflow_done()
if not is_done:
workflow_nodes = dag.mark_dnr_nodes()
map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes)
continue
logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful')
result.append(workflow_job.id)

View File

@ -100,6 +100,59 @@ class TestWorkflowDAGFunctional(TransactionTestCase):
self.assertFalse(has_failed)
@pytest.mark.django_db
class TestWorkflowDNR():
'success', 'new'
@pytest.fixture
def workflow_job_fn(self):
def fn(states=['new', 'new', 'new', 'new', 'new', 'new']):
"""
Workflow topology:
node[0]
/\
s/ \f
/ \
node[1] node[3]
/ \
s/ \f
/ \
node[2] node[4]
\ /
\ /
\ /
s f
\ /
\ /
node[5]
"""
wfj = WorkflowJob.objects.create()
jt = JobTemplate.objects.create(name='test-jt')
nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)]
for node, state in zip(nodes, states):
if state:
node.job = jt.create_job()
node.job.status = state
node.job.save()
node.save()
nodes[0].success_nodes.add(nodes[1])
nodes[1].success_nodes.add(nodes[2])
nodes[0].failure_nodes.add(nodes[3])
nodes[3].failure_nodes.add(nodes[4])
nodes[2].success_nodes.add(nodes[5])
nodes[4].failure_nodes.add(nodes[5])
return wfj, nodes
return fn
def test_workflow_dnr_because_parent(self, workflow_job_fn):
wfj, nodes = workflow_job_fn(states=['successful', None, None, None, None, None,])
dag = WorkflowDAG(workflow_job=wfj)
workflow_nodes = dag.mark_dnr_nodes()
assert 2 == len(workflow_nodes)
assert nodes[3] in workflow_nodes
assert nodes[4] in workflow_nodes
@pytest.mark.django_db
class TestWorkflowJob:
@pytest.fixture
@ -193,8 +246,6 @@ class TestWorkflowJobTemplate:
nodes[2].always_nodes.add(node_assoc)
# test cycle validation
assert test_view.is_valid_relation(node_assoc, nodes[0]) == {'Error': 'Cycle detected.'}
# test multi-ancestor validation
assert test_view.is_valid_relation(node_assoc, nodes[1]) == {'Error': 'Multiple parent relationship not allowed.'}
# test mutex validation
test_view.relationship = 'failure_nodes'