add unit tests

This commit is contained in:
Chris Meyers 2016-09-13 13:31:10 -04:00
parent 9c12b234b1
commit 32461574ae
15 changed files with 617 additions and 295 deletions

View File

@ -2174,7 +2174,29 @@ class SystemJobCancelSerializer(SystemJobSerializer):
class Meta:
fields = ('can_cancel',)
class WorkflowJobTemplateSerializer(UnifiedJobTemplateSerializer):
class Meta:
model = WorkflowJobTemplate
fields = ('*',)
def get_related(self, obj):
res = super(WorkflowJobTemplateSerializer, self).get_related(obj)
res.update(dict(
jobs = reverse('api:workflow_job_template_jobs_list', args=(obj.pk,)),
#schedules = reverse('api:workflow_job_template_schedules_list', args=(obj.pk,)),
launch = reverse('api:workflow_job_template_launch', args=(obj.pk,)),
workflow_nodes = reverse('api:workflow_job_template_workflow_nodes_list', args=(obj.pk,)),
# TODO: Implement notifications
#notification_templates_any = reverse('api:system_job_template_notification_templates_any_list', args=(obj.pk,)),
#notification_templates_success = reverse('api:system_job_template_notification_templates_success_list', args=(obj.pk,)),
#notification_templates_error = reverse('api:system_job_template_notification_templates_error_list', args=(obj.pk,)),
))
return res
# TODO:
class WorkflowJobTemplateListSerializer(WorkflowJobTemplateSerializer):
pass
# TODO:
class WorkflowJobSerializer(UnifiedJobSerializer):
@ -2198,36 +2220,10 @@ class WorkflowJobSerializer(UnifiedJobSerializer):
'''
return res
# TODO:
class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer):
pass
# TODO:
class WorkflowJobTemplateListSerializer(UnifiedJobTemplateSerializer):
class Meta:
model = WorkflowJobTemplate
fields = ('*',)
def get_related(self, obj):
res = super(WorkflowJobTemplateListSerializer, self).get_related(obj)
res.update(dict(
jobs = reverse('api:workflow_job_template_jobs_list', args=(obj.pk,)),
#schedules = reverse('api:workflow_job_template_schedules_list', args=(obj.pk,)),
launch = reverse('api:workflow_job_template_launch', args=(obj.pk,)),
workflow_nodes = reverse('api:workflow_job_template_workflow_nodes_list', args=(obj.pk,)),
# TODO: Implement notifications
#notification_templates_any = reverse('api:system_job_template_notification_templates_any_list', args=(obj.pk,)),
#notification_templates_success = reverse('api:system_job_template_notification_templates_success_list', args=(obj.pk,)),
#notification_templates_error = reverse('api:system_job_template_notification_templates_error_list', args=(obj.pk,)),
))
return res
class WorkflowJobTemplateSerializer(WorkflowJobTemplateListSerializer):
pass
class WorkflowNodeBaseSerializer(BaseSerializer):
class Meta:
@ -2273,6 +2269,9 @@ class WorkflowJobNodeSerializer(WorkflowNodeBaseSerializer):
class WorkflowJobNodeListSerializer(WorkflowJobNodeSerializer):
pass
class WorkflowJobNodeDetailSerializer(WorkflowJobNodeSerializer):
pass
class WorkflowJobTemplateNodeDetailSerializer(WorkflowJobTemplateNodeSerializer):
'''

View File

@ -359,7 +359,7 @@ v1_urls = patterns('awx.api.views',
url(r'^workflow_jobs/' ,include(workflow_job_urls)),
url(r'^labels/', include(label_urls)),
url(r'^workflow_job_template_nodes/', include(workflow_job_template_node_urls)),
#url(r'^workflow_job_nodes/', include(workflow_job_node_urls)),
url(r'^workflow_job_nodes/', include(workflow_job_node_urls)),
url(r'^unified_job_templates/$','unified_job_template_list'),
url(r'^unified_jobs/$', 'unified_job_list'),
url(r'^activity_stream/', include(activity_stream_urls)),

View File

@ -2614,6 +2614,20 @@ class JobTemplateObjectRolesList(SubListAPIView):
content_type = ContentType.objects.get_for_model(self.parent_model)
return Role.objects.filter(content_type=content_type, object_id=po.pk)
# TODO:
class WorkflowJobNodeList(ListCreateAPIView):
model = WorkflowJobNode
serializer_class = WorkflowJobNodeListSerializer
new_in_310 = True
# TODO:
class WorkflowJobNodeDetail(RetrieveUpdateDestroyAPIView):
model = WorkflowJobNode
serializer_class = WorkflowJobNodeDetailSerializer
new_in_310 = True
# TODO:
class WorkflowJobTemplateNodeList(ListCreateAPIView):
@ -2628,6 +2642,7 @@ class WorkflowJobTemplateNodeDetail(RetrieveUpdateDestroyAPIView):
serializer_class = WorkflowJobTemplateNodeDetailSerializer
new_in_310 = True
class WorkflowJobTemplateNodeChildrenBaseList(EnforceParentRelationshipMixin, SubListCreateAttachDetachAPIView):
model = WorkflowJobTemplateNode
@ -2656,19 +2671,20 @@ class WorkflowJobTemplateNodeFailureNodesList(WorkflowJobTemplateNodeChildrenBas
class WorkflowJobTemplateNodeAlwaysNodesList(WorkflowJobTemplateNodeChildrenBaseList):
relationship = 'always_nodes'
'''
class WorkflowJobNodeChildrenBaseList(EnforceParentRelationshipMixin, SubListCreateAttachDetachAPIView):
class WorkflowJobNodeChildrenBaseList(SubListAPIView):
model = WorkflowJobNode
serializer_class = WorkflowJobNodeListSerializer
always_allow_superuser = True # TODO: RBAC
parent_model = WorkflowJobTemplateNode
parent_model = Job
relationship = ''
'''
enforce_parent_relationship = 'workflow_job_template'
new_in_310 = True
'''
#
#Limit the set of WorkflowJobTemplateNodes to the related nodes of specified by
#Limit the set of WorkflowJobeNodes to the related nodes of specified by
#'relationship'
#
def get_queryset(self):
@ -2684,7 +2700,6 @@ class WorkflowJobNodeFailureNodesList(WorkflowJobNodeChildrenBaseList):
class WorkflowJobNodeAlwaysNodesList(WorkflowJobNodeChildrenBaseList):
relationship = 'always_nodes'
'''
# TODO:

View File

@ -148,11 +148,11 @@ class WorkflowJobTemplate(UnifiedJobTemplate, WorkflowJobOptions):
class WorkflowJobInheritNodesMixin(object):
def _inherit_relationship(self, old_node, new_node, node_ids_map, node_type):
old_related_nodes = getattr(old_node, node_type).all()
old_related_nodes = self._get_all_by_type(old_node, node_type)
new_node_type_mgr = getattr(new_node, node_type)
for old_related_node in old_related_nodes:
new_related_node = self._get_workflowJob_node_by_id(node_ids_map[old_related_node.id])
new_related_node = self._get_workflow_job_node_by_id(node_ids_map[old_related_node.id])
new_node_type_mgr.add(new_related_node)
'''
@ -172,9 +172,12 @@ class WorkflowJobInheritNodesMixin(object):
def _get_workflow_job_template_nodes(self):
return self.workflow_job_template.workflow_job_template_nodes.all()
def _get_workflowJob_node_by_id(self, id):
def _get_workflow_job_node_by_id(self, id):
return WorkflowJobNode.objects.get(id=id)
def _get_all_by_type(node, node_type):
return getattr(node, node_type).all()
def inherit_job_template_workflow_nodes(self):
old_nodes = self._get_workflow_job_template_nodes()
new_nodes = self._create_workflow_job_nodes(old_nodes)

View File

@ -9,6 +9,7 @@ from awx.main.models import (
Inventory,
Job,
Label,
WorkflowJobTemplateNode,
)
from .objects import (
@ -29,7 +30,6 @@ from .fixtures import (
mk_label,
mk_notification_template,
mk_workflow_job_template,
#mk_workflow_job_template_node,
)
@ -345,29 +345,33 @@ def create_notification_template(name, roles=None, persisted=True, **kwargs):
users=_Mapped(users),
superusers=_Mapped(superusers),
teams=teams)
'''
def generate_workflow_job_template_nodes(workflow_job_template,
unified_job_template,
persisted=True,
def generate_workflow_job_template_nodes(workflow_job_template,
persisted,
**kwargs):
'''
# TODO: Implement survey
'''
def create_workflow_job(name, persisted=True, **kwargs):
Objects = generate_objects(["workflow_job_template",
"survey",], kwargs)
workflow_job_template_nodes = kwargs.get('workflow_job_template_nodes', [])
if len(workflow_job_template_nodes) > 0 and not persisted:
raise RuntimeError('workflow job template nodes can not be used when persisted=False')
spec = None
jobs = None
new_nodes = []
extra_vars = kwargs.get('extra_vars', '')
'''
for i, node in enumerate(workflow_job_template_nodes):
new_node = WorkflowJobTemplateNode(workflow_job_template=workflow_job_template,
unified_job_template=node['unified_job_template'],
id=i)
new_nodes.append(new_node)
node_types = ['success_nodes', 'failure_nodes', 'always_nodes']
for node_type in node_types:
for i, new_node in enumerate(new_nodes):
for related_index in workflow_job_template_nodes[i][node_type]:
getattr(new_node, node_type).add(new_nodes[related_index])
# TODO: Implement survey
# TODO: Implement survey and jobs
def create_workflow_job_template(name, persisted=True, **kwargs):
Objects = generate_objects(["workflow_job_template",
"workflow_job_template_nodes",
"survey",], kwargs)
spec = None
@ -378,9 +382,16 @@ def create_workflow_job_template(name, persisted=True, **kwargs):
if 'survey' in kwargs:
spec = create_survey_spec(kwargs['survey'])
wfjt = mk_workflow_job_template(name, spec=spec, extra_vars=extra_vars,
wfjt = mk_workflow_job_template(name,
spec=spec,
extra_vars=extra_vars,
persisted=persisted)
#workflow_nodes = generate_workflow_job_template_nodes(wfjt, persisted, workflow_nodes=kwargs.get('workflow_nodes'))
workflow_jt_nodes = generate_workflow_job_template_nodes(wfjt,
persisted,
workflow_job_template_nodes=kwargs.get('workflow_job_template_nodes', []))
'''
if 'jobs' in kwargs:
@ -393,5 +404,7 @@ def create_workflow_job_template(name, persisted=True, **kwargs):
'''
return Objects(workflow_job_template=wfjt,
#jobs=jobs,
workflow_job_template_nodes=workflow_jt_nodes,
survey=spec,)

View File

@ -1,6 +1,6 @@
# AWX
from awx.main.models import (
WorkflowNode,
WorkflowJobTemplateNode,
WorkflowJobTemplate,
)
from awx.main.models.jobs import JobTemplate
@ -10,17 +10,17 @@ def do_init_workflow(job_template_success, job_template_fail, job_template_never
wfjt.delete()
wfjt, created = WorkflowJobTemplate.objects.get_or_create(name="parallel workflow")
print(wfjt.id)
WorkflowNode.objects.all().delete()
WorkflowJobTemplateNode.objects.all().delete()
if created:
node_success = WorkflowNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_success)
node_success = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_success)
nodes_never = []
for x in range(0, 3):
nodes_never.append(WorkflowNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_never))
nodes_never.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=job_template_never))
nodes_parallel = []
for jt in jts_parallel:
nodes_parallel.append(WorkflowNode.objects.create(workflow_job_template=wfjt, unified_job_template=jt))
nodes_parallel.append(WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt, unified_job_template=jt))
node_success.success_nodes.add(nodes_parallel[0])
node_success.success_nodes.add(nodes_parallel[1])

View File

@ -0,0 +1,46 @@
import pytest
@pytest.fixture
def get_related_assert():
def fn(model_obj, related, resource_name, related_resource_name):
assert related_resource_name in related
assert related[related_resource_name] == '/api/v1/%s/%d/%s/' % (resource_name, model_obj.pk, related_resource_name)
return fn
@pytest.fixture
def get_related_mock_and_run():
def fn(serializer_class, model_obj):
serializer = serializer_class()
related = serializer.get_related(model_obj)
return related
return fn
@pytest.fixture
def test_get_related(get_related_assert, get_related_mock_and_run):
def fn(serializer_class, model_obj, resource_name, related_resource_name):
related = get_related_mock_and_run(serializer_class, model_obj)
get_related_assert(model_obj, related, resource_name, related_resource_name)
return related
return fn
@pytest.fixture
def get_summary_fields_assert():
def fn(summary, summary_field_name):
assert summary_field_name in summary
return fn
@pytest.fixture
def get_summary_fields_mock_and_run():
def fn(serializer_class, model_obj):
serializer = serializer_class()
return serializer.get_summary_fields(model_obj)
return fn
@pytest.fixture
def test_get_summary_fields(get_summary_fields_mock_and_run, get_summary_fields_assert):
def fn(serializer_class, model_obj, summary_field_name):
summary = get_summary_fields_mock_and_run(serializer_class, model_obj)
get_summary_fields_assert(summary, summary_field_name)
return summary
return fn

View File

@ -0,0 +1,47 @@
# Python
import pytest
import mock
from mock import PropertyMock
# AWX
from awx.api.serializers import (
CustomInventoryScriptSerializer,
)
from awx.main.models import (
CustomInventoryScript,
User,
)
#DRF
from rest_framework.request import Request
from rest_framework.test import (
APIRequestFactory,
force_authenticate,
)
class TestCustomInventoryScriptSerializer(object):
@pytest.mark.parametrize("superuser,sysaudit,admin_role,value",
((True, False, False, '#!/python'),
(False, True, False, '#!/python'),
(False, False, True, '#!/python'),
(False, False, False, None)))
def test_to_representation_orphan(self, superuser, sysaudit, admin_role, value):
with mock.patch.object(CustomInventoryScriptSerializer, 'get_summary_fields', return_value={}):
User.add_to_class('is_system_auditor', sysaudit)
user = User(username="root", is_superuser=superuser)
roles = [user] if admin_role else []
with mock.patch('awx.main.models.CustomInventoryScript.admin_role', new_callable=PropertyMock, return_value=roles):
cis = CustomInventoryScript(pk=1, script='#!/python')
serializer = CustomInventoryScriptSerializer()
factory = APIRequestFactory()
wsgi_request = factory.post("/inventory_script/1", {'id':1}, format="json")
force_authenticate(wsgi_request, user)
request = Request(wsgi_request)
serializer.context['request'] = request
representation = serializer.to_representation(cis)
assert representation['script'] == value

View File

@ -0,0 +1,91 @@
# Python
import pytest
import mock
import json
# AWX
from awx.api.serializers import (
JobSerializer,
JobOptionsSerializer,
)
from awx.main.models import (
Label,
Job,
)
def mock_JT_resource_data():
return ({}, [])
@pytest.fixture
def job_template(mocker):
mock_jt = mocker.MagicMock(pk=5)
mock_jt.resource_validation_data = mock_JT_resource_data
return mock_jt
@pytest.fixture
def job(mocker, job_template):
return mocker.MagicMock(pk=5, job_template=job_template)
@pytest.fixture
def labels(mocker):
return [Label(id=x, name='label-%d' % x) for x in xrange(0, 25)]
@pytest.fixture
def jobs(mocker):
return [Job(id=x, name='job-%d' % x) for x in xrange(0, 25)]
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
@mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {})
class TestJobSerializerGetRelated():
@pytest.mark.parametrize("related_resource_name", [
'job_events',
'job_plays',
'job_tasks',
'relaunch',
'labels',
])
def test_get_related(self, test_get_related, job, related_resource_name):
test_get_related(JobSerializer, job, 'jobs', related_resource_name)
def test_job_template_absent(self, job):
job.job_template = None
serializer = JobSerializer()
related = serializer.get_related(job)
assert 'job_template' not in related
def test_job_template_present(self, get_related_mock_and_run, job):
related = get_related_mock_and_run(JobSerializer, job)
assert 'job_template' in related
assert related['job_template'] == '/api/v1/%s/%d/' % ('job_templates', job.job_template.pk)
@mock.patch('awx.api.serializers.BaseSerializer.to_representation', lambda self,obj: {
'extra_vars': obj.extra_vars})
class TestJobSerializerSubstitution():
def test_survey_password_hide(self, mocker):
job = mocker.MagicMock(**{
'display_extra_vars.return_value': '{\"secret_key\": \"$encrypted$\"}',
'extra_vars.return_value': '{\"secret_key\": \"my_password\"}'})
serializer = JobSerializer(job)
rep = serializer.to_representation(job)
extra_vars = json.loads(rep['extra_vars'])
assert extra_vars['secret_key'] == '$encrypted$'
job.display_extra_vars.assert_called_once_with()
assert 'my_password' not in extra_vars
@mock.patch('awx.api.serializers.BaseSerializer.get_summary_fields', lambda x,y: {})
class TestJobOptionsSerializerGetSummaryFields():
def test__summary_field_labels_10_max(self, mocker, job_template, labels):
job_template.labels.all = mocker.MagicMock(**{'order_by.return_value': labels})
job_template.labels.all.return_value = job_template.labels.all
serializer = JobOptionsSerializer()
summary_labels = serializer._summary_field_labels(job_template)
job_template.labels.all.order_by.assert_called_with('name')
assert len(summary_labels['results']) == 10
assert summary_labels['results'] == [{'id': x.id, 'name': x.name} for x in labels[:10]]
def test_labels_exists(self, test_get_summary_fields, job_template):
test_get_summary_fields(JobOptionsSerializer, job_template, 'labels')

View File

@ -0,0 +1,108 @@
# Python
import pytest
import mock
# AWX
from awx.api.serializers import (
JobTemplateSerializer,
)
from awx.main.models import (
Job,
)
#DRF
from rest_framework import serializers
def mock_JT_resource_data():
return ({}, [])
@pytest.fixture
def job_template(mocker):
mock_jt = mocker.MagicMock(pk=5)
mock_jt.resource_validation_data = mock_JT_resource_data
return mock_jt
@pytest.fixture
def job(mocker, job_template):
return mocker.MagicMock(pk=5, job_template=job_template)
@pytest.fixture
def jobs(mocker):
return [Job(id=x, name='job-%d' % x) for x in xrange(0, 25)]
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
@mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {})
class TestJobTemplateSerializerGetRelated():
@pytest.mark.parametrize("related_resource_name", [
'jobs',
'schedules',
'activity_stream',
'launch',
'notification_templates_any',
'notification_templates_success',
'notification_templates_error',
'survey_spec',
'labels',
'callback',
])
def test_get_related(self, test_get_related, job_template, related_resource_name):
test_get_related(JobTemplateSerializer, job_template, 'job_templates', related_resource_name)
def test_callback_absent(self, get_related_mock_and_run, job_template):
job_template.host_config_key = None
related = get_related_mock_and_run(JobTemplateSerializer, job_template)
assert 'callback' not in related
class TestJobTemplateSerializerGetSummaryFields():
def test__recent_jobs(self, mocker, job_template, jobs):
job_template.jobs.all = mocker.MagicMock(**{'order_by.return_value': jobs})
job_template.jobs.all.return_value = job_template.jobs.all
serializer = JobTemplateSerializer()
recent_jobs = serializer._recent_jobs(job_template)
job_template.jobs.all.assert_called_once_with()
job_template.jobs.all.order_by.assert_called_once_with('-created')
assert len(recent_jobs) == 10
for x in jobs[:10]:
assert recent_jobs == [{'id': x.id, 'status': x.status, 'finished': x.finished} for x in jobs[:10]]
def test_survey_spec_exists(self, test_get_summary_fields, mocker, job_template):
job_template.survey_spec = {'name': 'blah', 'description': 'blah blah'}
test_get_summary_fields(JobTemplateSerializer, job_template, 'survey')
def test_survey_spec_absent(self, get_summary_fields_mock_and_run, job_template):
job_template.survey_spec = None
summary = get_summary_fields_mock_and_run(JobTemplateSerializer, job_template)
assert 'survey' not in summary
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_copy_true(self, mocker, job_template):
pass
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_copy_false(self, mocker, job_template):
pass
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_edit_true(self, mocker, job_template):
pass
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_edit_false(self, mocker, job_template):
pass
class TestJobTemplateSerializerValidation(object):
good_extra_vars = ["{\"test\": \"keys\"}", "---\ntest: key"]
bad_extra_vars = ["{\"test\": \"keys\"", "---\ntest: [2"]
def test_validate_extra_vars(self):
serializer = JobTemplateSerializer()
for ev in self.good_extra_vars:
serializer.validate_extra_vars(ev)
for ev in self.bad_extra_vars:
with pytest.raises(serializers.ValidationError):
serializer.validate_extra_vars(ev)

View File

@ -0,0 +1,154 @@
# Python
import pytest
import mock
# AWX
from awx.api.serializers import (
WorkflowJobTemplateSerializer,
WorkflowNodeBaseSerializer,
WorkflowJobTemplateNodeSerializer,
WorkflowJobNodeSerializer,
)
from awx.main.models import (
Job,
WorkflowJobTemplateNode,
WorkflowJob,
WorkflowJobNode,
)
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
class TestWorkflowJobTemplateSerializerGetRelated():
@pytest.fixture
def workflow_job_template(self, workflow_job_template_factory):
wfjt = workflow_job_template_factory('hello world', persisted=False).workflow_job_template
wfjt.pk = 3
return wfjt
@pytest.mark.parametrize("related_resource_name", [
'jobs',
'launch',
'workflow_nodes',
])
def test_get_related(self, mocker, test_get_related, workflow_job_template, related_resource_name):
test_get_related(WorkflowJobTemplateSerializer,
workflow_job_template,
'workflow_job_templates',
related_resource_name)
@mock.patch('awx.api.serializers.BaseSerializer.get_related', lambda x,y: {})
class TestWorkflowNodeBaseSerializerGetRelated():
@pytest.fixture
def job_template(self, job_template_factory):
jt = job_template_factory(name="blah", persisted=False).job_template
jt.pk = 1
return jt
@pytest.fixture
def workflow_job_template_node_related(self, job_template):
return WorkflowJobTemplateNode(pk=1, unified_job_template=job_template)
@pytest.fixture
def workflow_job_template_node(self):
return WorkflowJobTemplateNode(pk=1)
def test_workflow_unified_job_template_present(self, get_related_mock_and_run, workflow_job_template_node_related):
related = get_related_mock_and_run(WorkflowNodeBaseSerializer, workflow_job_template_node_related)
assert 'unified_job_template' in related
assert related['unified_job_template'] == '/api/v1/%s/%d/' % ('job_templates', workflow_job_template_node_related.unified_job_template.pk)
def test_workflow_unified_job_template_absent(self, workflow_job_template_node):
related = WorkflowJobTemplateNodeSerializer().get_related(workflow_job_template_node)
assert 'unified_job_template' not in related
@mock.patch('awx.api.serializers.WorkflowNodeBaseSerializer.get_related', lambda x,y: {})
class TestWorkflowJobTemplateNodeSerializerGetRelated():
@pytest.fixture
def workflow_job_template_node(self):
return WorkflowJobTemplateNode(pk=1)
@pytest.fixture
def workflow_job_template(self, workflow_job_template_factory):
wfjt = workflow_job_template_factory("bliggity", persisted=False).workflow_job_template
wfjt.pk = 1
return wfjt
@pytest.fixture
def job_template(self, job_template_factory):
jt = job_template_factory(name="blah", persisted=False).job_template
jt.pk = 1
return jt
@pytest.fixture
def workflow_job_template_node_related(self, workflow_job_template_node, workflow_job_template):
workflow_job_template_node.workflow_job_template = workflow_job_template
return workflow_job_template_node
@pytest.mark.parametrize("related_resource_name", [
'success_nodes',
'failure_nodes',
'always_nodes',
])
def test_get_related(self, test_get_related, workflow_job_template_node, related_resource_name):
test_get_related(WorkflowJobTemplateNodeSerializer,
workflow_job_template_node,
'workflow_job_template_nodes',
related_resource_name)
def test_workflow_job_template_present(self, get_related_mock_and_run, workflow_job_template_node_related):
related = get_related_mock_and_run(WorkflowJobTemplateNodeSerializer, workflow_job_template_node_related)
assert 'workflow_job_template' in related
assert related['workflow_job_template'] == '/api/v1/%s/%d/' % ('workflow_job_templates', workflow_job_template_node_related.workflow_job_template.pk)
def test_workflow_job_template_absent(self, workflow_job_template_node):
related = WorkflowJobTemplateNodeSerializer().get_related(workflow_job_template_node)
assert 'workflow_job_template' not in related
@mock.patch('awx.api.serializers.WorkflowNodeBaseSerializer.get_related', lambda x,y: {})
class TestWorkflowJobNodeSerializerGetRelated():
@pytest.fixture
def workflow_job_node(self):
return WorkflowJobNode(pk=1)
@pytest.fixture
def workflow_job(self):
return WorkflowJob(pk=1)
@pytest.fixture
def job(self):
return Job(name="blah", pk=1)
@pytest.fixture
def workflow_job_node_related(self, workflow_job_node, workflow_job, job):
workflow_job_node.workflow_job = workflow_job
workflow_job_node.job = job
return workflow_job_node
@pytest.mark.parametrize("related_resource_name", [
'success_nodes',
'failure_nodes',
'always_nodes',
])
def test_get_related(self, test_get_related, workflow_job_node, related_resource_name):
test_get_related(WorkflowJobNodeSerializer,
workflow_job_node,
'workflow_job_nodes',
related_resource_name)
def test_workflow_job_present(self, get_related_mock_and_run, workflow_job_node_related):
related = get_related_mock_and_run(WorkflowJobNodeSerializer, workflow_job_node_related)
assert 'workflow_job' in related
assert related['workflow_job'] == '/api/v1/%s/%d/' % ('workflow_jobs', workflow_job_node_related.workflow_job.pk)
def test_workflow_job_absent(self, workflow_job_node):
related = WorkflowJobNodeSerializer().get_related(workflow_job_node)
assert 'workflow_job' not in related
def test_job_present(self, get_related_mock_and_run, workflow_job_node_related):
related = get_related_mock_and_run(WorkflowJobNodeSerializer, workflow_job_node_related)
assert 'job' in related
assert related['job'] == '/api/v1/%s/%d/' % ('jobs', workflow_job_node_related.job.pk)
def test_job_absent(self, workflow_job_node):
related = WorkflowJobNodeSerializer().get_related(workflow_job_node)
assert 'job' not in related

View File

@ -1,235 +0,0 @@
# Python
import pytest
import mock
from mock import PropertyMock
import json
# AWX
from awx.api.serializers import (
JobTemplateSerializer,
JobSerializer,
JobOptionsSerializer,
CustomInventoryScriptSerializer,
)
from awx.main.models import (
Label,
Job,
CustomInventoryScript,
User,
)
#DRF
from rest_framework.request import Request
from rest_framework import serializers
from rest_framework.test import (
APIRequestFactory,
force_authenticate,
)
def mock_JT_resource_data():
return ({}, [])
@pytest.fixture
def job_template(mocker):
mock_jt = mocker.MagicMock(pk=5)
mock_jt.resource_validation_data = mock_JT_resource_data
return mock_jt
@pytest.fixture
def job(mocker, job_template):
return mocker.MagicMock(pk=5, job_template=job_template)
@pytest.fixture
def labels(mocker):
return [Label(id=x, name='label-%d' % x) for x in xrange(0, 25)]
@pytest.fixture
def jobs(mocker):
return [Job(id=x, name='job-%d' % x) for x in xrange(0, 25)]
class GetRelatedMixin:
def _assert(self, model_obj, related, resource_name, related_resource_name):
assert related_resource_name in related
assert related[related_resource_name] == '/api/v1/%s/%d/%s/' % (resource_name, model_obj.pk, related_resource_name)
def _mock_and_run(self, serializer_class, model_obj):
serializer = serializer_class()
related = serializer.get_related(model_obj)
return related
def _test_get_related(self, serializer_class, model_obj, resource_name, related_resource_name):
related = self._mock_and_run(serializer_class, model_obj)
self._assert(model_obj, related, resource_name, related_resource_name)
return related
class GetSummaryFieldsMixin:
def _assert(self, summary, summary_field_name):
assert summary_field_name in summary
def _mock_and_run(self, serializer_class, model_obj):
serializer = serializer_class()
return serializer.get_summary_fields(model_obj)
def _test_get_summary_fields(self, serializer_class, model_obj, summary_field_name):
summary = self._mock_and_run(serializer_class, model_obj)
self._assert(summary, summary_field_name)
return summary
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
@mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {})
class TestJobTemplateSerializerGetRelated(GetRelatedMixin):
@pytest.mark.parametrize("related_resource_name", [
'jobs',
'schedules',
'activity_stream',
'launch',
'notification_templates_any',
'notification_templates_success',
'notification_templates_error',
'survey_spec',
'labels',
'callback',
])
def test_get_related(self, job_template, related_resource_name):
self._test_get_related(JobTemplateSerializer, job_template, 'job_templates', related_resource_name)
def test_callback_absent(self, job_template):
job_template.host_config_key = None
related = self._mock_and_run(JobTemplateSerializer, job_template)
assert 'callback' not in related
class TestJobTemplateSerializerGetSummaryFields(GetSummaryFieldsMixin):
def test__recent_jobs(self, mocker, job_template, jobs):
job_template.jobs.all = mocker.MagicMock(**{'order_by.return_value': jobs})
job_template.jobs.all.return_value = job_template.jobs.all
serializer = JobTemplateSerializer()
recent_jobs = serializer._recent_jobs(job_template)
job_template.jobs.all.assert_called_once_with()
job_template.jobs.all.order_by.assert_called_once_with('-created')
assert len(recent_jobs) == 10
for x in jobs[:10]:
assert recent_jobs == [{'id': x.id, 'status': x.status, 'finished': x.finished} for x in jobs[:10]]
def test_survey_spec_exists(self, mocker, job_template):
job_template.survey_spec = {'name': 'blah', 'description': 'blah blah'}
self._test_get_summary_fields(JobTemplateSerializer, job_template, 'survey')
def test_survey_spec_absent(self, mocker, job_template):
job_template.survey_spec = None
summary = self._mock_and_run(JobTemplateSerializer, job_template)
assert 'survey' not in summary
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_copy_true(self, mocker, job_template):
pass
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_copy_false(self, mocker, job_template):
pass
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_edit_true(self, mocker, job_template):
pass
@pytest.mark.skip(reason="RBAC needs to land")
def test_can_edit_false(self, mocker, job_template):
pass
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
@mock.patch('awx.api.serializers.JobOptionsSerializer.get_related', lambda x,y: {})
class TestJobSerializerGetRelated(GetRelatedMixin):
@pytest.mark.parametrize("related_resource_name", [
'job_events',
'job_plays',
'job_tasks',
'relaunch',
'labels',
])
def test_get_related(self, mocker, job, related_resource_name):
self._test_get_related(JobSerializer, job, 'jobs', related_resource_name)
def test_job_template_absent(self, mocker, job):
job.job_template = None
serializer = JobSerializer()
related = serializer.get_related(job)
assert 'job_template' not in related
def test_job_template_present(self, job):
related = self._mock_and_run(JobSerializer, job)
assert 'job_template' in related
assert related['job_template'] == '/api/v1/%s/%d/' % ('job_templates', job.job_template.pk)
@mock.patch('awx.api.serializers.BaseSerializer.to_representation', lambda self,obj: {
'extra_vars': obj.extra_vars})
class TestJobSerializerSubstitution():
def test_survey_password_hide(self, mocker):
job = mocker.MagicMock(**{
'display_extra_vars.return_value': '{\"secret_key\": \"$encrypted$\"}',
'extra_vars.return_value': '{\"secret_key\": \"my_password\"}'})
serializer = JobSerializer(job)
rep = serializer.to_representation(job)
extra_vars = json.loads(rep['extra_vars'])
assert extra_vars['secret_key'] == '$encrypted$'
job.display_extra_vars.assert_called_once_with()
assert 'my_password' not in extra_vars
@mock.patch('awx.api.serializers.BaseSerializer.get_summary_fields', lambda x,y: {})
class TestJobOptionsSerializerGetSummaryFields(GetSummaryFieldsMixin):
def test__summary_field_labels_10_max(self, mocker, job_template, labels):
job_template.labels.all = mocker.MagicMock(**{'order_by.return_value': labels})
job_template.labels.all.return_value = job_template.labels.all
serializer = JobOptionsSerializer()
summary_labels = serializer._summary_field_labels(job_template)
job_template.labels.all.order_by.assert_called_with('name')
assert len(summary_labels['results']) == 10
assert summary_labels['results'] == [{'id': x.id, 'name': x.name} for x in labels[:10]]
def test_labels_exists(self, mocker, job_template):
self._test_get_summary_fields(JobOptionsSerializer, job_template, 'labels')
class TestJobTemplateSerializerValidation(object):
good_extra_vars = ["{\"test\": \"keys\"}", "---\ntest: key"]
bad_extra_vars = ["{\"test\": \"keys\"", "---\ntest: [2"]
def test_validate_extra_vars(self):
serializer = JobTemplateSerializer()
for ev in self.good_extra_vars:
serializer.validate_extra_vars(ev)
for ev in self.bad_extra_vars:
with pytest.raises(serializers.ValidationError):
serializer.validate_extra_vars(ev)
class TestCustomInventoryScriptSerializer(object):
@pytest.mark.parametrize("superuser,sysaudit,admin_role,value",
((True, False, False, '#!/python'),
(False, True, False, '#!/python'),
(False, False, True, '#!/python'),
(False, False, False, None)))
def test_to_representation_orphan(self, superuser, sysaudit, admin_role, value):
with mock.patch.object(CustomInventoryScriptSerializer, 'get_summary_fields', return_value={}):
User.add_to_class('is_system_auditor', sysaudit)
user = User(username="root", is_superuser=superuser)
roles = [user] if admin_role else []
with mock.patch('awx.main.models.CustomInventoryScript.admin_role', new_callable=PropertyMock, return_value=roles):
cis = CustomInventoryScript(pk=1, script='#!/python')
serializer = CustomInventoryScriptSerializer()
factory = APIRequestFactory()
wsgi_request = factory.post("/inventory_script/1", {'id':1}, format="json")
force_authenticate(wsgi_request, user)
request = Request(wsgi_request)
serializer.context['request'] = request
representation = serializer.to_representation(cis)
assert representation['script'] == value

View File

@ -3,7 +3,7 @@ from awx.main.management.commands.run_task_system import (
WorkflowDAG,
)
from awx.main.models import Job
from awx.main.models.workflow import WorkflowNode
from awx.main.models.workflow import WorkflowJobNode
import pytest
@pytest.fixture
@ -62,7 +62,7 @@ class TestSimpleDAG(object):
@pytest.fixture
def factory_node():
def fn(id, status):
wfn = WorkflowNode(id=id)
wfn = WorkflowJobNode(id=id)
if status:
j = Job(status=status)
wfn.job = j

View File

@ -0,0 +1,81 @@
import pytest
from awx.main.models.jobs import JobTemplate
from awx.main.models.workflow import WorkflowJobTemplateNode, WorkflowJobInheritNodesMixin, WorkflowJobNode
class TestWorkflowJobInheritNodesMixin():
class TestCreateWorkflowJobNodes():
@pytest.fixture
def job_templates(self):
return [JobTemplate() for i in range(0, 10)]
@pytest.fixture
def job_template_nodes(self, job_templates):
return [WorkflowJobTemplateNode(unified_job_template=job_templates[i]) for i in range(0, 10)]
def test__create_workflow_job_nodes(self, mocker, job_template_nodes):
workflow_job_node_create = mocker.patch('awx.main.models.WorkflowJobNode.objects.create')
mixin = WorkflowJobInheritNodesMixin()
mixin._create_workflow_job_nodes(job_template_nodes)
for job_template_node in job_template_nodes:
workflow_job_node_create.assert_any_call(workflow_job=mixin,
unified_job_template=job_template_node.unified_job_template)
class TestMapWorkflowJobNodes():
@pytest.fixture
def job_template_nodes(self):
return [WorkflowJobTemplateNode(id=i) for i in range(0, 20)]
@pytest.fixture
def job_nodes(self):
return [WorkflowJobNode(id=i) for i in range(100, 120)]
def test__map_workflow_job_nodes(self, job_template_nodes, job_nodes):
mixin = WorkflowJobInheritNodesMixin()
node_ids_map = mixin._map_workflow_job_nodes(job_template_nodes, job_nodes)
assert len(node_ids_map) == len(job_template_nodes)
for i, job_template_node in enumerate(job_template_nodes):
assert node_ids_map[job_template_node.id] == job_nodes[i].id
class TestInheritRelationship():
@pytest.fixture
def job_template_nodes(self, mocker):
nodes = [mocker.MagicMock(id=i) for i in range(0, 10)]
for i in range(0, 9):
nodes[i].success_nodes = [mocker.MagicMock(id=i + 1)]
return nodes
@pytest.fixture
def job_nodes(self, mocker):
nodes = [mocker.MagicMock(id=i) for i in range(100, 110)]
return nodes
@pytest.fixture
def job_nodes_dict(self, job_nodes):
_map = {}
for n in job_nodes:
_map[n.id] = n
return _map
def test__inherit_relationship(self, mocker, job_template_nodes, job_nodes, job_nodes_dict):
mixin = WorkflowJobInheritNodesMixin()
mixin._get_workflow_job_node_by_id = lambda x: job_nodes_dict[x]
mixin._get_all_by_type = lambda x,node_type: x.success_nodes
node_ids_map = mixin._map_workflow_job_nodes(job_template_nodes, job_nodes)
for i, job_template_node in enumerate(job_template_nodes):
mixin._inherit_relationship(job_template_node, job_nodes[i], node_ids_map, 'success_nodes')
for i in range(0, 9):
job_nodes[i].success_nodes.add.assert_any_call(job_nodes[i + 1])