mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 17:37:37 -02:30
Proof of concept Redis PubSub.
This commit is contained in:
@@ -251,7 +251,9 @@ def process_graph(graph, task_capacity):
|
|||||||
print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
|
print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
|
||||||
|
|
||||||
def run_taskmanager(command_port):
|
def run_taskmanager(command_port):
|
||||||
''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks '''
|
"""Receive task start and finish signals to rebuild a dependency graph
|
||||||
|
and manage the actual running of tasks.
|
||||||
|
"""
|
||||||
def shutdown_handler():
|
def shutdown_handler():
|
||||||
def _handler(signum, frame):
|
def _handler(signum, frame):
|
||||||
signal.signal(signum, signal.SIG_DFL)
|
signal.signal(signum, signal.SIG_DFL)
|
||||||
|
|||||||
@@ -28,3 +28,51 @@ class FifoQueue(object):
|
|||||||
"""Retrieve a value from the left side of the queue."""
|
"""Retrieve a value from the left side of the queue."""
|
||||||
return json.loads(redis.lpop(self._queue_name))
|
return json.loads(redis.lpop(self._queue_name))
|
||||||
|
|
||||||
|
|
||||||
|
class PubSub(object):
|
||||||
|
"""An abstraction class implemented for pubsub.
|
||||||
|
|
||||||
|
Intended to allow alteration of backend details in a single, consistent
|
||||||
|
way throughout the Tower application.
|
||||||
|
"""
|
||||||
|
def __init__(self, queue_name):
|
||||||
|
"""Instantiate a pubsub object, which is able to interact with a
|
||||||
|
Redis key as a pubsub.
|
||||||
|
|
||||||
|
Ideally this should be used with `contextmanager.closing` to ensure
|
||||||
|
well-behavedness:
|
||||||
|
|
||||||
|
from contextmanager import closing
|
||||||
|
|
||||||
|
with closing(PubSub('foobar')) as foobar:
|
||||||
|
for message in foobar.listen(wait=0.1):
|
||||||
|
<deal with message>
|
||||||
|
"""
|
||||||
|
self._queue_name = queue_name
|
||||||
|
self._ps = redis.pubsub(ignore_subscribe_messages=True)
|
||||||
|
self._ps.subscribe(queue_name)
|
||||||
|
|
||||||
|
def publish(self, message):
|
||||||
|
"""Publish a message to the given queue."""
|
||||||
|
redis.publish(self._queue_name, json.dumps(message))
|
||||||
|
|
||||||
|
def retrieve(self):
|
||||||
|
"""Retrieve a single message from the subcription channel
|
||||||
|
and return it.
|
||||||
|
"""
|
||||||
|
return self._ps.get_message()
|
||||||
|
|
||||||
|
def listen(self, wait=0.001):
|
||||||
|
"""Listen to content from the subscription channel indefinitely,
|
||||||
|
and yield messages as they are retrieved.
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
message = self.retrieve()
|
||||||
|
if message is None:
|
||||||
|
time.sleep(max(wait, 0.001))
|
||||||
|
else:
|
||||||
|
yield json.loads(message)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Close the pubsub connection."""
|
||||||
|
self._ps.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user