diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index e935850514..29d6218a93 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -54,11 +54,12 @@ class WebsocketRelayConnection: self.channel_layer = None self.subsystem_metrics = s_metrics.Metrics(instance_name=name) self.producers = dict() + self.connected = False async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): raise RuntimeError("Implement me") - async def connect(self, attempt): + async def connect(self): from awx.main.consumers import WebsocketSecretAuthHelper # noqa logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.") @@ -70,13 +71,6 @@ class WebsocketRelayConnection: if not self.channel_layer: self.channel_layer = get_channel_layer() - try: - if attempt > 0: - await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS) - except asyncio.CancelledError: - logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled") - raise - uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/" timeout = aiohttp.ClientTimeout(total=10) @@ -86,14 +80,12 @@ class WebsocketRelayConnection: async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket: logger.info(f"Connection from {self.name} to {self.remote_host} established.") self.stats.record_connection_established() - attempt = 0 + self.connected = True await self.run_connection(websocket) except asyncio.CancelledError: # TODO: Check if connected and disconnect # Possibly use run_until_complete() if disconnect is async logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.") - self.stats.record_connection_lost() - raise except client_exceptions.ClientConnectorError as e: logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True) except asyncio.TimeoutError: @@ -102,14 +94,13 @@ class WebsocketRelayConnection: # Early on, this is our canary. I'm not sure what exceptions we can really encounter. logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True) else: - logger.warning(f"Connection from {self.name} to {self.remote_host} list.") - - self.stats.record_connection_lost() - self.start(attempt=attempt + 1) - - def start(self, attempt=0): - self.async_task = self.event_loop.create_task(self.connect(attempt=attempt)) + logger.info(f"Connection from {self.name} to {self.remote_host} lost, but no exception was raised.") + finally: + self.connected = False + self.stats.record_connection_lost() + def start(self): + self.async_task = self.event_loop.create_task(self.connect()) return self.async_task def cancel(self): @@ -137,7 +128,6 @@ class WebsocketRelayConnection: origin_channel = payload['origin_channel'] if not self.producers.get(name): producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) - self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} logger.debug(f"Producer {name} started.") else: @@ -181,6 +171,12 @@ class WebsocketRelayConnection: continue await websocket.send_json(wrap_broadcast_msg(group, msg)) + except ConnectionResetError: + # This can be hit when a web node is scaling down and we try to write to it. + # There's really nothing to do in this case and it's a fairly typical thing to happen. + # We'll log it as debug, but it's not really a problem. + logger.debug(f"Producer {name} connection reset.") + pass except Exception: # Note, this is very intentional and important since we do not otherwise # ever check the result of this future. Without this line you will not see an error if @@ -231,7 +227,8 @@ class WebSocketRelayManager(object): del self.known_hosts[hostname] logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") except Exception as e: - # This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves. + # This catch-all is the same as the one above. asyncio will eat the exception + # but we want to know about it. logger.exception(f"pg_consumer exception") async def run(self): @@ -261,12 +258,22 @@ class WebSocketRelayManager(object): deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) - remote_addresses = {k: v.remote_host for k, v in self.relay_connections.items()} + # This loop handles if we get an advertisement from a host we already know about but + # the advertisement has a different IP than we are currently connected to. for hostname, address in self.known_hosts.items(): - if hostname in self.relay_connections and address != remote_addresses[hostname]: + if hostname not in self.relay_connections: + # We've picked up a new hostname that we don't know about yet. + continue + + if address != self.relay_connections[hostname].remote_host: deleted_remote_hosts.add(hostname) new_remote_hosts.add(hostname) + # Delete any hosts with closed connections + for hostname, relay_conn in self.relay_connections.items(): + if not relay_conn.connected: + deleted_remote_hosts.add(hostname) + if deleted_remote_hosts: logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") @@ -276,6 +283,7 @@ class WebSocketRelayManager(object): for h in deleted_remote_hosts: self.relay_connections[h].cancel() del self.relay_connections[h] + del self.known_hosts[h] stats_mgr.delete_remote_host_stats(h) for h in new_remote_hosts: @@ -285,7 +293,4 @@ class WebSocketRelayManager(object): relay_connection.start() self.relay_connections[h] = relay_connection - # for host, conn in self.relay_connections.items(): - # logger.info(f"Current producers for {host}: {conn.producers}") - await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)