diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 87140b02b1..fc0992c958 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -2969,8 +2969,6 @@ class WorkflowJobTemplateNodeChildrenBaseList(WorkflowsEnforcementMixin, Enforce if not find: sub_node = graph[sub.pk] parent_node = graph[parent.pk] - if sub_node['metadata']['parent'] is not None: - return {"Error": _("Multiple parent relationship not allowed.")} sub_node['metadata']['parent'] = parent_node iter_node = sub_node while iter_node is not None: diff --git a/awx/main/migrations/0050_v331_workflow_convergence.py b/awx/main/migrations/0050_v331_workflow_convergence.py new file mode 100644 index 0000000000..2e6edd42d7 --- /dev/null +++ b/awx/main/migrations/0050_v331_workflow_convergence.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.11 on 2018-09-28 14:23 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0049_v330_validate_instance_capacity_adjustment'), + ] + + operations = [ + migrations.AddField( + model_name='workflowjobnode', + name='do_not_run', + field=models.BooleanField(default=False), + ), + ] diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 2be55d2992..946812350b 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -184,6 +184,9 @@ class WorkflowJobNode(WorkflowNodeBase): default={}, editable=False, ) + do_not_run = models.BooleanField( + default=False + ) def get_absolute_url(self, request=None): return reverse('api:workflow_job_node_detail', kwargs={'pk': self.pk}, request=request) diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index c7bde94101..0a078f9821 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -51,7 +51,7 @@ class SimpleDAG(object): for n in self.nodes: doc += "%s [color = %s]\n" % ( short_string_obj(n['node_object']), - "red" if n['node_object'].status == 'running' else "black", + "red" if getattr(n['node_object'], 'status', 'N/A') == 'running' else "black", ) for from_node, to_node, label in self.edges: doc += "%s -> %s [ label=\"%s\" ];\n" % ( diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 924e459929..b49cf185e9 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -1,4 +1,7 @@ +# Python +import copy + # AWX from awx.main.scheduler.dag_simple import SimpleDAG @@ -30,19 +33,19 @@ class WorkflowDAG(SimpleDAG): obj = n['node_object'] job = obj.job - if not job: + if not job and obj.do_not_run is False: nodes_found.append(n) # Job is about to run or is running. Hold our horses and wait for # the job to finish. We can't proceed down the graph path until we # have the job result. - elif job.status not in ['failed', 'successful']: + elif job and job.status not in ['failed', 'successful']: continue - elif job.status == 'failed': + elif job and job.status == 'failed': children_failed = self.get_dependencies(obj, 'failure_nodes') children_always = self.get_dependencies(obj, 'always_nodes') children_all = children_failed + children_always nodes.extend(children_all) - elif job.status == 'successful': + elif job and job.status == 'successful': children_success = self.get_dependencies(obj, 'success_nodes') children_always = self.get_dependencies(obj, 'always_nodes') children_all = children_success + children_always @@ -100,3 +103,40 @@ class WorkflowDAG(SimpleDAG): # have the job result. return False, False return True, is_failed + + def mark_dnr_nodes(self): + root_nodes = self.get_root_nodes() + nodes = copy.copy(root_nodes) + nodes_marked_do_not_run = [] + + for index, n in enumerate(nodes): + obj = n['node_object'] + job = obj.job + + if not job and obj.do_not_run is False and n not in root_nodes: + parent_nodes = [p['node_object'] for p in self.get_dependents(obj)] + all_parents_dnr = True + for p in parent_nodes: + if not p.job and p.do_not_run is False: + all_parents_dnr = False + break + #all_parents_dnr = reduce(lambda p: bool(p.do_not_run == True), parent_nodes) + if all_parents_dnr: + obj.do_not_run = True + nodes_marked_do_not_run.append(n) + + if obj.do_not_run: + children_success = self.get_dependencies(obj, 'success_nodes') + children_failed = self.get_dependencies(obj, 'failure_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_failed + children_always + nodes.extend(children_all) + elif job and job.status == 'failed': + children_failed = self.get_dependencies(obj, 'success_nodes') + children_all = children_failed + nodes.extend(children_all) + elif job and job.status == 'successful': + children_success = self.get_dependencies(obj, 'failure_nodes') + children_all = children_success + nodes.extend(children_all) + return [n['node_object'] for n in nodes_marked_do_not_run] diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 2854b3ab34..16aa99ee08 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -174,6 +174,8 @@ class TaskManager(): else: is_done, has_failed = dag.is_workflow_done() if not is_done: + workflow_nodes = dag.mark_dnr_nodes() + map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes) continue logger.info('Marking %s as %s.', workflow_job.log_format, 'failed' if has_failed else 'successful') result.append(workflow_job.id) diff --git a/awx/main/tests/functional/models/test_workflow.py b/awx/main/tests/functional/models/test_workflow.py index d7d03a53bb..cc44a9b621 100644 --- a/awx/main/tests/functional/models/test_workflow.py +++ b/awx/main/tests/functional/models/test_workflow.py @@ -100,6 +100,59 @@ class TestWorkflowDAGFunctional(TransactionTestCase): self.assertFalse(has_failed) +@pytest.mark.django_db +class TestWorkflowDNR(): + 'success', 'new' + + @pytest.fixture + def workflow_job_fn(self): + def fn(states=['new', 'new', 'new', 'new', 'new', 'new']): + """ + Workflow topology: + node[0] + /\ + s/ \f + / \ + node[1] node[3] + / \ + s/ \f + / \ + node[2] node[4] + \ / + \ / + \ / + s f + \ / + \ / + node[5] + """ + wfj = WorkflowJob.objects.create() + jt = JobTemplate.objects.create(name='test-jt') + nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)] + for node, state in zip(nodes, states): + if state: + node.job = jt.create_job() + node.job.status = state + node.job.save() + node.save() + nodes[0].success_nodes.add(nodes[1]) + nodes[1].success_nodes.add(nodes[2]) + nodes[0].failure_nodes.add(nodes[3]) + nodes[3].failure_nodes.add(nodes[4]) + nodes[2].success_nodes.add(nodes[5]) + nodes[4].failure_nodes.add(nodes[5]) + return wfj, nodes + return fn + + def test_workflow_dnr_because_parent(self, workflow_job_fn): + wfj, nodes = workflow_job_fn(states=['successful', None, None, None, None, None,]) + dag = WorkflowDAG(workflow_job=wfj) + workflow_nodes = dag.mark_dnr_nodes() + assert 2 == len(workflow_nodes) + assert nodes[3] in workflow_nodes + assert nodes[4] in workflow_nodes + + @pytest.mark.django_db class TestWorkflowJob: @pytest.fixture @@ -193,8 +246,6 @@ class TestWorkflowJobTemplate: nodes[2].always_nodes.add(node_assoc) # test cycle validation assert test_view.is_valid_relation(node_assoc, nodes[0]) == {'Error': 'Cycle detected.'} - # test multi-ancestor validation - assert test_view.is_valid_relation(node_assoc, nodes[1]) == {'Error': 'Multiple parent relationship not allowed.'} # test mutex validation test_view.relationship = 'failure_nodes'