mirror of
https://github.com/ansible/awx.git
synced 2026-05-16 22:07:36 -02:30
Merge branch 'devel' of https://github.com/ansible/ansible-tower into can_CRUD
This commit is contained in:
@@ -7,6 +7,7 @@ from awx.main.tests.factories import (
|
||||
create_job_template,
|
||||
create_notification_template,
|
||||
create_survey_spec,
|
||||
create_workflow_job_template,
|
||||
)
|
||||
|
||||
@pytest.fixture
|
||||
@@ -40,6 +41,10 @@ def job_template_with_survey_passwords_factory(job_template_factory):
|
||||
def job_with_secret_key_unit(job_with_secret_key_factory):
|
||||
return job_with_secret_key_factory(persisted=False)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_job_template_factory():
|
||||
return create_workflow_job_template
|
||||
|
||||
@pytest.fixture
|
||||
def get_ssh_version(mocker):
|
||||
return mocker.patch('awx.main.tasks.get_ssh_version', return_value='OpenSSH_6.9p1, LibreSSL 2.1.8')
|
||||
|
||||
@@ -3,6 +3,7 @@ from .tower import (
|
||||
create_job_template,
|
||||
create_notification_template,
|
||||
create_survey_spec,
|
||||
create_workflow_job_template,
|
||||
)
|
||||
|
||||
from .exc import (
|
||||
@@ -14,5 +15,6 @@ __all__ = [
|
||||
'create_job_template',
|
||||
'create_notification_template',
|
||||
'create_survey_spec',
|
||||
'create_workflow_job_template',
|
||||
'NotUnique',
|
||||
]
|
||||
|
||||
@@ -13,6 +13,10 @@ from awx.main.models import (
|
||||
Credential,
|
||||
Inventory,
|
||||
Label,
|
||||
WorkflowJobTemplate,
|
||||
WorkflowJob,
|
||||
WorkflowJobNode,
|
||||
WorkflowJobTemplateNode,
|
||||
)
|
||||
|
||||
# mk methods should create only a single object of a single type.
|
||||
@@ -152,3 +156,60 @@ def mk_job_template(name, job_type='run',
|
||||
if persisted:
|
||||
jt.save()
|
||||
return jt
|
||||
|
||||
def mk_workflow_job(status='new', workflow_job_template=None, extra_vars={},
|
||||
persisted=True):
|
||||
job = WorkflowJob(status=status, extra_vars=json.dumps(extra_vars))
|
||||
|
||||
job.workflow_job_template = workflow_job_template
|
||||
|
||||
if persisted:
|
||||
job.save()
|
||||
return job
|
||||
|
||||
def mk_workflow_job_template(name, extra_vars='', spec=None, persisted=True):
|
||||
if extra_vars:
|
||||
extra_vars = json.dumps(extra_vars)
|
||||
|
||||
wfjt = WorkflowJobTemplate(name=name, extra_vars=extra_vars)
|
||||
|
||||
wfjt.survey_spec = spec
|
||||
if wfjt.survey_spec is not None:
|
||||
wfjt.survey_enabled = True
|
||||
|
||||
if persisted:
|
||||
wfjt.save()
|
||||
return wfjt
|
||||
|
||||
def mk_workflow_job_template_node(workflow_job_template=None,
|
||||
unified_job_template=None,
|
||||
success_nodes=None,
|
||||
failure_nodes=None,
|
||||
always_nodes=None,
|
||||
persisted=True):
|
||||
workflow_node = WorkflowJobTemplateNode(workflow_job_template=workflow_job_template,
|
||||
unified_job_template=unified_job_template,
|
||||
success_nodes=success_nodes,
|
||||
failure_nodes=failure_nodes,
|
||||
always_nodes=always_nodes)
|
||||
if persisted:
|
||||
workflow_node.save()
|
||||
return workflow_node
|
||||
|
||||
def mk_workflow_job_node(unified_job_template=None,
|
||||
success_nodes=None,
|
||||
failure_nodes=None,
|
||||
always_nodes=None,
|
||||
workflow_job=None,
|
||||
job=None,
|
||||
persisted=True):
|
||||
workflow_node = WorkflowJobNode(unified_job_template=unified_job_template,
|
||||
success_nodes=success_nodes,
|
||||
failure_nodes=failure_nodes,
|
||||
always_nodes=always_nodes,
|
||||
workflow_job=workflow_job,
|
||||
job=job)
|
||||
if persisted:
|
||||
workflow_node.save()
|
||||
return workflow_node
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from awx.main.models import (
|
||||
Inventory,
|
||||
Job,
|
||||
Label,
|
||||
WorkflowJobTemplateNode,
|
||||
)
|
||||
|
||||
from .objects import (
|
||||
@@ -28,6 +29,7 @@ from .fixtures import (
|
||||
mk_project,
|
||||
mk_label,
|
||||
mk_notification_template,
|
||||
mk_workflow_job_template,
|
||||
)
|
||||
|
||||
|
||||
@@ -343,3 +345,66 @@ def create_notification_template(name, roles=None, persisted=True, **kwargs):
|
||||
users=_Mapped(users),
|
||||
superusers=_Mapped(superusers),
|
||||
teams=teams)
|
||||
|
||||
def generate_workflow_job_template_nodes(workflow_job_template,
|
||||
persisted,
|
||||
**kwargs):
|
||||
|
||||
workflow_job_template_nodes = kwargs.get('workflow_job_template_nodes', [])
|
||||
if len(workflow_job_template_nodes) > 0 and not persisted:
|
||||
raise RuntimeError('workflow job template nodes can not be used when persisted=False')
|
||||
|
||||
new_nodes = []
|
||||
|
||||
for i, node in enumerate(workflow_job_template_nodes):
|
||||
new_node = WorkflowJobTemplateNode(workflow_job_template=workflow_job_template,
|
||||
unified_job_template=node['unified_job_template'],
|
||||
id=i)
|
||||
new_nodes.append(new_node)
|
||||
|
||||
node_types = ['success_nodes', 'failure_nodes', 'always_nodes']
|
||||
for node_type in node_types:
|
||||
for i, new_node in enumerate(new_nodes):
|
||||
for related_index in workflow_job_template_nodes[i][node_type]:
|
||||
getattr(new_node, node_type).add(new_nodes[related_index])
|
||||
|
||||
# TODO: Implement survey and jobs
|
||||
def create_workflow_job_template(name, persisted=True, **kwargs):
|
||||
Objects = generate_objects(["workflow_job_template",
|
||||
"workflow_job_template_nodes",
|
||||
"survey",], kwargs)
|
||||
|
||||
spec = None
|
||||
#jobs = None
|
||||
|
||||
extra_vars = kwargs.get('extra_vars', '')
|
||||
|
||||
if 'survey' in kwargs:
|
||||
spec = create_survey_spec(kwargs['survey'])
|
||||
|
||||
wfjt = mk_workflow_job_template(name,
|
||||
spec=spec,
|
||||
extra_vars=extra_vars,
|
||||
persisted=persisted)
|
||||
|
||||
|
||||
|
||||
workflow_jt_nodes = generate_workflow_job_template_nodes(wfjt,
|
||||
persisted,
|
||||
workflow_job_template_nodes=kwargs.get('workflow_job_template_nodes', []))
|
||||
|
||||
'''
|
||||
if 'jobs' in kwargs:
|
||||
for i in kwargs['jobs']:
|
||||
if type(i) is Job:
|
||||
jobs[i.pk] = i
|
||||
else:
|
||||
# TODO: Create the job
|
||||
raise RuntimeError("Currently, only already created jobs are supported")
|
||||
'''
|
||||
return Objects(workflow_job_template=wfjt,
|
||||
#jobs=jobs,
|
||||
workflow_job_template_nodes=workflow_jt_nodes,
|
||||
survey=spec,)
|
||||
|
||||
|
||||
|
||||
@@ -319,18 +319,18 @@ def test_cant_change_organization(patch, credential, organization, org_admin):
|
||||
credential.organization = organization
|
||||
credential.save()
|
||||
|
||||
response = patch(reverse('api:credential_detail', args=(organization.id,)), {
|
||||
response = patch(reverse('api:credential_detail', args=(credential.id,)), {
|
||||
'name': 'Some new name',
|
||||
}, org_admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
response = patch(reverse('api:credential_detail', args=(organization.id,)), {
|
||||
response = patch(reverse('api:credential_detail', args=(credential.id,)), {
|
||||
'name': 'Some new name2',
|
||||
'organization': organization.id, # fine for it to be the same
|
||||
}, org_admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
response = patch(reverse('api:credential_detail', args=(organization.id,)), {
|
||||
response = patch(reverse('api:credential_detail', args=(credential.id,)), {
|
||||
'name': 'Some new name3',
|
||||
'organization': None
|
||||
}, org_admin)
|
||||
@@ -339,7 +339,7 @@ def test_cant_change_organization(patch, credential, organization, org_admin):
|
||||
@pytest.mark.django_db
|
||||
def test_cant_add_organization(patch, credential, organization, org_admin):
|
||||
assert credential.organization is None
|
||||
response = patch(reverse('api:credential_detail', args=(organization.id,)), {
|
||||
response = patch(reverse('api:credential_detail', args=(credential.id,)), {
|
||||
'name': 'Some new name',
|
||||
'organization': organization.id
|
||||
}, org_admin)
|
||||
|
||||
34
awx/main/tests/functional/models/test_workflow.py
Normal file
34
awx/main/tests/functional/models/test_workflow.py
Normal file
@@ -0,0 +1,34 @@
|
||||
|
||||
# Python
|
||||
import pytest
|
||||
|
||||
# AWX
|
||||
from awx.main.models.workflow import WorkflowJob, WorkflowJobTemplateNode
|
||||
|
||||
class TestWorkflowJob:
|
||||
@pytest.fixture
|
||||
def workflow_job(self, workflow_job_template_factory):
|
||||
wfjt = workflow_job_template_factory('blah').workflow_job_template
|
||||
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt)
|
||||
|
||||
nodes = [WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt) for i in range(0, 5)]
|
||||
|
||||
nodes[0].success_nodes.add(nodes[1])
|
||||
nodes[1].success_nodes.add(nodes[2])
|
||||
|
||||
nodes[0].failure_nodes.add(nodes[3])
|
||||
nodes[3].failure_nodes.add(nodes[4])
|
||||
|
||||
return wfj
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_inherit_job_template_workflow_nodes(self, mocker, workflow_job):
|
||||
workflow_job.inherit_job_template_workflow_nodes()
|
||||
|
||||
nodes = WorkflowJob.objects.get(id=workflow_job.id).workflow_job_nodes.all().order_by('created')
|
||||
assert nodes[0].success_nodes.filter(id=nodes[1].id).exists()
|
||||
assert nodes[1].success_nodes.filter(id=nodes[2].id).exists()
|
||||
assert nodes[0].failure_nodes.filter(id=nodes[3].id).exists()
|
||||
assert nodes[3].failure_nodes.filter(id=nodes[4].id).exists()
|
||||
|
||||
|
||||
40
awx/main/tests/manual/workflows/linear.py
Executable file
40
awx/main/tests/manual/workflows/linear.py
Executable file
@@ -0,0 +1,40 @@
|
||||
# AWX
|
||||
from awx.main.models import (
|
||||
WorkflowJobTemplateNode,
|
||||
WorkflowJobTemplate,
|
||||
)
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
|
||||
def do_init_workflow(job_template_success, job_template_fail, job_template_never):
|
||||
wfjt, created = WorkflowJobTemplate.objects.get_or_create(name="linear workflow")
|
||||
wfjt.delete()
|
||||
wfjt, created = WorkflowJobTemplate.objects.get_or_create(name="linear workflow")
|
||||
print(wfjt.id)
|
||||
WorkflowJobTemplateNode.objects.all().delete()
|
||||
if created:
|
||||
nodes_success = []
|
||||
nodes_fail = []
|
||||
nodes_never = []
|
||||
for i in range(0, 2):
|
||||
nodes_success.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_success))
|
||||
nodes_fail.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_fail))
|
||||
nodes_never.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_never))
|
||||
nodes_never.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_never))
|
||||
nodes_fail[1].delete()
|
||||
|
||||
nodes_success[0].success_nodes.add(nodes_fail[0])
|
||||
nodes_success[0].failure_nodes.add(nodes_never[0])
|
||||
|
||||
nodes_fail[0].failure_nodes.add(nodes_success[1])
|
||||
nodes_fail[0].success_nodes.add(nodes_never[1])
|
||||
|
||||
nodes_success[1].failure_nodes.add(nodes_never[2])
|
||||
|
||||
def do_init():
|
||||
jt_success = JobTemplate.objects.get(id=5)
|
||||
jt_fail= JobTemplate.objects.get(id=6)
|
||||
jt_never= JobTemplate.objects.get(id=7)
|
||||
do_init_workflow(jt_success, jt_fail, jt_never)
|
||||
|
||||
if __name__ == "__main__":
|
||||
do_init()
|
||||
1
awx/main/tests/manual/workflows/linear.svg
Normal file
1
awx/main/tests/manual/workflows/linear.svg
Normal file
File diff suppressed because one or more lines are too long
|
After Width: | Height: | Size: 8.2 KiB |
45
awx/main/tests/manual/workflows/parallel.py
Executable file
45
awx/main/tests/manual/workflows/parallel.py
Executable file
@@ -0,0 +1,45 @@
|
||||
# AWX
|
||||
from awx.main.models import (
|
||||
WorkflowJobTemplateNode,
|
||||
WorkflowJobTemplate,
|
||||
)
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
|
||||
def do_init_workflow(job_template_success, job_template_fail, job_template_never, jts_parallel):
|
||||
wfjt, created = WorkflowJobTemplate.objects.get_or_create(name="parallel workflow")
|
||||
wfjt.delete()
|
||||
wfjt, created = WorkflowJobTemplate.objects.get_or_create(name="parallel workflow")
|
||||
print(wfjt.id)
|
||||
WorkflowJobTemplateNode.objects.all().delete()
|
||||
if created:
|
||||
node_success = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_success)
|
||||
|
||||
nodes_never = []
|
||||
for x in range(0, 3):
|
||||
nodes_never.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_never))
|
||||
|
||||
nodes_parallel = []
|
||||
for jt in jts_parallel:
|
||||
nodes_parallel.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=jt))
|
||||
|
||||
node_success.success_nodes.add(nodes_parallel[0])
|
||||
node_success.success_nodes.add(nodes_parallel[1])
|
||||
node_success.success_nodes.add(nodes_parallel[2])
|
||||
|
||||
# Add a failure node for each paralell node
|
||||
for i, n in enumerate(nodes_parallel):
|
||||
n.failure_nodes.add(nodes_never[i])
|
||||
|
||||
def do_init():
|
||||
jt_success = JobTemplate.objects.get(id=5)
|
||||
jt_fail= JobTemplate.objects.get(id=6)
|
||||
jt_never= JobTemplate.objects.get(id=7)
|
||||
|
||||
jt_parallel = []
|
||||
jt_parallel.append(JobTemplate.objects.get(id=16))
|
||||
jt_parallel.append(JobTemplate.objects.get(id=17))
|
||||
jt_parallel.append(JobTemplate.objects.get(id=18))
|
||||
do_init_workflow(jt_success, jt_fail, jt_never, jt_parallel)
|
||||
|
||||
if __name__ == "__main__":
|
||||
do_init()
|
||||
1
awx/main/tests/manual/workflows/parallel.svg
Normal file
1
awx/main/tests/manual/workflows/parallel.svg
Normal file
File diff suppressed because one or more lines are too long
|
After Width: | Height: | Size: 8.6 KiB |
0
awx/main/tests/unit/api/serializers/__init__.py
Normal file
0
awx/main/tests/unit/api/serializers/__init__.py
Normal file
46
awx/main/tests/unit/api/serializers/conftest.py
Normal file
46
awx/main/tests/unit/api/serializers/conftest.py
Normal file
@@ -0,0 +1,46 @@
|
||||
|
||||
import pytest
|
||||
|
||||
@pytest.fixture
|
||||
def get_related_assert():
|
||||
def fn(model_obj, related, resource_name, related_resource_name):
|
||||
assert related_resource_name in related
|
||||
assert related[related_resource_name] == '/api/v1/%s/%d/%s/' % (resource_name, model_obj.pk, related_resource_name)
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def get_related_mock_and_run():
|
||||
def fn(serializer_class, model_obj):
|
||||
serializer = serializer_class()
|
||||
related = serializer.get_related(model_obj)
|
||||
return related
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def test_get_related(get_related_assert, get_related_mock_and_run):
|
||||
def fn(serializer_class, model_obj, resource_name, related_resource_name):
|
||||
related = get_related_mock_and_run(serializer_class, model_obj)
|
||||
get_related_assert(model_obj, related, resource_name, related_resource_name)
|
||||
return related
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def get_summary_fields_assert():
|
||||
def fn(summary, summary_field_name):
|
||||
assert summary_field_name in summary
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def get_summary_fields_mock_and_run():
|
||||
def fn(serializer_class, model_obj):
|
||||
serializer = serializer_class()
|
||||
return serializer.get_summary_fields(model_obj)
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def test_get_summary_fields(get_summary_fields_mock_and_run, get_summary_fields_assert):
|
||||
def fn(serializer_class, model_obj, summary_field_name):
|
||||
summary = get_summary_fields_mock_and_run(serializer_class, model_obj)
|
||||
get_summary_fields_assert(summary, summary_field_name)
|
||||
return summary
|
||||
return fn
|
||||
@@ -0,0 +1,47 @@
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
from mock import PropertyMock
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
CustomInventoryScriptSerializer,
|
||||
)
|
||||
from awx.main.models import (
|
||||
CustomInventoryScript,
|
||||
User,
|
||||
)
|
||||
|
||||
#DRF
|
||||
from rest_framework.request import Request
|
||||
from rest_framework.test import (
|
||||
APIRequestFactory,
|
||||
force_authenticate,
|
||||
)
|
||||
|
||||
class TestCustomInventoryScriptSerializer(object):
|
||||
|
||||
@pytest.mark.parametrize("superuser,sysaudit,admin_role,value",
|
||||
((True, False, False, '#!/python'),
|
||||
(False, True, False, '#!/python'),
|
||||
(False, False, True, '#!/python'),
|
||||
(False, False, False, None)))
|
||||
def test_to_representation_orphan(self, superuser, sysaudit, admin_role, value):
|
||||
with mock.patch.object(CustomInventoryScriptSerializer, 'get_summary_fields', return_value={}):
|
||||
User.add_to_class('is_system_auditor', sysaudit)
|
||||
user = User(username="root", is_superuser=superuser)
|
||||
roles = [user] if admin_role else []
|
||||
|
||||
with mock.patch('awx.main.models.CustomInventoryScript.admin_role', new_callable=PropertyMock, return_value=roles):
|
||||
cis = CustomInventoryScript(pk=1, script='#!/python')
|
||||
serializer = CustomInventoryScriptSerializer()
|
||||
|
||||
factory = APIRequestFactory()
|
||||
wsgi_request = factory.post("/inventory_script/1", {'id':1}, format="json")
|
||||
force_authenticate(wsgi_request, user)
|
||||
|
||||
request = Request(wsgi_request)
|
||||
serializer.context['request'] = request
|
||||
|
||||
representation = serializer.to_representation(cis)
|
||||
assert representation['script'] == value
|
||||
91
awx/main/tests/unit/api/serializers/test_job_serializers.py
Normal file
91
awx/main/tests/unit/api/serializers/test_job_serializers.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
import json
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
JobSerializer,
|
||||
JobOptionsSerializer,
|
||||
)
|
||||
from awx.main.models import (
|
||||
Label,
|
||||
Job,
|
||||
)
|
||||
|
||||
def mock_JT_resource_data():
|
||||
return ({}, [])
|
||||
|
||||
@pytest.fixture
|
||||
def job_template(mocker):
|
||||
mock_jt = mocker.MagicMock(pk=5)
|
||||
mock_jt.resource_validation_data = mock_JT_resource_data
|
||||
return mock_jt
|
||||
|
||||
@pytest.fixture
|
||||
def job(mocker, job_template):
|
||||
return mocker.MagicMock(pk=5, job_template=job_template)
|
||||
|
||||
@pytest.fixture
|
||||
def labels(mocker):
|
||||
return [Label(id=x, name='label-%d' % x) for x in xrange(0, 25)]
|
||||
|
||||
@pytest.fixture
|
||||
def jobs(mocker):
|
||||
return [Job(id=x, name='job-%d' % x) for x in xrange(0, 25)]
|
||||
|
||||
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
|
||||
@mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {})
|
||||
class TestJobSerializerGetRelated():
|
||||
@pytest.mark.parametrize("related_resource_name", [
|
||||
'job_events',
|
||||
'job_plays',
|
||||
'job_tasks',
|
||||
'relaunch',
|
||||
'labels',
|
||||
])
|
||||
def test_get_related(self, test_get_related, job, related_resource_name):
|
||||
test_get_related(JobSerializer, job, 'jobs', related_resource_name)
|
||||
|
||||
def test_job_template_absent(self, job):
|
||||
job.job_template = None
|
||||
serializer = JobSerializer()
|
||||
related = serializer.get_related(job)
|
||||
assert 'job_template' not in related
|
||||
|
||||
def test_job_template_present(self, get_related_mock_and_run, job):
|
||||
related = get_related_mock_and_run(JobSerializer, job)
|
||||
assert 'job_template' in related
|
||||
assert related['job_template'] == '/api/v1/%s/%d/' % ('job_templates', job.job_template.pk)
|
||||
|
||||
@mock.patch('awx.api.serializers.BaseSerializer.to_representation', lambda self,obj: {
|
||||
'extra_vars': obj.extra_vars})
|
||||
class TestJobSerializerSubstitution():
|
||||
|
||||
def test_survey_password_hide(self, mocker):
|
||||
job = mocker.MagicMock(**{
|
||||
'display_extra_vars.return_value': '{\"secret_key\": \"$encrypted$\"}',
|
||||
'extra_vars.return_value': '{\"secret_key\": \"my_password\"}'})
|
||||
serializer = JobSerializer(job)
|
||||
rep = serializer.to_representation(job)
|
||||
extra_vars = json.loads(rep['extra_vars'])
|
||||
assert extra_vars['secret_key'] == '$encrypted$'
|
||||
job.display_extra_vars.assert_called_once_with()
|
||||
assert 'my_password' not in extra_vars
|
||||
|
||||
@mock.patch('awx.api.serializers.BaseSerializer.get_summary_fields', lambda x,y: {})
|
||||
class TestJobOptionsSerializerGetSummaryFields():
|
||||
def test__summary_field_labels_10_max(self, mocker, job_template, labels):
|
||||
job_template.labels.all = mocker.MagicMock(**{'order_by.return_value': labels})
|
||||
job_template.labels.all.return_value = job_template.labels.all
|
||||
|
||||
serializer = JobOptionsSerializer()
|
||||
summary_labels = serializer._summary_field_labels(job_template)
|
||||
|
||||
job_template.labels.all.order_by.assert_called_with('name')
|
||||
assert len(summary_labels['results']) == 10
|
||||
assert summary_labels['results'] == [{'id': x.id, 'name': x.name} for x in labels[:10]]
|
||||
|
||||
def test_labels_exists(self, test_get_summary_fields, job_template):
|
||||
test_get_summary_fields(JobOptionsSerializer, job_template, 'labels')
|
||||
|
||||
@@ -0,0 +1,124 @@
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
JobTemplateSerializer,
|
||||
)
|
||||
from awx.api.views import JobTemplateDetail
|
||||
from awx.main.models import (
|
||||
Role,
|
||||
User,
|
||||
Job,
|
||||
)
|
||||
from rest_framework.test import APIRequestFactory
|
||||
|
||||
#DRF
|
||||
from rest_framework import serializers
|
||||
|
||||
def mock_JT_resource_data():
|
||||
return ({}, [])
|
||||
|
||||
@pytest.fixture
|
||||
def job_template(mocker):
|
||||
mock_jt = mocker.MagicMock(pk=5)
|
||||
mock_jt.resource_validation_data = mock_JT_resource_data
|
||||
return mock_jt
|
||||
|
||||
@pytest.fixture
|
||||
def job(mocker, job_template):
|
||||
return mocker.MagicMock(pk=5, job_template=job_template)
|
||||
|
||||
@pytest.fixture
|
||||
def jobs(mocker):
|
||||
return [Job(id=x, name='job-%d' % x) for x in xrange(0, 25)]
|
||||
|
||||
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
|
||||
@mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {})
|
||||
class TestJobTemplateSerializerGetRelated():
|
||||
@pytest.mark.parametrize("related_resource_name", [
|
||||
'jobs',
|
||||
'schedules',
|
||||
'activity_stream',
|
||||
'launch',
|
||||
'notification_templates_any',
|
||||
'notification_templates_success',
|
||||
'notification_templates_error',
|
||||
'survey_spec',
|
||||
'labels',
|
||||
'callback',
|
||||
])
|
||||
def test_get_related(self, test_get_related, job_template, related_resource_name):
|
||||
test_get_related(JobTemplateSerializer, job_template, 'job_templates', related_resource_name)
|
||||
|
||||
def test_callback_absent(self, get_related_mock_and_run, job_template):
|
||||
job_template.host_config_key = None
|
||||
related = get_related_mock_and_run(JobTemplateSerializer, job_template)
|
||||
assert 'callback' not in related
|
||||
|
||||
class TestJobTemplateSerializerGetSummaryFields():
|
||||
def test__recent_jobs(self, mocker, job_template, jobs):
|
||||
|
||||
job_template.jobs.all = mocker.MagicMock(**{'order_by.return_value': jobs})
|
||||
job_template.jobs.all.return_value = job_template.jobs.all
|
||||
|
||||
serializer = JobTemplateSerializer()
|
||||
recent_jobs = serializer._recent_jobs(job_template)
|
||||
|
||||
job_template.jobs.all.assert_called_once_with()
|
||||
job_template.jobs.all.order_by.assert_called_once_with('-created')
|
||||
assert len(recent_jobs) == 10
|
||||
for x in jobs[:10]:
|
||||
assert recent_jobs == [{'id': x.id, 'status': x.status, 'finished': x.finished} for x in jobs[:10]]
|
||||
|
||||
def test_survey_spec_exists(self, test_get_summary_fields, mocker, job_template):
|
||||
job_template.survey_spec = {'name': 'blah', 'description': 'blah blah'}
|
||||
test_get_summary_fields(JobTemplateSerializer, job_template, 'survey')
|
||||
|
||||
def test_survey_spec_absent(self, get_summary_fields_mock_and_run, job_template):
|
||||
job_template.survey_spec = None
|
||||
summary = get_summary_fields_mock_and_run(JobTemplateSerializer, job_template)
|
||||
assert 'survey' not in summary
|
||||
|
||||
def test_copy_edit_standard(self, mocker, job_template_factory):
|
||||
"""Verify that the exact output of the access.py methods
|
||||
are put into the serializer user_capabilities"""
|
||||
|
||||
jt_obj = job_template_factory('testJT', project='proj1', persisted=False).job_template
|
||||
jt_obj.id = 5
|
||||
jt_obj.admin_role = Role(id=9, role_field='admin_role')
|
||||
jt_obj.execute_role = Role(id=8, role_field='execute_role')
|
||||
jt_obj.read_role = Role(id=7, role_field='execute_role')
|
||||
user = User(username="auser")
|
||||
serializer = JobTemplateSerializer(job_template)
|
||||
serializer.show_capabilities = ['copy', 'edit']
|
||||
serializer._summary_field_labels = lambda self: []
|
||||
serializer._recent_jobs = lambda self: []
|
||||
request = APIRequestFactory().get('/api/v1/job_templates/42/')
|
||||
request.user = user
|
||||
view = JobTemplateDetail()
|
||||
view.request = request
|
||||
serializer.context['view'] = view
|
||||
|
||||
with mocker.patch("awx.main.models.rbac.Role.get_description", return_value='Can eat pie'):
|
||||
with mocker.patch("awx.main.access.JobTemplateAccess.can_change", return_value='foobar'):
|
||||
with mocker.patch("awx.main.access.JobTemplateAccess.can_add", return_value='foo'):
|
||||
response = serializer.get_summary_fields(jt_obj)
|
||||
|
||||
assert response['user_capabilities']['copy'] == 'foo'
|
||||
assert response['user_capabilities']['edit'] == 'foobar'
|
||||
|
||||
class TestJobTemplateSerializerValidation(object):
|
||||
|
||||
good_extra_vars = ["{\"test\": \"keys\"}", "---\ntest: key"]
|
||||
bad_extra_vars = ["{\"test\": \"keys\"", "---\ntest: [2"]
|
||||
|
||||
def test_validate_extra_vars(self):
|
||||
serializer = JobTemplateSerializer()
|
||||
for ev in self.good_extra_vars:
|
||||
serializer.validate_extra_vars(ev)
|
||||
for ev in self.bad_extra_vars:
|
||||
with pytest.raises(serializers.ValidationError):
|
||||
serializer.validate_extra_vars(ev)
|
||||
|
||||
154
awx/main/tests/unit/api/serializers/test_workflow_serializers.py
Normal file
154
awx/main/tests/unit/api/serializers/test_workflow_serializers.py
Normal file
@@ -0,0 +1,154 @@
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
WorkflowJobTemplateSerializer,
|
||||
WorkflowNodeBaseSerializer,
|
||||
WorkflowJobTemplateNodeSerializer,
|
||||
WorkflowJobNodeSerializer,
|
||||
)
|
||||
from awx.main.models import (
|
||||
Job,
|
||||
WorkflowJobTemplateNode,
|
||||
WorkflowJob,
|
||||
WorkflowJobNode,
|
||||
)
|
||||
|
||||
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
|
||||
class TestWorkflowJobTemplateSerializerGetRelated():
|
||||
@pytest.fixture
|
||||
def workflow_job_template(self, workflow_job_template_factory):
|
||||
wfjt = workflow_job_template_factory('hello world', persisted=False).workflow_job_template
|
||||
wfjt.pk = 3
|
||||
return wfjt
|
||||
|
||||
@pytest.mark.parametrize("related_resource_name", [
|
||||
'jobs',
|
||||
'launch',
|
||||
'workflow_nodes',
|
||||
])
|
||||
def test_get_related(self, mocker, test_get_related, workflow_job_template, related_resource_name):
|
||||
test_get_related(WorkflowJobTemplateSerializer,
|
||||
workflow_job_template,
|
||||
'workflow_job_templates',
|
||||
related_resource_name)
|
||||
|
||||
@mock.patch('awx.api.serializers.BaseSerializer.get_related', lambda x,y: {})
|
||||
class TestWorkflowNodeBaseSerializerGetRelated():
|
||||
@pytest.fixture
|
||||
def job_template(self, job_template_factory):
|
||||
jt = job_template_factory(name="blah", persisted=False).job_template
|
||||
jt.pk = 1
|
||||
return jt
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_job_template_node_related(self, job_template):
|
||||
return WorkflowJobTemplateNode(pk=1, unified_job_template=job_template)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_job_template_node(self):
|
||||
return WorkflowJobTemplateNode(pk=1)
|
||||
|
||||
def test_workflow_unified_job_template_present(self, get_related_mock_and_run, workflow_job_template_node_related):
|
||||
related = get_related_mock_and_run(WorkflowNodeBaseSerializer, workflow_job_template_node_related)
|
||||
assert 'unified_job_template' in related
|
||||
assert related['unified_job_template'] == '/api/v1/%s/%d/' % ('job_templates', workflow_job_template_node_related.unified_job_template.pk)
|
||||
|
||||
def test_workflow_unified_job_template_absent(self, workflow_job_template_node):
|
||||
related = WorkflowJobTemplateNodeSerializer().get_related(workflow_job_template_node)
|
||||
assert 'unified_job_template' not in related
|
||||
|
||||
@mock.patch('awx.api.serializers.WorkflowNodeBaseSerializer.get_related', lambda x,y: {})
|
||||
class TestWorkflowJobTemplateNodeSerializerGetRelated():
|
||||
@pytest.fixture
|
||||
def workflow_job_template_node(self):
|
||||
return WorkflowJobTemplateNode(pk=1)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_job_template(self, workflow_job_template_factory):
|
||||
wfjt = workflow_job_template_factory("bliggity", persisted=False).workflow_job_template
|
||||
wfjt.pk = 1
|
||||
return wfjt
|
||||
|
||||
@pytest.fixture
|
||||
def job_template(self, job_template_factory):
|
||||
jt = job_template_factory(name="blah", persisted=False).job_template
|
||||
jt.pk = 1
|
||||
return jt
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_job_template_node_related(self, workflow_job_template_node, workflow_job_template):
|
||||
workflow_job_template_node.workflow_job_template = workflow_job_template
|
||||
return workflow_job_template_node
|
||||
|
||||
@pytest.mark.parametrize("related_resource_name", [
|
||||
'success_nodes',
|
||||
'failure_nodes',
|
||||
'always_nodes',
|
||||
])
|
||||
def test_get_related(self, test_get_related, workflow_job_template_node, related_resource_name):
|
||||
test_get_related(WorkflowJobTemplateNodeSerializer,
|
||||
workflow_job_template_node,
|
||||
'workflow_job_template_nodes',
|
||||
related_resource_name)
|
||||
|
||||
def test_workflow_job_template_present(self, get_related_mock_and_run, workflow_job_template_node_related):
|
||||
related = get_related_mock_and_run(WorkflowJobTemplateNodeSerializer, workflow_job_template_node_related)
|
||||
assert 'workflow_job_template' in related
|
||||
assert related['workflow_job_template'] == '/api/v1/%s/%d/' % ('workflow_job_templates', workflow_job_template_node_related.workflow_job_template.pk)
|
||||
|
||||
def test_workflow_job_template_absent(self, workflow_job_template_node):
|
||||
related = WorkflowJobTemplateNodeSerializer().get_related(workflow_job_template_node)
|
||||
assert 'workflow_job_template' not in related
|
||||
|
||||
|
||||
@mock.patch('awx.api.serializers.WorkflowNodeBaseSerializer.get_related', lambda x,y: {})
|
||||
class TestWorkflowJobNodeSerializerGetRelated():
|
||||
@pytest.fixture
|
||||
def workflow_job_node(self):
|
||||
return WorkflowJobNode(pk=1)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_job(self):
|
||||
return WorkflowJob(pk=1)
|
||||
|
||||
@pytest.fixture
|
||||
def job(self):
|
||||
return Job(name="blah", pk=1)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_job_node_related(self, workflow_job_node, workflow_job, job):
|
||||
workflow_job_node.workflow_job = workflow_job
|
||||
workflow_job_node.job = job
|
||||
return workflow_job_node
|
||||
|
||||
@pytest.mark.parametrize("related_resource_name", [
|
||||
'success_nodes',
|
||||
'failure_nodes',
|
||||
'always_nodes',
|
||||
])
|
||||
def test_get_related(self, test_get_related, workflow_job_node, related_resource_name):
|
||||
test_get_related(WorkflowJobNodeSerializer,
|
||||
workflow_job_node,
|
||||
'workflow_job_nodes',
|
||||
related_resource_name)
|
||||
|
||||
def test_workflow_job_present(self, get_related_mock_and_run, workflow_job_node_related):
|
||||
related = get_related_mock_and_run(WorkflowJobNodeSerializer, workflow_job_node_related)
|
||||
assert 'workflow_job' in related
|
||||
assert related['workflow_job'] == '/api/v1/%s/%d/' % ('workflow_jobs', workflow_job_node_related.workflow_job.pk)
|
||||
|
||||
def test_workflow_job_absent(self, workflow_job_node):
|
||||
related = WorkflowJobNodeSerializer().get_related(workflow_job_node)
|
||||
assert 'workflow_job' not in related
|
||||
|
||||
def test_job_present(self, get_related_mock_and_run, workflow_job_node_related):
|
||||
related = get_related_mock_and_run(WorkflowJobNodeSerializer, workflow_job_node_related)
|
||||
assert 'job' in related
|
||||
assert related['job'] == '/api/v1/%s/%d/' % ('jobs', workflow_job_node_related.job.pk)
|
||||
|
||||
def test_job_absent(self, workflow_job_node):
|
||||
related = WorkflowJobNodeSerializer().get_related(workflow_job_node)
|
||||
assert 'job' not in related
|
||||
@@ -43,6 +43,8 @@ class TestApiV1RootView:
|
||||
'unified_job_templates',
|
||||
'unified_jobs',
|
||||
'activity_stream',
|
||||
'workflow_job_templates',
|
||||
'workflow_jobs',
|
||||
]
|
||||
view = ApiV1RootView()
|
||||
ret = view.get(mocker.MagicMock())
|
||||
|
||||
167
awx/main/tests/unit/commands/test_run_task_system.py
Normal file
167
awx/main/tests/unit/commands/test_run_task_system.py
Normal file
@@ -0,0 +1,167 @@
|
||||
from awx.main.management.commands.run_task_system import (
|
||||
SimpleDAG,
|
||||
WorkflowDAG,
|
||||
)
|
||||
from awx.main.models import Job
|
||||
from awx.main.models.workflow import WorkflowJobNode
|
||||
import pytest
|
||||
|
||||
@pytest.fixture
|
||||
def dag_root():
|
||||
dag = SimpleDAG()
|
||||
data = [
|
||||
{1: 1},
|
||||
{2: 2},
|
||||
{3: 3},
|
||||
{4: 4},
|
||||
{5: 5},
|
||||
{6: 6},
|
||||
]
|
||||
# Add all the nodes to the DAG
|
||||
[dag.add_node(d) for d in data]
|
||||
|
||||
dag.add_edge(data[0], data[1])
|
||||
dag.add_edge(data[2], data[3])
|
||||
dag.add_edge(data[4], data[5])
|
||||
|
||||
return dag
|
||||
|
||||
@pytest.fixture
|
||||
def dag_simple_edge_labels():
|
||||
dag = SimpleDAG()
|
||||
data = [
|
||||
{1: 1},
|
||||
{2: 2},
|
||||
{3: 3},
|
||||
{4: 4},
|
||||
{5: 5},
|
||||
{6: 6},
|
||||
]
|
||||
# Add all the nodes to the DAG
|
||||
[dag.add_node(d) for d in data]
|
||||
|
||||
dag.add_edge(data[0], data[1], 'one')
|
||||
dag.add_edge(data[2], data[3], 'two')
|
||||
dag.add_edge(data[4], data[5], 'three')
|
||||
|
||||
return dag
|
||||
|
||||
'''
|
||||
class TestSimpleDAG(object):
|
||||
def test_get_root_nodes(self, dag_root):
|
||||
leafs = dag_root.get_leaf_nodes()
|
||||
|
||||
roots = dag_root.get_root_nodes()
|
||||
|
||||
def test_get_labeled_edges(self, dag_simple_edge_labels):
|
||||
dag = dag_simple_edge_labels
|
||||
nodes = dag.get_dependencies(dag.nodes[0]['node_object'], 'one')
|
||||
nodes = dag.get_dependencies(dag.nodes[0]['node_object'], 'two')
|
||||
'''
|
||||
|
||||
@pytest.fixture
|
||||
def factory_node():
|
||||
def fn(id, status):
|
||||
wfn = WorkflowJobNode(id=id)
|
||||
if status:
|
||||
j = Job(status=status)
|
||||
wfn.job = j
|
||||
return wfn
|
||||
return fn
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_dag_level_2(factory_node):
|
||||
dag = WorkflowDAG()
|
||||
data = [
|
||||
factory_node(0, 'successful'),
|
||||
factory_node(1, 'successful'),
|
||||
factory_node(2, 'successful'),
|
||||
factory_node(3, None),
|
||||
factory_node(4, None),
|
||||
factory_node(5, None),
|
||||
]
|
||||
[dag.add_node(d) for d in data]
|
||||
|
||||
dag.add_edge(data[0], data[3], 'success_nodes')
|
||||
dag.add_edge(data[1], data[4], 'success_nodes')
|
||||
dag.add_edge(data[2], data[5], 'success_nodes')
|
||||
|
||||
return (dag, data[3:6], False)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_dag_multiple_roots(factory_node):
|
||||
dag = WorkflowDAG()
|
||||
data = [
|
||||
factory_node(1, None),
|
||||
factory_node(2, None),
|
||||
factory_node(3, None),
|
||||
factory_node(4, None),
|
||||
factory_node(5, None),
|
||||
factory_node(6, None),
|
||||
]
|
||||
[dag.add_node(d) for d in data]
|
||||
|
||||
dag.add_edge(data[0], data[3], 'success_nodes')
|
||||
dag.add_edge(data[1], data[4], 'success_nodes')
|
||||
dag.add_edge(data[2], data[5], 'success_nodes')
|
||||
|
||||
expected = data[0:3]
|
||||
return (dag, expected, False)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_dag_multiple_edges_labeled(factory_node):
|
||||
dag = WorkflowDAG()
|
||||
data = [
|
||||
factory_node(0, 'failed'),
|
||||
factory_node(1, None),
|
||||
factory_node(2, 'failed'),
|
||||
factory_node(3, None),
|
||||
factory_node(4, 'failed'),
|
||||
factory_node(5, None),
|
||||
]
|
||||
[dag.add_node(d) for d in data]
|
||||
|
||||
dag.add_edge(data[0], data[1], 'success_nodes')
|
||||
dag.add_edge(data[0], data[2], 'failure_nodes')
|
||||
dag.add_edge(data[2], data[3], 'success_nodes')
|
||||
dag.add_edge(data[2], data[4], 'failure_nodes')
|
||||
dag.add_edge(data[4], data[5], 'failure_nodes')
|
||||
|
||||
expected = data[5:6]
|
||||
return (dag, expected, False)
|
||||
|
||||
@pytest.fixture
|
||||
def workflow_dag_finished(factory_node):
|
||||
dag = WorkflowDAG()
|
||||
data = [
|
||||
factory_node(0, 'failed'),
|
||||
factory_node(1, None),
|
||||
factory_node(2, 'failed'),
|
||||
factory_node(3, None),
|
||||
factory_node(4, 'failed'),
|
||||
factory_node(5, 'successful'),
|
||||
]
|
||||
[dag.add_node(d) for d in data]
|
||||
|
||||
dag.add_edge(data[0], data[1], 'success_nodes')
|
||||
dag.add_edge(data[0], data[2], 'failure_nodes')
|
||||
dag.add_edge(data[2], data[3], 'success_nodes')
|
||||
dag.add_edge(data[2], data[4], 'failure_nodes')
|
||||
dag.add_edge(data[4], data[5], 'failure_nodes')
|
||||
|
||||
expected = []
|
||||
return (dag, expected, True)
|
||||
|
||||
@pytest.fixture(params=['workflow_dag_multiple_roots', 'workflow_dag_level_2', 'workflow_dag_multiple_edges_labeled', 'workflow_dag_finished'])
|
||||
def workflow_dag(request):
|
||||
return request.getfuncargvalue(request.param)
|
||||
|
||||
class TestWorkflowDAG():
|
||||
def test_bfs_nodes_to_run(self, workflow_dag):
|
||||
dag, expected, is_done = workflow_dag
|
||||
assert dag.bfs_nodes_to_run() == expected
|
||||
|
||||
def test_is_workflow_done(self, workflow_dag):
|
||||
dag, expected, is_done = workflow_dag
|
||||
assert dag.is_workflow_done() == is_done
|
||||
|
||||
81
awx/main/tests/unit/models/test_workflow_unit.py
Normal file
81
awx/main/tests/unit/models/test_workflow_unit.py
Normal file
@@ -0,0 +1,81 @@
|
||||
import pytest
|
||||
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
from awx.main.models.workflow import WorkflowJobTemplateNode, WorkflowJobInheritNodesMixin, WorkflowJobNode
|
||||
|
||||
class TestWorkflowJobInheritNodesMixin():
|
||||
class TestCreateWorkflowJobNodes():
|
||||
@pytest.fixture
|
||||
def job_templates(self):
|
||||
return [JobTemplate() for i in range(0, 10)]
|
||||
|
||||
@pytest.fixture
|
||||
def job_template_nodes(self, job_templates):
|
||||
return [WorkflowJobTemplateNode(unified_job_template=job_templates[i]) for i in range(0, 10)]
|
||||
|
||||
def test__create_workflow_job_nodes(self, mocker, job_template_nodes):
|
||||
workflow_job_node_create = mocker.patch('awx.main.models.WorkflowJobNode.objects.create')
|
||||
|
||||
mixin = WorkflowJobInheritNodesMixin()
|
||||
mixin._create_workflow_job_nodes(job_template_nodes)
|
||||
|
||||
for job_template_node in job_template_nodes:
|
||||
workflow_job_node_create.assert_any_call(workflow_job=mixin,
|
||||
unified_job_template=job_template_node.unified_job_template)
|
||||
|
||||
class TestMapWorkflowJobNodes():
|
||||
@pytest.fixture
|
||||
def job_template_nodes(self):
|
||||
return [WorkflowJobTemplateNode(id=i) for i in range(0, 20)]
|
||||
|
||||
@pytest.fixture
|
||||
def job_nodes(self):
|
||||
return [WorkflowJobNode(id=i) for i in range(100, 120)]
|
||||
|
||||
def test__map_workflow_job_nodes(self, job_template_nodes, job_nodes):
|
||||
mixin = WorkflowJobInheritNodesMixin()
|
||||
|
||||
node_ids_map = mixin._map_workflow_job_nodes(job_template_nodes, job_nodes)
|
||||
assert len(node_ids_map) == len(job_template_nodes)
|
||||
|
||||
for i, job_template_node in enumerate(job_template_nodes):
|
||||
assert node_ids_map[job_template_node.id] == job_nodes[i].id
|
||||
|
||||
class TestInheritRelationship():
|
||||
@pytest.fixture
|
||||
def job_template_nodes(self, mocker):
|
||||
nodes = [mocker.MagicMock(id=i) for i in range(0, 10)]
|
||||
|
||||
for i in range(0, 9):
|
||||
nodes[i].success_nodes = [mocker.MagicMock(id=i + 1)]
|
||||
|
||||
return nodes
|
||||
|
||||
@pytest.fixture
|
||||
def job_nodes(self, mocker):
|
||||
nodes = [mocker.MagicMock(id=i) for i in range(100, 110)]
|
||||
return nodes
|
||||
|
||||
@pytest.fixture
|
||||
def job_nodes_dict(self, job_nodes):
|
||||
_map = {}
|
||||
for n in job_nodes:
|
||||
_map[n.id] = n
|
||||
return _map
|
||||
|
||||
|
||||
def test__inherit_relationship(self, mocker, job_template_nodes, job_nodes, job_nodes_dict):
|
||||
mixin = WorkflowJobInheritNodesMixin()
|
||||
|
||||
mixin._get_workflow_job_node_by_id = lambda x: job_nodes_dict[x]
|
||||
mixin._get_all_by_type = lambda x,node_type: x.success_nodes
|
||||
|
||||
node_ids_map = mixin._map_workflow_job_nodes(job_template_nodes, job_nodes)
|
||||
|
||||
for i, job_template_node in enumerate(job_template_nodes):
|
||||
mixin._inherit_relationship(job_template_node, job_nodes[i], node_ids_map, 'success_nodes')
|
||||
|
||||
for i in range(0, 9):
|
||||
job_nodes[i].success_nodes.add.assert_any_call(job_nodes[i + 1])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user