ZeroMQ changes.

This commit is contained in:
Luke Sneeringer
2014-11-21 16:21:15 -06:00
parent 35824f8d67
commit 1efff8267d
2 changed files with 38 additions and 16 deletions

View File

@@ -5,18 +5,15 @@ import os
import zmq import zmq
from django.conf import settings
class Socket(object): class Socket(object):
"""An abstraction class implemented for a dumb OS socket. """An abstraction class implemented for a dumb OS socket.
Intended to allow alteration of backend details in a single, consistent Intended to allow alteration of backend details in a single, consistent
way throughout the Tower application. way throughout the Tower application.
""" """
ports = {
'callbacks': 5556,
'task_commands': 6559,
'websocket': 6557,
}
def __init__(self, bucket, rw, debug=0, logger=None): def __init__(self, bucket, rw, debug=0, logger=None):
"""Instantiate a Socket object, which uses ZeroMQ to actually perform """Instantiate a Socket object, which uses ZeroMQ to actually perform
passing a message back and forth. passing a message back and forth.
@@ -61,7 +58,11 @@ class Socket(object):
@property @property
def port(self): def port(self):
return self.ports[self._bucket] return {
'callbacks': settings.CALLBACK_CONSUMER_PORT,
'task_commands': settings.TASK_COMMAND_PORT,
'websocket': settings.SOCKETIO_NOTIFICATION_PORT,
}[self._bucket]
def connect(self): def connect(self):
"""Connect to ZeroMQ.""" """Connect to ZeroMQ."""
@@ -76,24 +77,39 @@ class Socket(object):
self._socket = None self._socket = None
self._connection_pid = active_pid self._connection_pid = active_pid
# If the port is an integer, convert it into tcp://
port = self.port
if isinstance(port, int):
port = 'tcp://127.0.0.1:%d' % port
# If the port is None, then this is an intentional dummy;
# honor this. (For testing.)
if port is None:
return
# Okay, create the connection. # Okay, create the connection.
if self._context is None: if self._context is None:
self._context = zmq.Context() self._context = zmq.Context()
self._socket = self._context.socket(self._rw) self._socket = self._context.socket(self._rw)
if self._rw == zmq.REQ: if self._rw == zmq.REQ:
self._socket.connect('tcp://127.0.0.1:%d' % self.port) self._socket.connect(port)
else: else:
self._socket.bind('tcp://127.0.0.1:%d' % self.port) self._socket.bind(port)
def close(self): def close(self):
"""Disconnect and tear down.""" """Disconnect and tear down."""
self._socket.close() if self._socket:
self._socket.close()
self._socket = None self._socket = None
self._context = None self._context = None
def publish(self, message): def publish(self, message):
"""Publish a message over the socket.""" """Publish a message over the socket."""
# If the port is None, no-op.
if self.port is None:
return
# If we are not connected, whine. # If we are not connected, whine.
if not self.is_connected: if not self.is_connected:
raise RuntimeError('Cannot publish a message when not connected ' raise RuntimeError('Cannot publish a message when not connected '
@@ -124,6 +140,10 @@ class Socket(object):
"""Retrieve a single message from the subcription channel """Retrieve a single message from the subcription channel
and return it. and return it.
""" """
# If the port is None, no-op.
if self.port is None:
raise StopIteration
# If we are not connected, whine. # If we are not connected, whine.
if not self.is_connected: if not self.is_connected:
raise RuntimeError('Cannot publish a message when not connected ' raise RuntimeError('Cannot publish a message when not connected '
@@ -135,6 +155,8 @@ class Socket(object):
# Actually listen to the socket. # Actually listen to the socket.
while True: while True:
message = self._socket.recv_json() try:
yield message message = self._socket.recv_json()
self._socket.send('1') yield message
finally:
self._socket.send('1')

View File

@@ -495,12 +495,12 @@ else:
INTERNAL_API_URL = 'http://127.0.0.1:8000' INTERNAL_API_URL = 'http://127.0.0.1:8000'
# ZeroMQ callback settings. # ZeroMQ callback settings.
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556" CALLBACK_CONSUMER_PORT = 5556
CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc" CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc"
TASK_COMMAND_PORT = "tcp://127.0.0.1:6559" TASK_COMMAND_PORT = 6559
SOCKETIO_NOTIFICATION_PORT = "tcp://127.0.0.1:6557" SOCKETIO_NOTIFICATION_PORT = 6557
SOCKETIO_LISTEN_PORT = 8080 SOCKETIO_LISTEN_PORT = 8080
ORG_ADMINS_CAN_SEE_ALL_USERS = True ORG_ADMINS_CAN_SEE_ALL_USERS = True