Add more tests for different modules

This commit is contained in:
Alexander Komarov 2021-07-30 14:21:41 +05:00 committed by John Westcott IV
parent ffa3cd1fff
commit e53a5da91e
6 changed files with 136 additions and 25 deletions

View File

@ -153,3 +153,13 @@ def test_post_org_approval_notification(get, post, admin, notification_template,
response = get(url, admin)
assert response.status_code == 200
assert len(response.data['results']) == 1
@pytest.mark.django_db
def test_post_wfj_notification(get, post, admin, workflow_job, notification):
workflow_job.notifications.add(notification)
workflow_job.save()
url = reverse("api:workflow_job_notifications_list", kwargs={'pk': workflow_job.pk})
response = get(url, admin)
assert response.status_code == 200
assert len(response.data['results']) == 1

View File

@ -0,0 +1,54 @@
import pytest
from awx.api.versioning import reverse
@pytest.mark.django_db
@pytest.mark.parametrize(
"is_admin, status",
[
[True, 201],
[False, 403],
], # if they're a WFJ admin, they get a 201 # if they're not a WFJ *nor* org admin, they get a 403
)
def test_workflow_job_relaunch(workflow_job, post, admin_user, alice, is_admin, status):
url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk})
if is_admin:
post(url, user=admin_user, expect=status)
else:
post(url, user=alice, expect=status)
@pytest.mark.django_db
def test_workflow_job_relaunch_failure(workflow_job, post, admin_user):
workflow_job.is_sliced_job = True
workflow_job.job_template = None
workflow_job.save()
url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk})
post(url, user=admin_user, expect=400)
@pytest.mark.django_db
def test_workflow_job_relaunch_not_inventory_failure(workflow_job, post, admin_user):
workflow_job.is_sliced_job = True
workflow_job.inventory = None
workflow_job.save()
url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk})
post(url, user=admin_user, expect=400)
@pytest.mark.django_db
@pytest.mark.parametrize(
"is_admin, status",
[
[True, 202],
[False, 403],
], # if they're a WFJ admin, they get a 202 # if they're not a WFJ *nor* org admin, they get a 403
)
def test_workflow_job_cancel(workflow_job, post, admin_user, alice, is_admin, status):
url = reverse("api:workflow_job_cancel", kwargs={'pk': workflow_job.pk})
if is_admin:
post(url, user=admin_user, expect=status)
else:
post(url, user=alice, expect=status)

View File

@ -39,7 +39,7 @@ from awx.main.models.events import (
InventoryUpdateEvent,
SystemJobEvent,
)
from awx.main.models.workflow import WorkflowJobTemplate
from awx.main.models.workflow import WorkflowJobTemplate, WorkflowJob
from awx.main.models.ad_hoc_commands import AdHocCommand
from awx.main.models.oauth import OAuth2Application as Application
from awx.main.models.execution_environments import ExecutionEnvironment
@ -743,6 +743,30 @@ def system_job_factory(system_job_template, admin):
return factory
@pytest.fixture
def wfjt(workflow_job_template_factory, organization):
objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_with_nodes(workflow_job_template_factory, organization, job_template):
objects = workflow_job_template_factory(
'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True
)
return objects.workflow_job_template
@pytest.fixture
def wfjt_node(wfjt_with_nodes):
return wfjt_with_nodes.workflow_job_template_nodes.all()[0]
@pytest.fixture
def workflow_job(wfjt):
return wfjt.workflow_jobs.create(name='test_workflow')
def dumps(value):
return DjangoJSONEncoder().encode(value)

View File

@ -123,6 +123,24 @@ def test_inventory_copy(inventory, group_factory, post, get, alice, organization
assert set(group_2_2_copy.hosts.all()) == set()
@pytest.mark.django_db
@pytest.mark.parametrize(
"is_admin, can_copy, status",
[
[True, True, 200],
[False, False, 200],
],
)
def test_workflow_job_template_copy_access(get, admin_user, alice, workflow_job_template, is_admin, can_copy, status):
url = reverse('api:workflow_job_template_copy', kwargs={'pk': workflow_job_template.pk})
if is_admin:
response = get(url, user=admin_user, expect=status)
else:
workflow_job_template.organization.auditor_role.members.add(alice)
response = get(url, user=alice, expect=status)
assert response.data['can_copy'] == can_copy
@pytest.mark.django_db
def test_workflow_job_template_copy(workflow_job_template, post, get, admin, organization):
'''

View File

@ -1,5 +1,6 @@
from unittest import mock
import pytest
import json
from requests.adapters import HTTPAdapter
from requests.utils import select_proxy
@ -218,3 +219,31 @@ def test_webhook_notification_pointed_to_a_redirect_launch_endpoint(post, admin,
)
assert n1.send("", n1.messages.get("success").get("body")) == 1
@pytest.mark.django_db
def test_update_notification_template(admin, notification_template):
notification_template.messages['workflow_approval'] = {
"running": {
"message": None,
"body": None,
}
}
notification_template.save()
workflow_approval_message = {
"approved": {
"message": None,
"body": None,
},
"running": {
"message": "test-message",
"body": None,
},
}
notification_template.messages['workflow_approval'] = workflow_approval_message
notification_template.save()
subevents = sorted(notification_template.messages["workflow_approval"].keys())
assert subevents == ["approved", "running"]
assert notification_template.messages['workflow_approval'] == workflow_approval_message

View File

@ -13,30 +13,6 @@ from rest_framework.exceptions import PermissionDenied
from awx.main.models import InventorySource, JobLaunchConfig
@pytest.fixture
def wfjt(workflow_job_template_factory, organization):
objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_with_nodes(workflow_job_template_factory, organization, job_template):
objects = workflow_job_template_factory(
'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True
)
return objects.workflow_job_template
@pytest.fixture
def wfjt_node(wfjt_with_nodes):
return wfjt_with_nodes.workflow_job_template_nodes.all()[0]
@pytest.fixture
def workflow_job(wfjt):
return wfjt.workflow_jobs.create(name='test_workflow')
@pytest.mark.django_db
class TestWorkflowJobTemplateAccess:
def test_random_user_no_edit(self, wfjt, rando):