Update unit tests to manage zeromq based tasks

This commit is contained in:
Matthew Jones 2014-02-18 13:54:56 -05:00
parent f6870634c4
commit 11cfacd57a
6 changed files with 35 additions and 0 deletions

View File

@ -10,6 +10,7 @@ import os
import shutil
import tempfile
import time
from multiprocessing import Process
# PyYAML
import yaml
@ -23,6 +24,8 @@ from django.test.client import Client
# AWX
from awx.main.models import *
from awx.main.backend import LDAPSettings
from awx.main.management.commands.run_callback_receiver import run_subscriber
class BaseTestMixin(object):
'''
@ -363,6 +366,14 @@ class BaseTestMixin(object):
for obj in response['results']:
self.assertTrue(set(obj.keys()) <= set(fields))
def start_queue(self, consumer_port, queue_port):
self.queue_process = Process(target=run_subscriber,
args=(consumer_port, queue_port, False,))
self.queue_process.start()
def terminate_queue(self):
self.queue_process.terminate()
class BaseTest(BaseTestMixin, django.test.TestCase):
'''
Base class for unit tests.

View File

@ -301,9 +301,11 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
self.project = None
self.credential = None
settings.INTERNAL_API_URL = self.live_server_url
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(CleanupJobsTest, self).tearDown()
self.terminate_queue()
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)

View File

@ -991,7 +991,12 @@ class InventoryUpdatesTest(BaseTransactionTest):
self.group = self.inventory.groups.create(name='Cloud Group')
self.inventory2 = self.organization.inventories.create(name='Cloud Inventory 2')
self.group2 = self.inventory2.groups.create(name='Cloud Group 2')
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(InventoryUpdatesTest, self).tearDown()
self.terminate_queue()
def update_inventory_source(self, group, **kwargs):
inventory_source = group.inventory_source
update_fields = []

View File

@ -442,6 +442,12 @@ class BaseJobTestMixin(BaseTestMixin):
def setUp(self):
super(BaseJobTestMixin, self).setUp()
self.populate()
#self.start_queue("ipc:///tmp/test_consumer.ipc", "ipc:///tmp/test_queue.ipc")
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(BaseJobTestMixin, self).tearDown()
self.terminate_queue()
class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
@ -773,6 +779,7 @@ MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'),
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CALLBACK_BYPASS_QUEUE=True,
ANSIBLE_TRANSPORT='local',
MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
@ -904,6 +911,9 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = self.job_ops_east_run
job.start()
# Wait for events to filter in since we are using a single consumer
time.sleep(30)
# Check that the job detail has been updated.
url = reverse('api:job_detail', args=(job.pk,))
with self.current_user(self.user_sue):

View File

@ -680,6 +680,11 @@ class ProjectUpdatesTest(BaseTransactionTest):
def setUp(self):
super(ProjectUpdatesTest, self).setUp()
self.setup_users()
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(ProjectUpdatesTest, self).tearDown()
self.terminate_queue()
def create_project(self, **kwargs):
cred_fields = ['scm_username', 'scm_password', 'scm_key_data',

View File

@ -188,12 +188,14 @@ class RunJobTest(BaseCeleryTest):
return args
RunJob.build_args = new_build_args
settings.INTERNAL_API_URL = self.live_server_url
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(RunJobTest, self).tearDown()
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)
RunJob.build_args = self.original_build_args
self.terminate_queue()
def create_test_credential(self, **kwargs):
opts = {