diff --git a/awx/main/channels.py b/awx/main/channels.py index 60bde215c2..3d287d6622 100644 --- a/awx/main/channels.py +++ b/awx/main/channels.py @@ -75,8 +75,13 @@ class WebsocketTask(): if not self.channel_layer: self.channel_layer = get_channel_layer() - if attempt > 0: - await asyncio.sleep(5) + try: + if attempt > 0: + await asyncio.sleep(5) + except asyncio.CancelledError: + logger.warn(f"{self.name} connection to {self.remote_host} cancelled") + raise + uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/" timeout = aiohttp.ClientTimeout(total=10) @@ -87,16 +92,23 @@ class WebsocketTask(): async with session.ws_connect(uri, ssl=self.verify_ssl) as websocket: attempt = 0 await self.run_loop(websocket) + except asyncio.CancelledError: + # TODO: Check if connected and disconnect + # Possibly use run_until_complete() if disconnect is async + logger.warn(f"{self.name} connection to {self.remote_host} cancelled") + raise except Exception as e: # Early on, this is our canary. I'm not sure what exceptions we can really encounter. # Does aiohttp throws an exception if a disconnect happens? logger.warn("Websocket broadcast client exception {}".format(e)) - finally: # Reconnect self.start(attempt=attempt+1) def start(self, attempt=0): - self.event_loop.create_task(self.connect(attempt=attempt)) + self.async_task = self.event_loop.create_task(self.connect(attempt=attempt)) + + def cancel(self): + self.async_task.cancel() class BroadcastWebsocketTask(WebsocketTask): @@ -121,16 +133,45 @@ class BroadcastWebsocketTask(WebsocketTask): await self.channel_layer.group_send(group, {"type": "internal.message", "text": message}) +class BroadcastWebsocketManager(object): + def __init__(self): + self.event_loop = asyncio.get_event_loop() + self.broadcast_tasks = dict() + + async def run_loop(self): + local_hostname = get_local_host() + + while True: + future_remote_hosts = get_broadcast_hosts() + current_remote_hosts = self.broadcast_tasks.keys() + deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) + new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) + + if deleted_remote_hosts: + logger.warn(f"{local_hostname} going to remove {deleted_remote_hosts} from the websocket broadcast list") + if new_remote_hosts: + logger.warn(f"{local_hostname} going to add {new_remote_hosts} to the websocket broadcast list") + + for h in deleted_remote_hosts: + self.broadcast_tasks[h].cancel() + del self.broadcast_tasks[h] + + for h in new_remote_hosts: + broadcast_task = BroadcastWebsocketTask(name=local_hostname, + event_loop=self.event_loop, + remote_host=h) + broadcast_task.start() + self.broadcast_tasks[h] = broadcast_task + + await asyncio.sleep(settings.BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS) + + def start(self): + self.async_task = self.event_loop.create_task(self.run_loop()) + + class RedisGroupBroadcastChannelLayer(RedisChannelLayer): def __init__(self, *args, **kwargs): super(RedisGroupBroadcastChannelLayer, self).__init__(*args, **kwargs) - remote_hosts = get_broadcast_hosts() - loop = asyncio.get_event_loop() - local_hostname = get_local_host() - - broadcast_tasks = [BroadcastWebsocketTask(name=local_hostname, - event_loop=loop, - remote_host=h) for h in remote_hosts] - - [t.start() for t in broadcast_tasks] + broadcast_websocket_mgr = BroadcastWebsocketManager() + broadcast_websocket_mgr.start() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 798f8ba131..def4b9135e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1250,3 +1250,6 @@ BROADCAST_WEBSOCKETS_VERIFY_CERT = False # Connect to other AWX nodes using http or https BROADCAST_WEBSOCKETS_PROTOCOL = 'https' + +# How often websocket process will look for changes in the Instance table +BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS = 10