mirror of
https://github.com/ansible/awx.git
synced 2026-01-21 22:48:02 -03:30
websockets aware of Instance changes
* New tower nodes that are (de)registered in the Instance table are seen by the websocket layer and connected to or disconnected from by the websocket broadcast backplane using a polling mechanism. * This is especially useful for openshift and kubernetes. This will be useful for standalone Tower in the future when the restarting of Tower services is not required.
This commit is contained in:
parent
c06b6306ab
commit
03b73027e8
@ -75,8 +75,13 @@ class WebsocketTask():
|
||||
if not self.channel_layer:
|
||||
self.channel_layer = get_channel_layer()
|
||||
|
||||
if attempt > 0:
|
||||
await asyncio.sleep(5)
|
||||
try:
|
||||
if attempt > 0:
|
||||
await asyncio.sleep(5)
|
||||
except asyncio.CancelledError:
|
||||
logger.warn(f"{self.name} connection to {self.remote_host} cancelled")
|
||||
raise
|
||||
|
||||
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/"
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
|
||||
@ -87,16 +92,23 @@ class WebsocketTask():
|
||||
async with session.ws_connect(uri, ssl=self.verify_ssl) as websocket:
|
||||
attempt = 0
|
||||
await self.run_loop(websocket)
|
||||
except asyncio.CancelledError:
|
||||
# TODO: Check if connected and disconnect
|
||||
# Possibly use run_until_complete() if disconnect is async
|
||||
logger.warn(f"{self.name} connection to {self.remote_host} cancelled")
|
||||
raise
|
||||
except Exception as e:
|
||||
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
|
||||
# Does aiohttp throws an exception if a disconnect happens?
|
||||
logger.warn("Websocket broadcast client exception {}".format(e))
|
||||
finally:
|
||||
# Reconnect
|
||||
self.start(attempt=attempt+1)
|
||||
|
||||
def start(self, attempt=0):
|
||||
self.event_loop.create_task(self.connect(attempt=attempt))
|
||||
self.async_task = self.event_loop.create_task(self.connect(attempt=attempt))
|
||||
|
||||
def cancel(self):
|
||||
self.async_task.cancel()
|
||||
|
||||
|
||||
class BroadcastWebsocketTask(WebsocketTask):
|
||||
@ -121,16 +133,45 @@ class BroadcastWebsocketTask(WebsocketTask):
|
||||
await self.channel_layer.group_send(group, {"type": "internal.message", "text": message})
|
||||
|
||||
|
||||
class BroadcastWebsocketManager(object):
|
||||
def __init__(self):
|
||||
self.event_loop = asyncio.get_event_loop()
|
||||
self.broadcast_tasks = dict()
|
||||
|
||||
async def run_loop(self):
|
||||
local_hostname = get_local_host()
|
||||
|
||||
while True:
|
||||
future_remote_hosts = get_broadcast_hosts()
|
||||
current_remote_hosts = self.broadcast_tasks.keys()
|
||||
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
|
||||
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
|
||||
|
||||
if deleted_remote_hosts:
|
||||
logger.warn(f"{local_hostname} going to remove {deleted_remote_hosts} from the websocket broadcast list")
|
||||
if new_remote_hosts:
|
||||
logger.warn(f"{local_hostname} going to add {new_remote_hosts} to the websocket broadcast list")
|
||||
|
||||
for h in deleted_remote_hosts:
|
||||
self.broadcast_tasks[h].cancel()
|
||||
del self.broadcast_tasks[h]
|
||||
|
||||
for h in new_remote_hosts:
|
||||
broadcast_task = BroadcastWebsocketTask(name=local_hostname,
|
||||
event_loop=self.event_loop,
|
||||
remote_host=h)
|
||||
broadcast_task.start()
|
||||
self.broadcast_tasks[h] = broadcast_task
|
||||
|
||||
await asyncio.sleep(settings.BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS)
|
||||
|
||||
def start(self):
|
||||
self.async_task = self.event_loop.create_task(self.run_loop())
|
||||
|
||||
|
||||
class RedisGroupBroadcastChannelLayer(RedisChannelLayer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RedisGroupBroadcastChannelLayer, self).__init__(*args, **kwargs)
|
||||
|
||||
remote_hosts = get_broadcast_hosts()
|
||||
loop = asyncio.get_event_loop()
|
||||
local_hostname = get_local_host()
|
||||
|
||||
broadcast_tasks = [BroadcastWebsocketTask(name=local_hostname,
|
||||
event_loop=loop,
|
||||
remote_host=h) for h in remote_hosts]
|
||||
|
||||
[t.start() for t in broadcast_tasks]
|
||||
broadcast_websocket_mgr = BroadcastWebsocketManager()
|
||||
broadcast_websocket_mgr.start()
|
||||
|
||||
@ -1250,3 +1250,6 @@ BROADCAST_WEBSOCKETS_VERIFY_CERT = False
|
||||
|
||||
# Connect to other AWX nodes using http or https
|
||||
BROADCAST_WEBSOCKETS_PROTOCOL = 'https'
|
||||
|
||||
# How often websocket process will look for changes in the Instance table
|
||||
BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS = 10
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user