mock run_pexpect for adhoc

This commit is contained in:
Chris Meyers 2015-05-17 22:15:17 -04:00
parent e6b7f75ad7
commit dfc3ee0487
2 changed files with 26 additions and 74 deletions

View File

@ -6,6 +6,7 @@ import glob
import os
import subprocess
import tempfile
import mock
# Django
from django.conf import settings
@ -21,7 +22,6 @@ from awx.main.tests.tasks import TEST_SSH_KEY_DATA, TEST_SSH_KEY_DATA_LOCKED, TE
__all__ = ['RunAdHocCommandTest', 'AdHocCommandApiTest']
class BaseAdHocCommandTest(BaseJobExecutionTest):
'''
Common initialization for testing ad hoc commands.
@ -360,6 +360,9 @@ class RunAdHocCommandTest(BaseAdHocCommandTest):
self.check_job_result(ad_hoc_command, 'error', expect_traceback=True)
def run_pexpect_mock(self, *args, **kwargs):
return 'successful', 0
class AdHocCommandApiTest(BaseAdHocCommandTest):
'''
Test API list/detail views for ad hoc commands.
@ -385,7 +388,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
del data[k]
return self.post(url, data, expect=expect)
def test_ad_hoc_command_list(self):
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_list(self, ignore):
url = reverse('api:ad_hoc_command_list')
# Retrieve the empty list of ad hoc commands.
@ -558,7 +562,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
response = self.run_test_ad_hoc_command(become_enabled=True)
self.assertEqual(response['become_enabled'], True)
def test_ad_hoc_command_detail(self):
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_detail(self, ignore):
with self.current_user('admin'):
response1 = self.run_test_ad_hoc_command()
response2 = self.run_test_ad_hoc_command()
@ -622,7 +627,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.delete(url, expect=204)
self.delete(url, expect=404)
def test_ad_hoc_command_cancel(self):
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_cancel(self, ignore):
# Override setting so that ad hoc command isn't actually started.
with self.settings(CELERY_UNIT_TEST=False):
with self.current_user('admin'):
@ -674,7 +680,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.assertEqual(response['can_cancel'], False)
self.post(url, {}, expect=405)
def test_ad_hoc_command_relaunch(self):
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_relaunch(self, ignore):
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
@ -735,6 +742,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
response = self.post(url, {}, expect=400)
def test_ad_hoc_command_events_list(self):
# TODO: Create test events instead of relying on playbooks execution
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command()
@ -823,6 +832,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.delete(url, expect=401)
def test_ad_hoc_command_event_detail(self):
# TODO: Mock pexpect. Create test events instead of relying on playbooks execution
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
@ -877,7 +888,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_ad_hoc_command_activity_stream(self):
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_ad_hoc_command_activity_stream(self, ignore):
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
@ -927,7 +939,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.patch(url, {}, expect=401)
self.delete(url, expect=401)
def test_inventory_ad_hoc_commands_list(self):
@mock.patch('awx.main.tasks.BaseTask.run_pexpect', side_effect=run_pexpect_mock)
def test_inventory_ad_hoc_commands_list(self, ignore):
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command(inventory=self.inventory2.pk)
@ -1030,6 +1043,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.assertTrue(response['can_run_ad_hoc_commands'])
def test_host_ad_hoc_commands_list(self):
# TODO: Figure out why this test needs pexpect
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()
response = self.run_test_ad_hoc_command(limit=self.host2.name)
@ -1079,6 +1094,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.delete(url, expect=401)
def test_group_ad_hoc_commands_list(self):
# TODO: Figure out why this test needs pexpect
with self.current_user('admin'):
response = self.run_test_ad_hoc_command() # self.host + self.host2
response = self.run_test_ad_hoc_command(limit=self.group.name) # self.host
@ -1133,6 +1150,8 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
self.delete(url, expect=401)
def test_host_ad_hoc_command_events_list(self):
# TODO: Mock run_pexpect. Create test events instead of relying on playbooks execution
with self.current_user('admin'):
response = self.run_test_ad_hoc_command()

View File

@ -14,7 +14,6 @@ import time
from multiprocessing import Process
from subprocess import Popen
import re
from copy import copy
import mock
# PyYAML
@ -80,7 +79,6 @@ class QueueStartStopTestMixin(QueueTestMixin):
class MockCommonlySlowTestMixin(object):
def __init__(self, *args, **kwargs):
#def setUp(self):
from awx.api import generics
mock.patch.object(generics, 'get_view_description', return_value=None).start()
super(MockCommonlySlowTestMixin, self).__init__(*args, **kwargs)
@ -696,71 +694,6 @@ class BaseJobExecutionTest(QueueStartStopTestMixin, BaseLiveServerTest):
Base class for celery task tests.
'''
#class SetupTeardownOnceTestMixin(django.test.TestCase):
flag_setup_ran = False
self_backup = {}
class SetupTeardownOnceTestMixin(BaseTest):
def __init__(self, *args, **kwargs):
global flag_setup_ran
self.setup_ran = flag_setup_ran
super(SetupTeardownOnceTestMixin, self).__init__(*args, **kwargs)
@classmethod
def setUpClass(cls):
super(SetupTeardownOnceTestMixin, cls).setUpClass()
'''
def setUp(self):
global flag_setup_ran
super(SetupTeardownOnceTestMixin, self).setUp()
self.setup_ran = flag_setup_ran
'''
'''
def _pre_setup(self):
if not self.setup_ran:
super(SetupTeardownOnceTestMixin, self)._pre_setup()
'''
def _fixture_setup(self):
return
def _real_fixture_setup(self):
super(SetupTeardownOnceTestMixin, self)._fixture_setup()
'''
def _fixture_teardown(self):
return
def _post_teardown(self):
return
'''
def setup(self):
global flag_setup_ran
global self_backup
if not self.setup_ran:
self._real_fixture_setup()
if not flag_setup_ran:
self_backup = copy(self)
flag_setup_ran = True
else:
for k, v in self_backup.__dict__.iteritems():
if k not in self_backup._keys_before:
setattr(self, k, v)
def should_setup(self):
self._keys_before = None
self._keys_before = self.__dict__.keys()
global flag_setup_ran
return not flag_setup_ran
@classmethod
def tearDownClass(cls):
#super(SetupTeardownOnceTestMixin, cls._self)._fixture_teardown()
super(SetupTeardownOnceTestMixin, cls).tearDownClass()
# Helps with test cases.
# Save all components of a uri (i.e. scheme, username, password, etc.) so that
# when we construct a uri string and decompose it, we can verify the decomposition