mirror of
https://github.com/ansible/awx.git
synced 2026-01-22 15:08:03 -03:30
handle edge case ring cycle
This commit is contained in:
parent
2f9dc4d075
commit
6e40e9c856
@ -127,6 +127,9 @@ class SimpleDAG(object):
|
||||
path = set([])
|
||||
stack = node_objs
|
||||
|
||||
if len(self.nodes) != 0 and len(node_objs) == 0:
|
||||
return True
|
||||
|
||||
while stack:
|
||||
node_obj = stack.pop()
|
||||
|
||||
|
||||
@ -3,11 +3,17 @@
|
||||
import pytest
|
||||
|
||||
# AWX
|
||||
from awx.main.models.workflow import WorkflowJob, WorkflowJobNode, WorkflowJobTemplateNode, WorkflowJobTemplate
|
||||
from awx.main.models.workflow import (
|
||||
WorkflowJob,
|
||||
WorkflowJobNode,
|
||||
WorkflowJobTemplateNode,
|
||||
WorkflowJobTemplate,
|
||||
)
|
||||
from awx.main.models.jobs import JobTemplate, Job
|
||||
from awx.main.models.projects import ProjectUpdate
|
||||
from awx.main.scheduler.dag_workflow import WorkflowDAG
|
||||
from awx.api.versioning import reverse
|
||||
from awx.api.views import WorkflowJobTemplateNodeSuccessNodesList
|
||||
|
||||
# Django
|
||||
from django.test import TransactionTestCase
|
||||
@ -237,16 +243,12 @@ class TestWorkflowJobTemplate:
|
||||
assert parent_qs[0] == wfjt.workflow_job_template_nodes.all()[1]
|
||||
|
||||
def test_topology_validator(self, wfjt):
|
||||
from awx.api.views import WorkflowJobTemplateNodeChildrenBaseList
|
||||
test_view = WorkflowJobTemplateNodeChildrenBaseList()
|
||||
test_view = WorkflowJobTemplateNodeSuccessNodesList()
|
||||
nodes = wfjt.workflow_job_template_nodes.all()
|
||||
node_assoc = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt)
|
||||
nodes[2].always_nodes.add(node_assoc)
|
||||
# test cycle validation
|
||||
assert test_view.is_valid_relation(node_assoc, nodes[0]) == {'Error': 'Cycle detected.'}
|
||||
# test mutex validation
|
||||
test_view.relationship = 'failure_nodes'
|
||||
|
||||
print(nodes[0].success_nodes.get(id=nodes[1].id).failure_nodes.get(id=nodes[2].id))
|
||||
assert test_view.is_valid_relation(nodes[2], nodes[0]) == {'Error': 'Cycle detected.'}
|
||||
|
||||
def test_always_success_failure_creation(self, wfjt, admin, get):
|
||||
wfjt_node = wfjt.workflow_job_template_nodes.all()[1]
|
||||
node = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user