mirror of
https://github.com/ansible/awx.git
synced 2026-05-16 22:07:36 -02:30
Fixes to get flake8 and unit/functional tests passing.
This commit is contained in:
@@ -13,7 +13,6 @@ import tempfile
|
||||
import time
|
||||
import urllib
|
||||
from multiprocessing import Process
|
||||
from subprocess import Popen
|
||||
import re
|
||||
import mock
|
||||
|
||||
@@ -31,7 +30,6 @@ from django.utils.encoding import force_text
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.management.commands.run_callback_receiver import CallbackReceiver
|
||||
from awx.main.management.commands.run_task_system import run_taskmanager
|
||||
from awx.main.utils import get_ansible_version
|
||||
from awx.main.task_engine import TaskEngager as LicenseWriter
|
||||
@@ -45,45 +43,6 @@ TEST_PLAYBOOK = '''- hosts: mygroup
|
||||
command: test 1 = 1
|
||||
'''
|
||||
|
||||
class QueueTestMixin(object):
|
||||
def start_queue(self):
|
||||
self.start_redis()
|
||||
receiver = CallbackReceiver()
|
||||
self.queue_process = Process(target=receiver.run_subscriber,
|
||||
args=(False,))
|
||||
self.queue_process.start()
|
||||
|
||||
def terminate_queue(self):
|
||||
if hasattr(self, 'queue_process'):
|
||||
self.queue_process.terminate()
|
||||
self.stop_redis()
|
||||
|
||||
def start_redis(self):
|
||||
if not getattr(self, 'redis_process', None):
|
||||
# Centos 6.5 redis is runnable by non-root user but is not in a normal users path by default
|
||||
env = dict(os.environ)
|
||||
env['PATH'] = '%s:/usr/sbin/' % env['PATH']
|
||||
self.redis_process = Popen('echo "port 16379" | redis-server - > /dev/null',
|
||||
shell=True, executable='/bin/bash',
|
||||
env=env)
|
||||
|
||||
def stop_redis(self):
|
||||
if getattr(self, 'redis_process', None):
|
||||
self.redis_process.kill()
|
||||
self.redis_process = None
|
||||
|
||||
|
||||
# The observed effect of not calling terminate_queue() if you call start_queue() are
|
||||
# an hang on test cleanup database delete. Thus, to ensure terminate_queue() is called
|
||||
# whenever start_queue() is called just inherit from this class when you want to use the queue.
|
||||
class QueueStartStopTestMixin(QueueTestMixin):
|
||||
def setUp(self):
|
||||
super(QueueStartStopTestMixin, self).setUp()
|
||||
self.start_queue()
|
||||
|
||||
def tearDown(self):
|
||||
super(QueueStartStopTestMixin, self).tearDown()
|
||||
self.terminate_queue()
|
||||
|
||||
class MockCommonlySlowTestMixin(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
@@ -92,7 +51,7 @@ class MockCommonlySlowTestMixin(object):
|
||||
super(MockCommonlySlowTestMixin, self).__init__(*args, **kwargs)
|
||||
|
||||
ansible_version = get_ansible_version()
|
||||
class BaseTestMixin(QueueTestMixin, MockCommonlySlowTestMixin):
|
||||
class BaseTestMixin(MockCommonlySlowTestMixin):
|
||||
'''
|
||||
Mixin with shared code for use by all test cases.
|
||||
'''
|
||||
@@ -733,7 +692,7 @@ class BaseLiveServerTest(BaseTestMixin, django.test.LiveServerTestCase):
|
||||
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
|
||||
ANSIBLE_TRANSPORT='local',
|
||||
DEBUG=True)
|
||||
class BaseJobExecutionTest(QueueStartStopTestMixin, BaseLiveServerTest):
|
||||
class BaseJobExecutionTest(BaseLiveServerTest):
|
||||
'''
|
||||
Base class for celery task tests.
|
||||
'''
|
||||
|
||||
@@ -29,7 +29,7 @@ def mk_instance(persisted=True):
|
||||
if not persisted:
|
||||
raise RuntimeError('creating an Instance requires persisted=True')
|
||||
from django.conf import settings
|
||||
return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, primary=True, hostname="instance.example.org")
|
||||
return Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname="instance.example.org")
|
||||
|
||||
|
||||
def mk_organization(name, description=None, persisted=True):
|
||||
|
||||
@@ -85,7 +85,7 @@ def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible):
|
||||
# Ensure that the message flows from the socket through to process_fact_message()
|
||||
@pytest.mark.django_db
|
||||
def test_run_receiver(mocker, fact_msg_ansible):
|
||||
mocker.patch("awx.main.socket.Socket.listen", return_value=[fact_msg_ansible])
|
||||
mocker.patch("awx.main.socket_queue.Socket.listen", return_value=[fact_msg_ansible])
|
||||
|
||||
receiver = FactCacheReceiver()
|
||||
mocker.patch.object(receiver, 'process_fact_message', return_value=None)
|
||||
|
||||
@@ -152,7 +152,7 @@ def user_project(user):
|
||||
|
||||
@pytest.fixture
|
||||
def instance(settings):
|
||||
return Instance.objects.create(uuid=settings.SYSTEM_UUID, primary=True, hostname="instance.example.org")
|
||||
return Instance.objects.create(uuid=settings.SYSTEM_UUID, hostname="instance.example.org")
|
||||
|
||||
@pytest.fixture
|
||||
def organization(instance):
|
||||
|
||||
Reference in New Issue
Block a user