From 99b8b8e193788b88ffbe71282c8c6777ddf2ba38 Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Fri, 21 Nov 2014 08:43:54 -0600 Subject: [PATCH] Beginning work on reintroducing ZeroMQ. --- awx/main/queue.py | 51 +--------------------- awx/main/tests/socket.py | 94 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 50 deletions(-) create mode 100644 awx/main/tests/socket.py diff --git a/awx/main/queue.py b/awx/main/queue.py index 8ea30702f6..54102b6c2f 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -8,7 +8,7 @@ from redis import StrictRedis from django.conf import settings -__all__ = ['FifoQueue', 'PubSub'] +__all__ = ['FifoQueue'] # Determine, based on settings.BROKER_URL (for celery), what the correct Redis @@ -66,52 +66,3 @@ class FifoQueue(object): answer = redis.lpop(self._queue_name) if answer: return json.loads(answer) - - -class PubSub(object): - """An abstraction class implemented for pubsub. - - Intended to allow alteration of backend details in a single, consistent - way throughout the Tower application. - """ - def __init__(self, queue_name): - """Instantiate a pubsub object, which is able to interact with a - Redis key as a pubsub. - - Ideally this should be used with `contextmanager.closing` to ensure - well-behavedness: - - from contextlib import closing - - with closing(PubSub('foobar')) as foobar: - for message in foobar.subscribe(wait=0.1): - - """ - self._queue_name = queue_name - self._ps = redis.pubsub(ignore_subscribe_messages=True) - self._ps.subscribe(queue_name) - - def publish(self, message): - """Publish a message to the given queue.""" - redis.publish(self._queue_name, json.dumps(message)) - - def retrieve(self): - """Retrieve a single message from the subcription channel - and return it. - """ - return self._ps.get_message() - - def subscribe(self, wait=0.001): - """Listen to content from the subscription channel indefinitely, - and yield messages as they are retrieved. - """ - while True: - message = self.retrieve() - if message is None: - time.sleep(max(wait, 0.001)) - else: - yield json.loads(message['data']) - - def close(self): - """Close the pubsub connection.""" - self._ps.close() diff --git a/awx/main/tests/socket.py b/awx/main/tests/socket.py new file mode 100644 index 0000000000..f445aa3ff5 --- /dev/null +++ b/awx/main/tests/socket.py @@ -0,0 +1,94 @@ +# Copyright (c) 2014, Ansible, Inc. +# All Rights Reserved. + +import os + +import zmq + +class Socket(object): + """An abstraction class implemented for a dumb OS socket. + + Intended to allow alteration of backend details in a single, consistent + way throughout the Tower application. + """ + ports = { + 'callbacks': , + 'websocket': , + } + + def __init__(self, bucket, rw, debug=0, logger=None): + """Instantiate a Socket object, which uses ZeroMQ to actually perform + passing a message back and forth. + """ + self._bucket = bucket + self._rw = { + 'r': zmq.REP, + 'w': zmq.REQ, + }[rw.lower()] + + self._connection_pid = None + self._context = None + self._socket = None + + self._debug = debug + self._logger = logger + + def __enter__(self): + self.connect() + return self + + @property + def port(self): + return self.ports[self._bucket] + + def connect(self, purpose): + """Connect to ZeroMQ.""" + + # Make sure that we are clearing everything out if there is + # a problem; PID crossover can cause bad news. + active_pid = os.getpid() + if self._connection_pid is None: + self._connection_pid = active_pid + if self._connection_pid != active_pid: + self._context = None + self._socket = None + self._connection_pid = active_pid + + # Okay, create the connection. + if self._context is None: + self._context = zmq.Context() + self._socket = self._context.socket(purpose) + if purpose == self.WRITE: + self._socket.connect(self.port) + else: + self._socket.bind(self.port) + + def publish(self, message): + """Publish a message over the socket.""" + + # If we are in debug mode; provide the PID. + if self._debug: + message.update({'pid': os.getpid(), + 'connection_pid': self._connection_pid}) + + # Send the message. + for retry in xrange(4): + try: + self.connect() + self._socket.send_json(message) + self._socket.recv() + except Exception as ex: + if self._logger: + self._logger.info('Publish Exception: %r; retry=%d', + ex, retry, exc_info=True) + if retry >= 3: + raise + + def listen(self): + """Retrieve a single message from the subcription channel + and return it. + """ + while True: + message = self._socket.recv_json() + yield message + self._socket.send('1')