mirror of
https://github.com/ansible/awx.git
synced 2026-01-13 11:00:03 -03:30
Merge pull request #141 from chrismeyersfsu/refactor-jobs_tests2
split out job test a little bit more
This commit is contained in:
commit
e6f376ebc2
@ -679,6 +679,9 @@ class BaseTransactionTest(BaseTestMixin, django.test.TransactionTestCase):
|
||||
needs to be accessed by subprocesses).
|
||||
'''
|
||||
|
||||
@override_settings(CELERY_ALWAYS_EAGER=True,
|
||||
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
|
||||
ANSIBLE_TRANSPORT='local')
|
||||
class BaseLiveServerTest(BaseTestMixin, django.test.LiveServerTestCase):
|
||||
'''
|
||||
Base class for tests requiring a live test server.
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
from awx.main.tests.jobs.jobs_monolithic import * # noqa
|
||||
from survey_password import * # noqa
|
||||
from base import * # noqa
|
||||
from __future__ import absolute_import
|
||||
|
||||
from .jobs_monolithic import * # noqa
|
||||
from .survey_password import * # noqa
|
||||
from .start_cancel import * # noqa
|
||||
from .base import * # noqa
|
||||
|
||||
@ -25,8 +25,7 @@ import requests
|
||||
from awx.main.models import * # noqa
|
||||
from base import BaseJobTestMixin
|
||||
|
||||
__all__ = ['JobTemplateTest', 'JobTest', 'JobStartCancelTest',
|
||||
'JobTemplateCallbackTest', 'JobTransactionTest', 'JobTemplateSurveyTest']
|
||||
__all__ = ['JobTemplateTest', 'JobTest', 'JobTemplateCallbackTest', 'JobTransactionTest', 'JobTemplateSurveyTest']
|
||||
|
||||
TEST_ASYNC_PLAYBOOK = '''
|
||||
- hosts: all
|
||||
@ -671,270 +670,6 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
|
||||
# and that jobs come back nicely serialized with related resources and so on ...
|
||||
# that we can drill all the way down and can get at host failure lists, etc ...
|
||||
|
||||
|
||||
@override_settings(CELERY_ALWAYS_EAGER=True,
|
||||
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
|
||||
ANSIBLE_TRANSPORT='local')
|
||||
class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
|
||||
'''Job API tests that need to use the celery task backend.'''
|
||||
|
||||
def setUp(self):
|
||||
super(JobStartCancelTest, self).setUp()
|
||||
settings.INTERNAL_API_URL = self.live_server_url
|
||||
|
||||
def tearDown(self):
|
||||
super(JobStartCancelTest, self).tearDown()
|
||||
|
||||
def test_job_start(self):
|
||||
#job = self.job_ops_east_run
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
|
||||
# Test with no auth and with invalid login.
|
||||
self.check_invalid_auth(url)
|
||||
self.check_invalid_auth(url, methods=('post',))
|
||||
|
||||
# Sue can start a job (when passwords are already saved) as long as the
|
||||
# status is new. Reverse list so "new" will be last.
|
||||
for status in reversed([x[0] for x in Job.STATUS_CHOICES]):
|
||||
if status == 'waiting':
|
||||
continue
|
||||
job.status = status
|
||||
job.save()
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
if status == 'new':
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertFalse(response['passwords_needed_to_start'])
|
||||
# response = self.post(url, {}, expect=202)
|
||||
# job = Job.objects.get(pk=job.pk)
|
||||
# self.assertEqual(job.status, 'successful',
|
||||
# job.result_stdout)
|
||||
else:
|
||||
self.assertFalse(response['can_start'])
|
||||
response = self.post(url, {}, expect=405)
|
||||
|
||||
# Test with a job that prompts for SSH and sudo become passwords.
|
||||
#job = self.job_sup_run
|
||||
job = self.make_job(self.jt_sup_run, self.user_sue, 'new')
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_password', 'become_password']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_password'] = 'sshpass'
|
||||
response = self.post(url, data, expect=400)
|
||||
data2 = dict(become_password='sudopass')
|
||||
response = self.post(url, data2, expect=400)
|
||||
data.update(data2)
|
||||
response = self.post(url, data, expect=202)
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
# FIXME: Test run gets the following error in this case:
|
||||
# fatal: [hostname] => sudo output closed while waiting for password prompt:
|
||||
#self.assertEqual(job.status, 'successful')
|
||||
|
||||
# Test with a job that prompts for SSH unlock key, given the wrong key.
|
||||
#job = self.jt_ops_west_run.create_job(
|
||||
# credential=self.cred_greg,
|
||||
# created_by=self.user_sue,
|
||||
#)
|
||||
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
|
||||
job.credential = self.cred_greg
|
||||
job.save()
|
||||
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_key_unlock']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
# The job should start but fail.
|
||||
data['ssh_key_unlock'] = 'sshunlock'
|
||||
response = self.post(url, data, expect=202)
|
||||
# job = Job.objects.get(pk=job.pk)
|
||||
# self.assertEqual(job.status, 'failed')
|
||||
|
||||
# Test with a job that prompts for SSH unlock key, given the right key.
|
||||
from awx.main.tests.tasks import TEST_SSH_KEY_DATA_UNLOCK
|
||||
# job = self.jt_ops_west_run.create_job(
|
||||
# credential=self.cred_greg,
|
||||
# created_by=self.user_sue,
|
||||
# )
|
||||
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
|
||||
job.credential = self.cred_greg
|
||||
job.save()
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_key_unlock']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
|
||||
response = self.post(url, data, expect=202)
|
||||
# job = Job.objects.get(pk=job.pk)
|
||||
# self.assertEqual(job.status, 'successful')
|
||||
|
||||
# FIXME: Test with other users, test when passwords are required.
|
||||
|
||||
def test_job_relaunch(self):
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
|
||||
url = reverse('api:job_relaunch', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.post(url, {}, expect=201)
|
||||
j = Job.objects.get(pk=response['job'])
|
||||
self.assertTrue(j.status == 'successful')
|
||||
# Test with a job that prompts for SSH and sudo passwords.
|
||||
job = self.make_job(self.jt_sup_run, self.user_sue, 'success')
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_password', 'become_password']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_password'] = 'sshpass'
|
||||
response = self.post(url, data, expect=400)
|
||||
data2 = dict(become_password='sudopass')
|
||||
response = self.post(url, data2, expect=400)
|
||||
data.update(data2)
|
||||
response = self.post(url, data, expect=202)
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
|
||||
def test_job_cancel(self):
|
||||
#job = self.job_ops_east_run
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
|
||||
url = reverse('api:job_cancel', args=(job.pk,))
|
||||
|
||||
# Test with no auth and with invalid login.
|
||||
self.check_invalid_auth(url)
|
||||
self.check_invalid_auth(url, methods=('post',))
|
||||
|
||||
# sue can cancel the job, but only when it is pending or running.
|
||||
for status in [x[0] for x in Job.STATUS_CHOICES]:
|
||||
if status == 'waiting':
|
||||
continue
|
||||
job.status = status
|
||||
job.save()
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
if status in ('new', 'pending', 'waiting', 'running'):
|
||||
self.assertTrue(response['can_cancel'])
|
||||
response = self.post(url, {}, expect=202)
|
||||
else:
|
||||
self.assertFalse(response['can_cancel'])
|
||||
response = self.post(url, {}, expect=405)
|
||||
|
||||
# FIXME: Test with other users.
|
||||
|
||||
def test_get_job_results(self):
|
||||
# Start/run a job and then access its results via the API.
|
||||
#job = self.job_ops_east_run
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
|
||||
job.signal_start()
|
||||
|
||||
# Check that the job detail has been updated.
|
||||
url = reverse('api:job_detail', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertEqual(response['status'], 'successful',
|
||||
response['result_traceback'])
|
||||
self.assertTrue(response['result_stdout'])
|
||||
|
||||
# Test job events for completed job.
|
||||
url = reverse('api:job_job_events_list', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = job.job_events.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test individual job event detail records.
|
||||
host_ids = set()
|
||||
for job_event in job.job_events.all():
|
||||
if job_event.host:
|
||||
host_ids.add(job_event.host.pk)
|
||||
url = reverse('api:job_event_detail', args=(job_event.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
|
||||
# Also test job event list for each host.
|
||||
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
|
||||
for host in Host.objects.filter(pk__in=host_ids):
|
||||
url = reverse('api:host_job_events_list', args=(host.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = host.job_events.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test job event list for groups.
|
||||
for group in self.inv_ops_east.groups.all():
|
||||
url = reverse('api:group_job_events_list', args=(group.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = group.job_events.all()
|
||||
self.assertTrue(qs.count(), group)
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test global job event list.
|
||||
url = reverse('api:job_event_list')
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = JobEvent.objects.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test job host summaries for completed job.
|
||||
url = reverse('api:job_job_host_summaries_list', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = job.job_host_summaries.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
# Every host referenced by a job_event should be present as a job
|
||||
# host summary record.
|
||||
self.assertEqual(host_ids,
|
||||
set(qs.values_list('host__pk', flat=True)))
|
||||
|
||||
# Test individual job host summary records.
|
||||
for job_host_summary in job.job_host_summaries.all():
|
||||
url = reverse('api:job_host_summary_detail',
|
||||
args=(job_host_summary.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
|
||||
# Test job host summaries for each host.
|
||||
for host in Host.objects.filter(pk__in=host_ids):
|
||||
url = reverse('api:host_job_host_summaries_list', args=(host.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = host.job_host_summaries.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test job host summaries for groups.
|
||||
for group in self.inv_ops_east.groups.all():
|
||||
url = reverse('api:group_job_host_summaries_list', args=(group.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = group.job_host_summaries.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
@override_settings(CELERY_ALWAYS_EAGER=True,
|
||||
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
|
||||
ANSIBLE_TRANSPORT='local')
|
||||
|
||||
268
awx/main/tests/jobs/start_cancel.py
Normal file
268
awx/main/tests/jobs/start_cancel.py
Normal file
@ -0,0 +1,268 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
from __future__ import absolute_import
|
||||
|
||||
# Django
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.conf import settings
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.tests.base import BaseLiveServerTest
|
||||
from .base import BaseJobTestMixin
|
||||
|
||||
__all__ = ['JobStartCancelTest',]
|
||||
|
||||
class JobStartCancelTest(BaseJobTestMixin, BaseLiveServerTest):
|
||||
|
||||
def test_job_start(self):
|
||||
#job = self.job_ops_east_run
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
|
||||
# Test with no auth and with invalid login.
|
||||
self.check_invalid_auth(url)
|
||||
self.check_invalid_auth(url, methods=('post',))
|
||||
|
||||
# Sue can start a job (when passwords are already saved) as long as the
|
||||
# status is new. Reverse list so "new" will be last.
|
||||
for status in reversed([x[0] for x in Job.STATUS_CHOICES]):
|
||||
if status == 'waiting':
|
||||
continue
|
||||
job.status = status
|
||||
job.save()
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
if status == 'new':
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertFalse(response['passwords_needed_to_start'])
|
||||
# response = self.post(url, {}, expect=202)
|
||||
# job = Job.objects.get(pk=job.pk)
|
||||
# self.assertEqual(job.status, 'successful',
|
||||
# job.result_stdout)
|
||||
else:
|
||||
self.assertFalse(response['can_start'])
|
||||
response = self.post(url, {}, expect=405)
|
||||
|
||||
# Test with a job that prompts for SSH and sudo become passwords.
|
||||
#job = self.job_sup_run
|
||||
job = self.make_job(self.jt_sup_run, self.user_sue, 'new')
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_password', 'become_password']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_password'] = 'sshpass'
|
||||
response = self.post(url, data, expect=400)
|
||||
data2 = dict(become_password='sudopass')
|
||||
response = self.post(url, data2, expect=400)
|
||||
data.update(data2)
|
||||
response = self.post(url, data, expect=202)
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
# FIXME: Test run gets the following error in this case:
|
||||
# fatal: [hostname] => sudo output closed while waiting for password prompt:
|
||||
#self.assertEqual(job.status, 'successful')
|
||||
|
||||
# Test with a job that prompts for SSH unlock key, given the wrong key.
|
||||
#job = self.jt_ops_west_run.create_job(
|
||||
# credential=self.cred_greg,
|
||||
# created_by=self.user_sue,
|
||||
#)
|
||||
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
|
||||
job.credential = self.cred_greg
|
||||
job.save()
|
||||
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_key_unlock']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
# The job should start but fail.
|
||||
data['ssh_key_unlock'] = 'sshunlock'
|
||||
response = self.post(url, data, expect=202)
|
||||
# job = Job.objects.get(pk=job.pk)
|
||||
# self.assertEqual(job.status, 'failed')
|
||||
|
||||
# Test with a job that prompts for SSH unlock key, given the right key.
|
||||
from awx.main.tests.tasks import TEST_SSH_KEY_DATA_UNLOCK
|
||||
# job = self.jt_ops_west_run.create_job(
|
||||
# credential=self.cred_greg,
|
||||
# created_by=self.user_sue,
|
||||
# )
|
||||
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
|
||||
job.credential = self.cred_greg
|
||||
job.save()
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertTrue(response['can_start'])
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_key_unlock']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
|
||||
response = self.post(url, data, expect=202)
|
||||
# job = Job.objects.get(pk=job.pk)
|
||||
# self.assertEqual(job.status, 'successful')
|
||||
|
||||
# FIXME: Test with other users, test when passwords are required.
|
||||
|
||||
def test_job_relaunch(self):
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
|
||||
url = reverse('api:job_relaunch', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.post(url, {}, expect=201)
|
||||
j = Job.objects.get(pk=response['job'])
|
||||
self.assertTrue(j.status == 'successful')
|
||||
# Test with a job that prompts for SSH and sudo passwords.
|
||||
job = self.make_job(self.jt_sup_run, self.user_sue, 'success')
|
||||
url = reverse('api:job_start', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertEqual(set(response['passwords_needed_to_start']),
|
||||
set(['ssh_password', 'become_password']))
|
||||
data = dict()
|
||||
response = self.post(url, data, expect=400)
|
||||
data['ssh_password'] = 'sshpass'
|
||||
response = self.post(url, data, expect=400)
|
||||
data2 = dict(become_password='sudopass')
|
||||
response = self.post(url, data2, expect=400)
|
||||
data.update(data2)
|
||||
response = self.post(url, data, expect=202)
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
|
||||
def test_job_cancel(self):
|
||||
#job = self.job_ops_east_run
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
|
||||
url = reverse('api:job_cancel', args=(job.pk,))
|
||||
|
||||
# Test with no auth and with invalid login.
|
||||
self.check_invalid_auth(url)
|
||||
self.check_invalid_auth(url, methods=('post',))
|
||||
|
||||
# sue can cancel the job, but only when it is pending or running.
|
||||
for status in [x[0] for x in Job.STATUS_CHOICES]:
|
||||
if status == 'waiting':
|
||||
continue
|
||||
job.status = status
|
||||
job.save()
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
if status in ('new', 'pending', 'waiting', 'running'):
|
||||
self.assertTrue(response['can_cancel'])
|
||||
response = self.post(url, {}, expect=202)
|
||||
else:
|
||||
self.assertFalse(response['can_cancel'])
|
||||
response = self.post(url, {}, expect=405)
|
||||
|
||||
# FIXME: Test with other users.
|
||||
|
||||
def test_get_job_results(self):
|
||||
# Start/run a job and then access its results via the API.
|
||||
#job = self.job_ops_east_run
|
||||
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
|
||||
job.signal_start()
|
||||
|
||||
# Check that the job detail has been updated.
|
||||
url = reverse('api:job_detail', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
self.assertEqual(response['status'], 'successful',
|
||||
response['result_traceback'])
|
||||
self.assertTrue(response['result_stdout'])
|
||||
|
||||
# Test job events for completed job.
|
||||
url = reverse('api:job_job_events_list', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = job.job_events.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test individual job event detail records.
|
||||
host_ids = set()
|
||||
for job_event in job.job_events.all():
|
||||
if job_event.host:
|
||||
host_ids.add(job_event.host.pk)
|
||||
url = reverse('api:job_event_detail', args=(job_event.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
|
||||
# Also test job event list for each host.
|
||||
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
|
||||
for host in Host.objects.filter(pk__in=host_ids):
|
||||
url = reverse('api:host_job_events_list', args=(host.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = host.job_events.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test job event list for groups.
|
||||
for group in self.inv_ops_east.groups.all():
|
||||
url = reverse('api:group_job_events_list', args=(group.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = group.job_events.all()
|
||||
self.assertTrue(qs.count(), group)
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test global job event list.
|
||||
url = reverse('api:job_event_list')
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = JobEvent.objects.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test job host summaries for completed job.
|
||||
url = reverse('api:job_job_host_summaries_list', args=(job.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = job.job_host_summaries.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
# Every host referenced by a job_event should be present as a job
|
||||
# host summary record.
|
||||
self.assertEqual(host_ids,
|
||||
set(qs.values_list('host__pk', flat=True)))
|
||||
|
||||
# Test individual job host summary records.
|
||||
for job_host_summary in job.job_host_summaries.all():
|
||||
url = reverse('api:job_host_summary_detail',
|
||||
args=(job_host_summary.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
|
||||
# Test job host summaries for each host.
|
||||
for host in Host.objects.filter(pk__in=host_ids):
|
||||
url = reverse('api:host_job_host_summaries_list', args=(host.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = host.job_host_summaries.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
|
||||
# Test job host summaries for groups.
|
||||
for group in self.inv_ops_east.groups.all():
|
||||
url = reverse('api:group_job_host_summaries_list', args=(group.pk,))
|
||||
with self.current_user(self.user_sue):
|
||||
response = self.get(url)
|
||||
qs = group.job_host_summaries.all()
|
||||
self.assertTrue(qs.count())
|
||||
self.check_pagination_and_size(response, qs.count())
|
||||
self.check_list_ids(response, qs)
|
||||
Loading…
x
Reference in New Issue
Block a user