diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index a8d96b4be6..2e4903b7a0 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -127,6 +127,9 @@ class SimpleDAG(object): path = set([]) stack = node_objs + if len(self.nodes) != 0 and len(node_objs) == 0: + return True + while stack: node_obj = stack.pop() diff --git a/awx/main/tests/functional/models/test_workflow.py b/awx/main/tests/functional/models/test_workflow.py index f406085aca..63253907a3 100644 --- a/awx/main/tests/functional/models/test_workflow.py +++ b/awx/main/tests/functional/models/test_workflow.py @@ -3,11 +3,17 @@ import pytest # AWX -from awx.main.models.workflow import WorkflowJob, WorkflowJobNode, WorkflowJobTemplateNode, WorkflowJobTemplate +from awx.main.models.workflow import ( + WorkflowJob, + WorkflowJobNode, + WorkflowJobTemplateNode, + WorkflowJobTemplate, +) from awx.main.models.jobs import JobTemplate, Job from awx.main.models.projects import ProjectUpdate from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.api.versioning import reverse +from awx.api.views import WorkflowJobTemplateNodeSuccessNodesList # Django from django.test import TransactionTestCase @@ -237,16 +243,12 @@ class TestWorkflowJobTemplate: assert parent_qs[0] == wfjt.workflow_job_template_nodes.all()[1] def test_topology_validator(self, wfjt): - from awx.api.views import WorkflowJobTemplateNodeChildrenBaseList - test_view = WorkflowJobTemplateNodeChildrenBaseList() + test_view = WorkflowJobTemplateNodeSuccessNodesList() nodes = wfjt.workflow_job_template_nodes.all() - node_assoc = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt) - nodes[2].always_nodes.add(node_assoc) # test cycle validation - assert test_view.is_valid_relation(node_assoc, nodes[0]) == {'Error': 'Cycle detected.'} - # test mutex validation - test_view.relationship = 'failure_nodes' - + print(nodes[0].success_nodes.get(id=nodes[1].id).failure_nodes.get(id=nodes[2].id)) + assert test_view.is_valid_relation(nodes[2], nodes[0]) == {'Error': 'Cycle detected.'} + def test_always_success_failure_creation(self, wfjt, admin, get): wfjt_node = wfjt.workflow_job_template_nodes.all()[1] node = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt)