Initial working test for run_launch_job task.

This commit is contained in:
Chris Church
2013-03-31 17:25:18 -04:00
parent b38cb80efa
commit c149f03119
7 changed files with 113 additions and 17 deletions

View File

@@ -24,10 +24,13 @@ from django.test.client import Client
from lib.main.models import *
class BaseTest(django.test.TestCase):
class BaseTestMixin(object):
'''
Mixin with shared code for use by all test cases.
'''
def setUp(self):
super(BaseTest, self).setUp()
super(BaseTestMixin, self).setUp()
self.object_ctr = 0
def make_user(self, username, password, super_user=False):
@@ -131,3 +134,13 @@ class BaseTest(django.test.TestCase):
data = self.get(collection_url, expect=200, auth=auth)
return [item['url'] for item in data['results']]
class BaseTest(BaseTestMixin, django.test.TestCase):
'''
Base class for unit tests.
'''
class BaseTransactionTest(BaseTestMixin, django.test.TransactionTestCase):
'''
Base class for tests requiring transactions (or where the test database
needs to be accessed by subprocesses).
'''

View File

@@ -16,13 +16,72 @@
# along with Ansible Commander. If not, see <http://www.gnu.org/licenses/>.
import os
import tempfile
from django.conf import settings
from django.test.utils import override_settings
from lib.main.models import *
from lib.main.tests.base import BaseTest
from lib.main.tests.base import BaseTransactionTest
class RunLaunchJobTest(BaseTest):
TEST_PLAYBOOK = '''- hosts: test-group
gather_facts: False
tasks:
- name: should pass
command: test 1 = 1
'''
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
class BaseCeleryTest(BaseTransactionTest):
'''
Base class for celery task tests.
'''
@override_settings(ANSIBLE_TRANSPORT='local')
class RunLaunchJobTest(BaseCeleryTest):
'''
Test cases for run_launch_job celery task.
'''
def setUp(self):
pass
super(RunLaunchJobTest, self).setUp()
self.setup_users()
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.project = self.make_projects(self.normal_django_user, 1)[0]
handle, self.test_playbook = tempfile.mkstemp(suffix='.yml', prefix='playbook-')
test_playbook_file = os.fdopen(handle, 'w')
test_playbook_file.write(TEST_PLAYBOOK)
test_playbook_file.close()
self.project.default_playbook = self.test_playbook
self.project.save()
self.organization.projects.add(self.project)
self.inventory = Inventory.objects.create(name='test-inventory',
description='description for test-inventory',
organization=self.organization)
self.host = self.inventory.hosts.create(name='host.example.com',
inventory=self.inventory)
self.group = self.inventory.groups.create(name='test-group',
inventory=self.inventory)
self.group.hosts.add(self.host)
self.launch_job = LaunchJob.objects.create(name='test-launch-job',
inventory=self.inventory,
project=self.project)
# Pass test database name in environment for use by the inventory script.
os.environ['ACOM_TEST_DATABASE_NAME'] = settings.DATABASES['default']['NAME']
def tearDown(self):
super(RunLaunchJobTest, self).tearDown()
os.environ.pop('ACOM_TEST_DATABASE_NAME', None)
os.remove(self.test_playbook)
def test_run_launch_job(self):
launch_job_status = self.launch_job.start()
self.assertEqual(launch_job_status.status, 'pending')
launch_job_status = LaunchJobStatus.objects.get(pk=launch_job_status.pk)
self.assertEqual(launch_job_status.status, 'successful')
self.assertTrue(launch_job_status.result_stdout)
#print 'stdout:', launch_job_status.result_stdout
#print 'stderr:', launch_job_status.result_stderr
# FIXME: Test with a task that fails.
# FIXME: Get callback working.