Merge pull request #13838 from john-westcott-iv/oweel_additional_tests

Added more tests for different modules
This commit is contained in:
John Westcott IV 2023-04-12 13:14:37 -04:00 committed by GitHub
commit eb704dbaad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 24 deletions

View File

@ -153,3 +153,13 @@ def test_post_org_approval_notification(get, post, admin, notification_template,
response = get(url, admin)
assert response.status_code == 200
assert len(response.data['results']) == 1
@pytest.mark.django_db
def test_post_wfj_notification(get, post, admin, workflow_job, notification):
workflow_job.notifications.add(notification)
workflow_job.save()
url = reverse("api:workflow_job_notifications_list", kwargs={'pk': workflow_job.pk})
response = get(url, admin)
assert response.status_code == 200
assert len(response.data['results']) == 1

View File

@ -0,0 +1,54 @@
import pytest
from awx.api.versioning import reverse
@pytest.mark.django_db
@pytest.mark.parametrize(
"is_admin, status",
[
[True, 201],
[False, 403],
], # if they're a WFJ admin, they get a 201 # if they're not a WFJ *nor* org admin, they get a 403
)
def test_workflow_job_relaunch(workflow_job, post, admin_user, alice, is_admin, status):
url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk})
if is_admin:
post(url, user=admin_user, expect=status)
else:
post(url, user=alice, expect=status)
@pytest.mark.django_db
def test_workflow_job_relaunch_failure(workflow_job, post, admin_user):
workflow_job.is_sliced_job = True
workflow_job.job_template = None
workflow_job.save()
url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk})
post(url, user=admin_user, expect=400)
@pytest.mark.django_db
def test_workflow_job_relaunch_not_inventory_failure(workflow_job, post, admin_user):
workflow_job.is_sliced_job = True
workflow_job.inventory = None
workflow_job.save()
url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk})
post(url, user=admin_user, expect=400)
@pytest.mark.django_db
@pytest.mark.parametrize(
"is_admin, status",
[
[True, 202],
[False, 403],
], # if they're a WFJ admin, they get a 202 # if they're not a WFJ *nor* org admin, they get a 403
)
def test_workflow_job_cancel(workflow_job, post, admin_user, alice, is_admin, status):
url = reverse("api:workflow_job_cancel", kwargs={'pk': workflow_job.pk})
if is_admin:
post(url, user=admin_user, expect=status)
else:
post(url, user=alice, expect=status)

View File

@ -743,6 +743,30 @@ def system_job_factory(system_job_template, admin):
return factory
@pytest.fixture
def wfjt(workflow_job_template_factory, organization):
objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_with_nodes(workflow_job_template_factory, organization, job_template):
objects = workflow_job_template_factory(
'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True
)
return objects.workflow_job_template
@pytest.fixture
def wfjt_node(wfjt_with_nodes):
return wfjt_with_nodes.workflow_job_template_nodes.all()[0]
@pytest.fixture
def workflow_job(wfjt):
return wfjt.workflow_jobs.create(name='test_workflow')
def dumps(value):
return DjangoJSONEncoder().encode(value)

View File

@ -123,6 +123,24 @@ def test_inventory_copy(inventory, group_factory, post, get, alice, organization
assert set(group_2_2_copy.hosts.all()) == set()
@pytest.mark.django_db
@pytest.mark.parametrize(
"is_admin, can_copy, status",
[
[True, True, 200],
[False, False, 200],
],
)
def test_workflow_job_template_copy_access(get, admin_user, alice, workflow_job_template, is_admin, can_copy, status):
url = reverse('api:workflow_job_template_copy', kwargs={'pk': workflow_job_template.pk})
if is_admin:
response = get(url, user=admin_user, expect=status)
else:
workflow_job_template.organization.auditor_role.members.add(alice)
response = get(url, user=alice, expect=status)
assert response.data['can_copy'] == can_copy
@pytest.mark.django_db
def test_workflow_job_template_copy(workflow_job_template, post, get, admin, organization):
'''

View File

@ -218,3 +218,31 @@ def test_webhook_notification_pointed_to_a_redirect_launch_endpoint(post, admin,
)
assert n1.send("", n1.messages.get("success").get("body")) == 1
@pytest.mark.django_db
def test_update_notification_template(admin, notification_template):
notification_template.messages['workflow_approval'] = {
"running": {
"message": None,
"body": None,
}
}
notification_template.save()
workflow_approval_message = {
"approved": {
"message": None,
"body": None,
},
"running": {
"message": "test-message",
"body": None,
},
}
notification_template.messages['workflow_approval'] = workflow_approval_message
notification_template.save()
subevents = sorted(notification_template.messages["workflow_approval"].keys())
assert subevents == ["approved", "running"]
assert notification_template.messages['workflow_approval'] == workflow_approval_message

View File

@ -13,30 +13,6 @@ from rest_framework.exceptions import PermissionDenied
from awx.main.models import InventorySource, JobLaunchConfig
@pytest.fixture
def wfjt(workflow_job_template_factory, organization):
objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True)
return objects.workflow_job_template
@pytest.fixture
def wfjt_with_nodes(workflow_job_template_factory, organization, job_template):
objects = workflow_job_template_factory(
'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True
)
return objects.workflow_job_template
@pytest.fixture
def wfjt_node(wfjt_with_nodes):
return wfjt_with_nodes.workflow_job_template_nodes.all()[0]
@pytest.fixture
def workflow_job(wfjt):
return wfjt.workflow_jobs.create(name='test_workflow')
@pytest.mark.django_db
class TestWorkflowJobTemplateAccess:
def test_random_user_no_edit(self, wfjt, rando):