diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 9cc82c318d..65ccf3537b 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -5,6 +5,7 @@ from typing import Dict import aiohttp from aiohttp import client_exceptions +import aioredis from channels.layers import get_channel_layer @@ -180,6 +181,9 @@ class WebsocketRelayConnection: return continue + except aioredis.errors.ConnectionClosedError: + logger.info(f"Producer {name} lost connection to Redis, shutting down.") + return await websocket.send_json(wrap_broadcast_msg(group, msg)) except ConnectionResetError: @@ -230,28 +234,51 @@ class WebSocketRelayManager(object): if ip is None: # If we don't get an IP, just try the hostname, maybe it resolves ip = hostname - if hostname is None or ip is None: - logger.warning(f"Received invalid online heartbeet, missing hostname or ip: {payload}") + if ip is None: + logger.warning(f"Received invalid online heartbeet, missing hostname and ip: {payload}") return self.known_hosts[hostname] = ip logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.") elif payload.get("action") == "offline": hostname = payload.get("hostname") - if hostname is None: - logger.warning(f"Received invalid offline heartbeet, missing hostname: {payload}") + ip = payload.get("ip") + if ip is None: + # If we don't get an IP, just try the hostname, maybe it resolves + ip = hostname + if ip is None: + logger.warning(f"Received invalid offline heartbeet, missing hostname and ip: {payload}") return - del self.known_hosts[hostname] + self.cleanup_offline_host(ip) logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") except Exception as e: # This catch-all is the same as the one above. asyncio will eat the exception # but we want to know about it. logger.exception(f"on_heartbeet exception: {e}") + def cleanup_offline_host(self, hostname): + """ + Given a hostname, try to cancel its task/connection and remove it from + the list of hosts we know about. + If the host isn't in the list, assume that it was already deleted and + don't error. + """ + if hostname in self.relay_connections: + self.relay_connections[hostname].cancel() + del self.relay_connections[hostname] + + if hostname in self.known_hosts: + del self.known_hosts[hostname] + + try: + self.stats_mgr.delete_remote_host_stats(hostname) + except KeyError: + pass + async def run(self): event_loop = asyncio.get_running_loop() - stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname) - stats_mgr.start() + self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname) + self.stats_mgr.start() # Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully. database_conf = settings.DATABASES['default'] @@ -268,7 +295,6 @@ class WebSocketRelayManager(object): # Establishes a websocket connection to /websocket/relay on all API servers while True: - # logger.info("Current known hosts: {}".format(self.known_hosts)) future_remote_hosts = self.known_hosts.keys() current_remote_hosts = self.relay_connections.keys() deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) @@ -297,13 +323,10 @@ class WebSocketRelayManager(object): logger.info(f"Adding {new_remote_hosts} to websocket broadcast list") for h in deleted_remote_hosts: - self.relay_connections[h].cancel() - del self.relay_connections[h] - del self.known_hosts[h] - stats_mgr.delete_remote_host_stats(h) + self.cleanup_offline_host(h) for h in new_remote_hosts: - stats = stats_mgr.new_remote_host_stats(h) + stats = self.stats_mgr.new_remote_host_stats(h) relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) relay_connection.start() self.relay_connections[h] = relay_connection