fix tests, add pagination tests

* job_created is a fake field as far as Django is concerned. Under the
hood, in Postgres, it is the partition key, so it is real. SQLite
doesn't support partitioning, so we need to fake some things.
Specifically, we need to stop job_created from being auto-added to the
filter in get_event_queryset() (see the sketch below).
* Add pagination tests for the <unified_job_name>/<id>/<job_events>?limit=x
endpoints to make sure the paginator is wired up.
Chris Meyers 2021-05-26 10:53:14 -04:00 committed by Jim Ladd
parent b648957c8e
commit ffbbcd2bf6
3 changed files with 62 additions and 2 deletions
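
For orientation before the diffs, here is a minimal sketch of the behavior being faked. It is illustrative only: the job_created filter and the self.created attribute below are assumptions based on the commit description, not a copy of the actual UnifiedJob implementation.

# Illustrative sketch, not the actual AWX source. Assumes the partitioned
# implementation narrows the event query by the job_created partition key.
def get_event_queryset(self):
    kwargs = {self.event_parent_key: self.id}
    # On Postgres the events table is partitioned by job_created, so this
    # extra filter lets the planner prune to a single partition. SQLite has
    # no such column, which is why the conftest fixture below patches the
    # method to filter only on the event's parent key.
    kwargs['job_created'] = self.created
    return self.event_class.objects.filter(**kwargs)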


@@ -160,3 +160,18 @@ def mock_has_unpartitioned_events():
    # We mock this out to circumvent the migration query.
    with mock.patch.object(UnifiedJob, 'has_unpartitioned_events', new=False) as _fixture:
        yield _fixture


@pytest.fixture(scope='session', autouse=True)
def mock_get_event_queryset_no_job_created():
    """
    SQLite friendly since partitions aren't supported. Do not add the faked job_created field to the filter. If we do, it will result in an sql query for the
    job_created field. That field does not actually exist in a non-partition scenario.
    """

    def event_qs(self):
        kwargs = {self.event_parent_key: self.id}
        return self.event_class.objects.filter(**kwargs)

    with mock.patch.object(UnifiedJob, 'get_event_queryset', lambda self: event_qs(self)) as _fixture:
        yield _fixture
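
A hypothetical consumer of the fixture, for illustration only (this test is not part of the commit): because the fixture is session-scoped and autouse, tests never request it by name, yet any event query they trigger under SQLite avoids the nonexistent job_created column.

import pytest

# Hypothetical test; job_template and create_unified_job() are names taken
# from the diffs in this commit, but the test itself is illustrative.
@pytest.mark.django_db
def test_events_query_under_sqlite(job_template):
    job = job_template.create_unified_job()
    # With the autouse fixture active, this filters only on the parent key.
    assert job.get_event_queryset().count() == 0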


@@ -4,6 +4,7 @@ from unittest.mock import patch
from urllib.parse import urlencode
from awx.main.models.inventory import Group, Host
from awx.main.models.ad_hoc_commands import AdHocCommand
from awx.api.pagination import Pagination
from awx.api.versioning import reverse
@@ -61,3 +62,46 @@ def test_pagination_cap_page_size(get, admin, inventory):
    assert jdata['previous'] == host_list_url({'page': '1', 'page_size': '5'})
    assert jdata['next'] == host_list_url({'page': '3', 'page_size': '5'})


class TestUnifiedJobEventPagination:
    @pytest.fixture
    def ad_hoc_command(self, ad_hoc_command_factory):
        return ad_hoc_command_factory()

    def _test_unified_job(self, get, admin, template, job_attribute, list_endpoint):
        if isinstance(template, AdHocCommand):
            job = template
        else:
            job = template.create_unified_job()

        kwargs = {job_attribute: job.pk}
        for i in range(20):
            job.event_class.create_from_data(**kwargs).save()

        url = reverse(f'api:{list_endpoint}', kwargs={'pk': job.pk}) + '?limit=7'
        resp = get(url, user=admin, expect=200)
        assert 'count' not in resp.data
        assert 'next' not in resp.data
        assert 'previous' not in resp.data
        assert len(resp.data['results']) == 7

    @pytest.mark.django_db
    def test_job(self, get, admin, job_template):
        self._test_unified_job(get, admin, job_template, 'job_id', 'job_job_events_list')

    @pytest.mark.django_db
    def test_project_update(self, get, admin, project):
        self._test_unified_job(get, admin, project, 'project_update_id', 'project_update_events_list')

    @pytest.mark.django_db
    def test_inventory_update(self, get, admin, inventory_source):
        self._test_unified_job(get, admin, inventory_source, 'inventory_update_id', 'inventory_update_events_list')

    @pytest.mark.django_db
    def test_system_job(self, get, admin, system_job_template):
        self._test_unified_job(get, admin, system_job_template, 'system_job_id', 'system_job_events_list')

    @pytest.mark.django_db
    def test_adhoc_command(self, get, admin, ad_hoc_command):
        self._test_unified_job(get, admin, ad_hoc_command, 'ad_hoc_command_id', 'ad_hoc_command_ad_hoc_command_events_list')
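
These assertions pin down a limit-style response envelope: only results, with no count, next, or previous keys. As a rough sketch of what such a paginator looks like in DRF terms (illustrative only; this is not the AWX paginator class actually wired to these endpoints):

# Illustrative DRF-style limit paginator; AWX's real class is not shown in
# this diff and may differ.
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.response import Response


class BareLimitPagination(LimitOffsetPagination):
    def get_paginated_response(self, data):
        # No count/next/previous envelope -- just the sliced results,
        # which is the shape the tests above assert for ?limit=7.
        return Response({'results': data})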


@@ -134,7 +134,8 @@ class TestJobDetailSerializerGetHostStatusCountFields(object):
        )
        mock_qs = namedtuple('mock_qs', ['get'])(mocker.MagicMock(return_value=mock_event))
-       job.job_events.only = mocker.MagicMock(return_value=mock_qs)
+       only = mocker.MagicMock(return_value=mock_qs)
+       job.get_event_queryset = lambda *args, **kwargs: mocker.MagicMock(only=only)
        serializer = JobDetailSerializer()
        host_status_counts = serializer.get_host_status_counts(job)
@@ -142,7 +143,7 @@
        assert host_status_counts == {'ok': 1, 'changed': 1, 'dark': 2}

    def test_host_status_counts_is_empty_dict_without_stats_event(self, job):
-       job.job_events = JobEvent.objects.none()
+       job.get_event_queryset = lambda *args, **kwargs: JobEvent.objects.none()
        serializer = JobDetailSerializer()
        host_status_counts = serializer.get_host_status_counts(job)
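
For orientation, the serializer method under test presumably reads the playbook stats event from the job's event queryset. A rough reconstruction from the mocked call chain above (illustrative; the field name 'event_data' and the event name 'playbook_on_stats' are assumptions, not taken from this diff):

# Illustrative reconstruction from the mocks above (.only(...).get(...));
# not the real JobDetailSerializer source.
def get_host_status_counts(self, obj):
    try:
        stats_event = obj.get_event_queryset().only('event_data').get(event='playbook_on_stats')
        return stats_event.get_host_status_counts()
    except JobEvent.DoesNotExist:
        # Matches the second test: an empty queryset yields an empty dict.
        return {}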