Updated zeromq callback to receive callback port via environment variable. Changed production, development and unit tests to all use different ports/queues for the callback.

This commit is contained in:
Chris Church 2014-02-26 14:11:59 -05:00
parent 78bba13bbb
commit 8e61d17cb4
8 changed files with 43 additions and 25 deletions

View File

@ -50,7 +50,12 @@ def run_subscriber(consumer_port, queue_port, use_workers=True):
w.daemon = True
w.start()
workers.append(w)
signal.signal(signal.SIGINT, shutdown_handler(workers))
signal.signal(signal.SIGTERM, shutdown_handler(workers))
if settings.DEBUG:
print 'Started callback receiver (4 workers)'
elif settings.DEBUG:
print 'Started callback receiver (no workers)'
while True: # Handle signal
message = consumer_subscriber.recv_json()
@ -134,4 +139,8 @@ class Command(NoArgsCommand):
self.init_logging()
consumer_port = settings.CALLBACK_CONSUMER_PORT
queue_port = settings.CALLBACK_QUEUE_PORT
run_subscriber(consumer_port, queue_port)
try:
run_subscriber(consumer_port, queue_port)
except KeyboardInterrupt:
pass

View File

@ -388,8 +388,7 @@ class RunJob(BaseTask):
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir
env['REST_API_URL'] = settings.INTERNAL_API_URL
env['REST_API_TOKEN'] = job.task_auth_token or ''
if settings.BROKER_URL.startswith('amqp://'):
env['BROKER_URL'] = settings.BROKER_URL
env['CALLBACK_CONSUMER_PORT'] = settings.CALLBACK_CONSUMER_PORT
if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
env['JOB_CALLBACK_DEBUG'] = '2'
elif settings.DEBUG:
@ -502,14 +501,9 @@ class RunJob(BaseTask):
Hook for actions to run after job/task has completed.
'''
super(RunJob, self).post_run_hook(job, **kwargs)
# Send a special message to this job's event queue after the job has run
# to tell the save job events task to end.
if settings.BROKER_URL.startswith('amqp://'):
pass
# Update job event fields after job has completed (only when using REST
# API callback).
else:
if not settings.CALLBACK_CONSUMER_PORT:
for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True)

View File

@ -7,6 +7,7 @@ import contextlib
import datetime
import json
import os
import random
import shutil
import tempfile
import time
@ -51,11 +52,15 @@ class BaseTestMixin(object):
# commands that run from tests.
for opt in ('ENGINE', 'NAME', 'USER', 'PASSWORD', 'HOST', 'PORT'):
os.environ['AWX_TEST_DATABASE_%s' % opt] = settings.DATABASES['default'][opt]
# For now, prevent tests from trying to use celery for job event
# callbacks.
if settings.BROKER_URL.startswith('amqp://'):
settings.BROKER_URL = 'django://'
# Set flag so that task chain works with unit tests.
settings.CELERY_UNIT_TEST = True
# Create unique random consumer and queue ports for zeromq callback.
if settings.CALLBACK_CONSUMER_PORT:
callback_port = random.randint(55700, 55799)
settings.CALLBACK_CONSUMER_PORT = 'tcp://127.0.0.1:%d' % callback_port
callback_queue_path = '/tmp/callback_receiver_test_%d.ipc' % callback_port
self._temp_project_dirs.append(callback_queue_path)
settings.CALLBACK_QUEUE_PORT = 'ipc://%s' % callback_queue_path
# Make temp job status directory for unit tests.
job_status_dir = tempfile.mkdtemp()
self._temp_project_dirs.append(job_status_dir)
@ -66,7 +71,10 @@ class BaseTestMixin(object):
super(BaseTestMixin, self).tearDown()
for project_dir in self._temp_project_dirs:
if os.path.exists(project_dir):
shutil.rmtree(project_dir, True)
if os.path.isdir(project_dir):
shutil.rmtree(project_dir, True)
else:
os.remove(project_dir)
# Restore previous settings after each test.
settings._wrapped = self._wrapped
@ -372,7 +380,8 @@ class BaseTestMixin(object):
self.queue_process.start()
def terminate_queue(self):
self.queue_process.terminate()
if hasattr(self, 'queue_process'):
self.queue_process.terminate()
class BaseTest(BaseTestMixin, django.test.TestCase):
'''

View File

@ -442,8 +442,8 @@ class BaseJobTestMixin(BaseTestMixin):
def setUp(self):
super(BaseJobTestMixin, self).setUp()
self.populate()
#self.start_queue("ipc:///tmp/test_consumer.ipc", "ipc:///tmp/test_queue.ipc")
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
if settings.CALLBACK_CONSUMER_PORT:
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(BaseJobTestMixin, self).tearDown()
@ -779,7 +779,7 @@ MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'),
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CALLBACK_BYPASS_QUEUE=True,
CALLBACK_CONSUMER_PORT='',
ANSIBLE_TRANSPORT='local',
MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):

View File

@ -188,7 +188,8 @@ class RunJobTest(BaseCeleryTest):
return args
RunJob.build_args = new_build_args
settings.INTERNAL_API_URL = self.live_server_url
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
if settings.CALLBACK_CONSUMER_PORT:
self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
def tearDown(self):
super(RunJobTest, self).tearDown()

View File

@ -39,14 +39,13 @@ import urllib
import urlparse
import time
# Requests
import requests
# Django
from django.conf import settings
# ZeroMQ
import zmq
class TokenAuth(requests.auth.AuthBase):
def __init__(self, token):
@ -81,6 +80,7 @@ class CallbackModule(object):
self.job_id = int(os.getenv('JOB_ID'))
self.base_url = os.getenv('REST_API_URL', '')
self.auth_token = os.getenv('REST_API_TOKEN', '')
self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '')
self.context = None
self.socket = None
self._init_logging()
@ -111,7 +111,7 @@ class CallbackModule(object):
def _start_connection(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect("tcp://127.0.0.1:5556")
self.socket.connect(self.callback_consumer_port)
def _post_job_event_queue_msg(self, event, event_data):
msg = {
@ -173,7 +173,7 @@ class CallbackModule(object):
task = getattr(getattr(self, 'task', None), 'name', '')
if task and event not in self.EVENTS_WITHOUT_TASK:
event_data['task'] = task
if not settings.CALLBACK_BYPASS_QUEUE:
if self.callback_consumer_port:
self._post_job_event_queue_msg(event, event_data)
else:
self._post_rest_api_event(event, event_data)

View File

@ -345,9 +345,9 @@ if 'devserver' in INSTALLED_APPS:
else:
INTERNAL_API_URL = 'http://127.0.0.1:8000'
# ZeroMQ callback settings.
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556"
CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc"
CALLBACK_BYPASS_QUEUE = False
# Logging configuration.
LOGGING = {

View File

@ -17,6 +17,11 @@ from defaults import *
if 'celeryd' in sys.argv:
SQL_DEBUG = False
# Use a different callback consumer/queue for development, to avoid a conflict
# if there is also a nightly install running on the development machine.
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5557"
CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver_dev.ipc"
# If any local_*.py files are present in awx/settings/, use them to override
# default settings for development. If not present, we can still run using
# only the defaults.