From 446f29927da1c5751fb389aa7d72cea0f7a22804 Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Wed, 3 Sep 2014 13:30:33 -0500 Subject: [PATCH] Proof of concept Redis PubSub. --- .../management/commands/run_task_system.py | 4 +- awx/main/queue.py | 48 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index c3cf1a1452..72aad3d0cb 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -251,7 +251,9 @@ def process_graph(graph, task_capacity): print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) def run_taskmanager(command_port): - ''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks ''' + """Receive task start and finish signals to rebuild a dependency graph + and manage the actual running of tasks. + """ def shutdown_handler(): def _handler(signum, frame): signal.signal(signum, signal.SIG_DFL) diff --git a/awx/main/queue.py b/awx/main/queue.py index a99f34cc90..c29e805843 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -28,3 +28,51 @@ class FifoQueue(object): """Retrieve a value from the left side of the queue.""" return json.loads(redis.lpop(self._queue_name)) + +class PubSub(object): + """An abstraction class implemented for pubsub. + + Intended to allow alteration of backend details in a single, consistent + way throughout the Tower application. + """ + def __init__(self, queue_name): + """Instantiate a pubsub object, which is able to interact with a + Redis key as a pubsub. + + Ideally this should be used with `contextmanager.closing` to ensure + well-behavedness: + + from contextmanager import closing + + with closing(PubSub('foobar')) as foobar: + for message in foobar.listen(wait=0.1): + + """ + self._queue_name = queue_name + self._ps = redis.pubsub(ignore_subscribe_messages=True) + self._ps.subscribe(queue_name) + + def publish(self, message): + """Publish a message to the given queue.""" + redis.publish(self._queue_name, json.dumps(message)) + + def retrieve(self): + """Retrieve a single message from the subcription channel + and return it. + """ + return self._ps.get_message() + + def listen(self, wait=0.001): + """Listen to content from the subscription channel indefinitely, + and yield messages as they are retrieved. + """ + while True: + message = self.retrieve() + if message is None: + time.sleep(max(wait, 0.001)) + else: + yield json.loads(message) + + def close(self): + """Close the pubsub connection.""" + self._ps.close()