From 91bf49cdb3db48046435b43de765eb1e14f7a58d Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Sun, 11 Dec 2022 01:34:12 -0600 Subject: [PATCH] Remove auto-reconnect logic from wsrelay We no longer need to do this from wsrelay, as it will automatically try to reconnect when it hears the next beacon from heartbeet. This also cleans up the logic for what we do when we want to delete a node we previously knew about. Signed-off-by: Rick Elrod --- awx/main/wsrelay.py | 55 ++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index e935850514..29d6218a93 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -54,11 +54,12 @@ class WebsocketRelayConnection: self.channel_layer = None self.subsystem_metrics = s_metrics.Metrics(instance_name=name) self.producers = dict() + self.connected = False async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): raise RuntimeError("Implement me") - async def connect(self, attempt): + async def connect(self): from awx.main.consumers import WebsocketSecretAuthHelper # noqa logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.") @@ -70,13 +71,6 @@ class WebsocketRelayConnection: if not self.channel_layer: self.channel_layer = get_channel_layer() - try: - if attempt > 0: - await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS) - except asyncio.CancelledError: - logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled") - raise - uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/" timeout = aiohttp.ClientTimeout(total=10) @@ -86,14 +80,12 @@ class WebsocketRelayConnection: async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket: logger.info(f"Connection from {self.name} to {self.remote_host} established.") self.stats.record_connection_established() - attempt = 0 + self.connected = True await self.run_connection(websocket) except asyncio.CancelledError: # TODO: Check if connected and disconnect # Possibly use run_until_complete() if disconnect is async logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.") - self.stats.record_connection_lost() - raise except client_exceptions.ClientConnectorError as e: logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True) except asyncio.TimeoutError: @@ -102,14 +94,13 @@ class WebsocketRelayConnection: # Early on, this is our canary. I'm not sure what exceptions we can really encounter. logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True) else: - logger.warning(f"Connection from {self.name} to {self.remote_host} list.") - - self.stats.record_connection_lost() - self.start(attempt=attempt + 1) - - def start(self, attempt=0): - self.async_task = self.event_loop.create_task(self.connect(attempt=attempt)) + logger.info(f"Connection from {self.name} to {self.remote_host} lost, but no exception was raised.") + finally: + self.connected = False + self.stats.record_connection_lost() + def start(self): + self.async_task = self.event_loop.create_task(self.connect()) return self.async_task def cancel(self): @@ -137,7 +128,6 @@ class WebsocketRelayConnection: origin_channel = payload['origin_channel'] if not self.producers.get(name): producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) - self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} logger.debug(f"Producer {name} started.") else: @@ -181,6 +171,12 @@ class WebsocketRelayConnection: continue await websocket.send_json(wrap_broadcast_msg(group, msg)) + except ConnectionResetError: + # This can be hit when a web node is scaling down and we try to write to it. + # There's really nothing to do in this case and it's a fairly typical thing to happen. + # We'll log it as debug, but it's not really a problem. + logger.debug(f"Producer {name} connection reset.") + pass except Exception: # Note, this is very intentional and important since we do not otherwise # ever check the result of this future. Without this line you will not see an error if @@ -231,7 +227,8 @@ class WebSocketRelayManager(object): del self.known_hosts[hostname] logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") except Exception as e: - # This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves. + # This catch-all is the same as the one above. asyncio will eat the exception + # but we want to know about it. logger.exception(f"pg_consumer exception") async def run(self): @@ -261,12 +258,22 @@ class WebSocketRelayManager(object): deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) - remote_addresses = {k: v.remote_host for k, v in self.relay_connections.items()} + # This loop handles if we get an advertisement from a host we already know about but + # the advertisement has a different IP than we are currently connected to. for hostname, address in self.known_hosts.items(): - if hostname in self.relay_connections and address != remote_addresses[hostname]: + if hostname not in self.relay_connections: + # We've picked up a new hostname that we don't know about yet. + continue + + if address != self.relay_connections[hostname].remote_host: deleted_remote_hosts.add(hostname) new_remote_hosts.add(hostname) + # Delete any hosts with closed connections + for hostname, relay_conn in self.relay_connections.items(): + if not relay_conn.connected: + deleted_remote_hosts.add(hostname) + if deleted_remote_hosts: logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") @@ -276,6 +283,7 @@ class WebSocketRelayManager(object): for h in deleted_remote_hosts: self.relay_connections[h].cancel() del self.relay_connections[h] + del self.known_hosts[h] stats_mgr.delete_remote_host_stats(h) for h in new_remote_hosts: @@ -285,7 +293,4 @@ class WebSocketRelayManager(object): relay_connection.start() self.relay_connections[h] = relay_connection - # for host, conn in self.relay_connections.items(): - # logger.info(f"Current producers for {host}: {conn.producers}") - await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)