diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 4419146db1..e2c363023e 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -46,6 +46,7 @@ from django.contrib.auth.models import User # AWX from awx.main.constants import CLOUD_PROVIDERS from awx.main.models import * # noqa +from awx.main.models import UnifiedJob from awx.main.models.label import Label from awx.main.queue import FifoQueue from awx.main.conf import tower_settings diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 1cbd9fddf8..ce88f019ce 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1,4 +1,25 @@ -from awx.main.tasks import run_label_cleanup +import pytest +from contextlib import contextmanager + +from awx.main.models import ( + UnifiedJob, + Notification, +) + +from awx.main.tasks import ( + run_label_cleanup, + send_notifications, + run_administrative_checks, +) + +from awx.main.task_engine import TaskSerializer + + +@contextmanager +def apply_patches(_patches): + [p.start() for p in _patches] + yield + [p.stop() for p in _patches] def test_run_label_cleanup(mocker): qs = mocker.Mock(**{'count.return_value': 3, 'delete.return_value': None}) @@ -10,3 +31,51 @@ def test_run_label_cleanup(mocker): qs.delete.assert_called_with() assert 3 == ret +def test_send_notifications_not_list(): + with pytest.raises(TypeError): + send_notifications(None) + +def test_send_notifications_job_id(mocker): + with mocker.patch('awx.main.models.UnifiedJob.objects.get'): + send_notifications([], job_id=1) + assert UnifiedJob.objects.get.called + assert UnifiedJob.objects.get.called_with(id=1) + +def test_send_notifications_list(mocker): + patches = list() + + mock_job = mocker.MagicMock(spec=UnifiedJob) + patches.append(mocker.patch('awx.main.models.UnifiedJob.objects.get', return_value=mock_job)) + + mock_notification = mocker.MagicMock(spec=Notification, subject="test") + patches.append(mocker.patch('awx.main.models.Notification.objects.get', return_value=mock_notification)) + + with apply_patches(patches): + send_notifications([1,2], job_id=1) + assert Notification.objects.get.call_count == 2 + assert mock_notification.status == "successful" + assert mock_notification.save.called + + assert mock_job.notifications.add.called + assert mock_job.notifications.add.called_with(mock_notification) + +@pytest.mark.parametrize("current_instances,call_count", [(91, 2), (89,1)]) +def test_run_admin_checks_usage(mocker, current_instances, call_count): + patches = list() + patches.append(mocker.patch('awx.main.tasks.tower_settings')) + patches.append(mocker.patch('awx.main.tasks.User')) + + mock_ts = mocker.Mock(spec=TaskSerializer) + mock_ts.from_database.return_value = {'instance_count': 100, 'current_instances': current_instances} + patches.append(mocker.patch('awx.main.tasks.TaskSerializer', return_value=mock_ts)) + + mock_sm = mocker.Mock() + patches.append(mocker.patch('awx.main.tasks.send_mail', wraps=mock_sm)) + + with apply_patches(patches): + run_administrative_checks() + assert mock_sm.called + if call_count == 2: + assert '90%' in mock_sm.call_args_list[0][0][0] + else: + assert 'expire' in mock_sm.call_args_list[0][0][0]