Modifications to launch the task manager during unit tests

This commit is contained in:
Matthew Jones 2014-03-12 15:06:28 -04:00
parent 5ae324960e
commit 93e68009a7
6 changed files with 22 additions and 1 deletions

View File

@ -26,6 +26,7 @@ from django.test.client import Client
from awx.main.models import *
from awx.main.backend import LDAPSettings
from awx.main.management.commands.run_callback_receiver import run_subscriber
from awx.main.management.commands.run_task_system import run_taskmanager
class BaseTestMixin(object):
@ -61,6 +62,7 @@ class BaseTestMixin(object):
callback_queue_path = '/tmp/callback_receiver_test_%d.ipc' % callback_port
self._temp_project_dirs.append(callback_queue_path)
settings.CALLBACK_QUEUE_PORT = 'ipc://%s' % callback_queue_path
settings.CALLBACK_COMMAND_PORT = 'ipc:///tmp/task_command_receiver_%d.ipc' % callback_port
# Make temp job status directory for unit tests.
job_status_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(job_status_dir)
@ -374,6 +376,15 @@ class BaseTestMixin(object):
for obj in response['results']:
self.assertTrue(set(obj.keys()) <= set(fields))
def start_taskmanager(self, command_port):
self.taskmanager_process = Process(target=run_taskmanager,
args=(command_port,))
self.taskmanager_process.start()
def terminate_taskmanager(self):
if hasattr(self, 'taskmanager_process'):
self.taskmanager_process.terminate()
def start_queue(self, consumer_port, queue_port):
self.queue_process = Process(target=run_subscriber,
args=(consumer_port, queue_port, False,))
@ -382,7 +393,7 @@ class BaseTestMixin(object):
def terminate_queue(self):
if hasattr(self, 'queue_process'):
self.queue_process.terminate()
class BaseTest(BaseTestMixin, django.test.TestCase):
'''
Base class for unit tests.

View File

@ -323,11 +323,13 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
self.project = None
self.credential = None
settings.INTERNAL_API_URL = self.live_server_url
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(CleanupJobsTest, self).tearDown()
self.terminate_queue()
self.terminate_taskmanager()
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)

View File

@ -991,10 +991,12 @@ class InventoryUpdatesTest(BaseTransactionTest):
self.group = self.inventory.groups.create(name='Cloud Group')
self.inventory2 = self.organization.inventories.create(name='Cloud Inventory 2')
self.group2 = self.inventory2.groups.create(name='Cloud Group 2')
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(InventoryUpdatesTest, self).tearDown()
self.terminate_taskmanager()
self.terminate_queue()
def update_inventory_source(self, group, **kwargs):

View File

@ -442,11 +442,13 @@ class BaseJobTestMixin(BaseTestMixin):
def setUp(self):
super(BaseJobTestMixin, self).setUp()
self.populate()
self.start_taskmanager(settings.TASK_COMMAND_PORT)
if settings.CALLBACK_CONSUMER_PORT:
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(BaseJobTestMixin, self).tearDown()
self.terminate_taskmanager()
self.terminate_queue()
class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):

View File

@ -680,10 +680,12 @@ class ProjectUpdatesTest(BaseTransactionTest):
def setUp(self):
super(ProjectUpdatesTest, self).setUp()
self.setup_users()
self.start_taskmanager(settings.TASK_COMMAND_PORT)
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(ProjectUpdatesTest, self).tearDown()
self.terminate_taskmanager()
self.terminate_queue()
def create_project(self, **kwargs):

View File

@ -188,6 +188,7 @@ class RunJobTest(BaseCeleryTest):
return args
RunJob.build_args = new_build_args
settings.INTERNAL_API_URL = self.live_server_url
self.start_taskmanager(settings.TASK_COMMAND_PORT)
if settings.CALLBACK_CONSUMER_PORT:
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
@ -196,6 +197,7 @@ class RunJobTest(BaseCeleryTest):
if self.test_project_path:
shutil.rmtree(self.test_project_path, True)
RunJob.build_args = self.original_build_args
self.terminate_taskmanager()
self.terminate_queue()
def create_test_credential(self, **kwargs):