diff --git a/awx/main/tests/functional/api/test_notifications.py b/awx/main/tests/functional/api/test_notifications.py index 92f6045191..431065396d 100644 --- a/awx/main/tests/functional/api/test_notifications.py +++ b/awx/main/tests/functional/api/test_notifications.py @@ -153,3 +153,13 @@ def test_post_org_approval_notification(get, post, admin, notification_template, response = get(url, admin) assert response.status_code == 200 assert len(response.data['results']) == 1 + + +@pytest.mark.django_db +def test_post_wfj_notification(get, post, admin, workflow_job, notification): + workflow_job.notifications.add(notification) + workflow_job.save() + url = reverse("api:workflow_job_notifications_list", kwargs={'pk': workflow_job.pk}) + response = get(url, admin) + assert response.status_code == 200 + assert len(response.data['results']) == 1 diff --git a/awx/main/tests/functional/api/test_workflow_job.py b/awx/main/tests/functional/api/test_workflow_job.py new file mode 100644 index 0000000000..36553258db --- /dev/null +++ b/awx/main/tests/functional/api/test_workflow_job.py @@ -0,0 +1,54 @@ +import pytest + + +from awx.api.versioning import reverse + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "is_admin, status", + [ + [True, 201], + [False, 403], + ], # if they're a WFJ admin, they get a 201 # if they're not a WFJ *nor* org admin, they get a 403 +) +def test_workflow_job_relaunch(workflow_job, post, admin_user, alice, is_admin, status): + url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk}) + if is_admin: + post(url, user=admin_user, expect=status) + else: + post(url, user=alice, expect=status) + + +@pytest.mark.django_db +def test_workflow_job_relaunch_failure(workflow_job, post, admin_user): + workflow_job.is_sliced_job = True + workflow_job.job_template = None + workflow_job.save() + url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk}) + post(url, user=admin_user, expect=400) + + +@pytest.mark.django_db +def test_workflow_job_relaunch_not_inventory_failure(workflow_job, post, admin_user): + workflow_job.is_sliced_job = True + workflow_job.inventory = None + workflow_job.save() + url = reverse("api:workflow_job_relaunch", kwargs={'pk': workflow_job.pk}) + post(url, user=admin_user, expect=400) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "is_admin, status", + [ + [True, 202], + [False, 403], + ], # if they're a WFJ admin, they get a 202 # if they're not a WFJ *nor* org admin, they get a 403 +) +def test_workflow_job_cancel(workflow_job, post, admin_user, alice, is_admin, status): + url = reverse("api:workflow_job_cancel", kwargs={'pk': workflow_job.pk}) + if is_admin: + post(url, user=admin_user, expect=status) + else: + post(url, user=alice, expect=status) diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index e1284ce87c..c87f0a6c1a 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -743,6 +743,30 @@ def system_job_factory(system_job_template, admin): return factory +@pytest.fixture +def wfjt(workflow_job_template_factory, organization): + objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True) + return objects.workflow_job_template + + +@pytest.fixture +def wfjt_with_nodes(workflow_job_template_factory, organization, job_template): + objects = workflow_job_template_factory( + 'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True + ) + return objects.workflow_job_template + + +@pytest.fixture +def wfjt_node(wfjt_with_nodes): + return wfjt_with_nodes.workflow_job_template_nodes.all()[0] + + +@pytest.fixture +def workflow_job(wfjt): + return wfjt.workflow_jobs.create(name='test_workflow') + + def dumps(value): return DjangoJSONEncoder().encode(value) diff --git a/awx/main/tests/functional/test_copy.py b/awx/main/tests/functional/test_copy.py index 0574f9ccbd..001ebfbc74 100644 --- a/awx/main/tests/functional/test_copy.py +++ b/awx/main/tests/functional/test_copy.py @@ -123,6 +123,24 @@ def test_inventory_copy(inventory, group_factory, post, get, alice, organization assert set(group_2_2_copy.hosts.all()) == set() +@pytest.mark.django_db +@pytest.mark.parametrize( + "is_admin, can_copy, status", + [ + [True, True, 200], + [False, False, 200], + ], +) +def test_workflow_job_template_copy_access(get, admin_user, alice, workflow_job_template, is_admin, can_copy, status): + url = reverse('api:workflow_job_template_copy', kwargs={'pk': workflow_job_template.pk}) + if is_admin: + response = get(url, user=admin_user, expect=status) + else: + workflow_job_template.organization.auditor_role.members.add(alice) + response = get(url, user=alice, expect=status) + assert response.data['can_copy'] == can_copy + + @pytest.mark.django_db def test_workflow_job_template_copy(workflow_job_template, post, get, admin, organization): ''' diff --git a/awx/main/tests/functional/test_notifications.py b/awx/main/tests/functional/test_notifications.py index 08036db97c..092c970539 100644 --- a/awx/main/tests/functional/test_notifications.py +++ b/awx/main/tests/functional/test_notifications.py @@ -218,3 +218,31 @@ def test_webhook_notification_pointed_to_a_redirect_launch_endpoint(post, admin, ) assert n1.send("", n1.messages.get("success").get("body")) == 1 + + +@pytest.mark.django_db +def test_update_notification_template(admin, notification_template): + notification_template.messages['workflow_approval'] = { + "running": { + "message": None, + "body": None, + } + } + notification_template.save() + + workflow_approval_message = { + "approved": { + "message": None, + "body": None, + }, + "running": { + "message": "test-message", + "body": None, + }, + } + notification_template.messages['workflow_approval'] = workflow_approval_message + notification_template.save() + + subevents = sorted(notification_template.messages["workflow_approval"].keys()) + assert subevents == ["approved", "running"] + assert notification_template.messages['workflow_approval'] == workflow_approval_message diff --git a/awx/main/tests/functional/test_rbac_workflow.py b/awx/main/tests/functional/test_rbac_workflow.py index c2022783ac..0143399ba6 100644 --- a/awx/main/tests/functional/test_rbac_workflow.py +++ b/awx/main/tests/functional/test_rbac_workflow.py @@ -13,30 +13,6 @@ from rest_framework.exceptions import PermissionDenied from awx.main.models import InventorySource, JobLaunchConfig -@pytest.fixture -def wfjt(workflow_job_template_factory, organization): - objects = workflow_job_template_factory('test_workflow', organization=organization, persisted=True) - return objects.workflow_job_template - - -@pytest.fixture -def wfjt_with_nodes(workflow_job_template_factory, organization, job_template): - objects = workflow_job_template_factory( - 'test_workflow', organization=organization, workflow_job_template_nodes=[{'unified_job_template': job_template}], persisted=True - ) - return objects.workflow_job_template - - -@pytest.fixture -def wfjt_node(wfjt_with_nodes): - return wfjt_with_nodes.workflow_job_template_nodes.all()[0] - - -@pytest.fixture -def workflow_job(wfjt): - return wfjt.workflow_jobs.create(name='test_workflow') - - @pytest.mark.django_db class TestWorkflowJobTemplateAccess: def test_random_user_no_edit(self, wfjt, rando):