Beginning work on reintroducing ZeroMQ.

This commit is contained in:
Luke Sneeringer 2014-11-21 08:43:54 -06:00
parent ee0e549978
commit 99b8b8e193
2 changed files with 95 additions and 50 deletions

View File

@ -8,7 +8,7 @@ from redis import StrictRedis
from django.conf import settings
__all__ = ['FifoQueue', 'PubSub']
__all__ = ['FifoQueue']
# Determine, based on settings.BROKER_URL (for celery), what the correct Redis
@ -66,52 +66,3 @@ class FifoQueue(object):
answer = redis.lpop(self._queue_name)
if answer:
return json.loads(answer)
class PubSub(object):
"""An abstraction class implemented for pubsub.
Intended to allow alteration of backend details in a single, consistent
way throughout the Tower application.
"""
def __init__(self, queue_name):
"""Instantiate a pubsub object, which is able to interact with a
Redis key as a pubsub.
Ideally this should be used with `contextmanager.closing` to ensure
well-behavedness:
from contextlib import closing
with closing(PubSub('foobar')) as foobar:
for message in foobar.subscribe(wait=0.1):
<deal with message>
"""
self._queue_name = queue_name
self._ps = redis.pubsub(ignore_subscribe_messages=True)
self._ps.subscribe(queue_name)
def publish(self, message):
"""Publish a message to the given queue."""
redis.publish(self._queue_name, json.dumps(message))
def retrieve(self):
"""Retrieve a single message from the subcription channel
and return it.
"""
return self._ps.get_message()
def subscribe(self, wait=0.001):
"""Listen to content from the subscription channel indefinitely,
and yield messages as they are retrieved.
"""
while True:
message = self.retrieve()
if message is None:
time.sleep(max(wait, 0.001))
else:
yield json.loads(message['data'])
def close(self):
"""Close the pubsub connection."""
self._ps.close()

94
awx/main/tests/socket.py Normal file
View File

@ -0,0 +1,94 @@
# Copyright (c) 2014, Ansible, Inc.
# All Rights Reserved.
import os
import zmq
class Socket(object):
"""An abstraction class implemented for a dumb OS socket.
Intended to allow alteration of backend details in a single, consistent
way throughout the Tower application.
"""
ports = {
'callbacks': ,
'websocket': ,
}
def __init__(self, bucket, rw, debug=0, logger=None):
"""Instantiate a Socket object, which uses ZeroMQ to actually perform
passing a message back and forth.
"""
self._bucket = bucket
self._rw = {
'r': zmq.REP,
'w': zmq.REQ,
}[rw.lower()]
self._connection_pid = None
self._context = None
self._socket = None
self._debug = debug
self._logger = logger
def __enter__(self):
self.connect()
return self
@property
def port(self):
return self.ports[self._bucket]
def connect(self, purpose):
"""Connect to ZeroMQ."""
# Make sure that we are clearing everything out if there is
# a problem; PID crossover can cause bad news.
active_pid = os.getpid()
if self._connection_pid is None:
self._connection_pid = active_pid
if self._connection_pid != active_pid:
self._context = None
self._socket = None
self._connection_pid = active_pid
# Okay, create the connection.
if self._context is None:
self._context = zmq.Context()
self._socket = self._context.socket(purpose)
if purpose == self.WRITE:
self._socket.connect(self.port)
else:
self._socket.bind(self.port)
def publish(self, message):
"""Publish a message over the socket."""
# If we are in debug mode; provide the PID.
if self._debug:
message.update({'pid': os.getpid(),
'connection_pid': self._connection_pid})
# Send the message.
for retry in xrange(4):
try:
self.connect()
self._socket.send_json(message)
self._socket.recv()
except Exception as ex:
if self._logger:
self._logger.info('Publish Exception: %r; retry=%d',
ex, retry, exc_info=True)
if retry >= 3:
raise
def listen(self):
"""Retrieve a single message from the subcription channel
and return it.
"""
while True:
message = self._socket.recv_json()
yield message
self._socket.send('1')