[wsrelay] Handle heartbeet shutdown and redis drop (#13991)

This fixes two different exceptions in wsrelay.

* One resulted from heartbeet getting ability in #13858 to gracefully
  shut down. When we saw the message come through, we didn't fully
  clean up the connection to the web node.

* The second resulted when Redis disappeared. We still want to exit in
  that case, but it's better to log a message and exit gracefully
  instead of crashing out.

Signed-off-by: Rick Elrod <rick@elrod.me>
This commit is contained in:
Rick Elrod
2023-05-15 10:46:23 -05:00
committed by GitHub
parent e434b1e0f3
commit f3fa75d832

View File

@@ -5,6 +5,7 @@ from typing import Dict
import aiohttp import aiohttp
from aiohttp import client_exceptions from aiohttp import client_exceptions
import aioredis
from channels.layers import get_channel_layer from channels.layers import get_channel_layer
@@ -180,6 +181,9 @@ class WebsocketRelayConnection:
return return
continue continue
except aioredis.errors.ConnectionClosedError:
logger.info(f"Producer {name} lost connection to Redis, shutting down.")
return
await websocket.send_json(wrap_broadcast_msg(group, msg)) await websocket.send_json(wrap_broadcast_msg(group, msg))
except ConnectionResetError: except ConnectionResetError:
@@ -230,28 +234,51 @@ class WebSocketRelayManager(object):
if ip is None: if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves # If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname ip = hostname
if hostname is None or ip is None: if ip is None:
logger.warning(f"Received invalid online heartbeet, missing hostname or ip: {payload}") logger.warning(f"Received invalid online heartbeet, missing hostname and ip: {payload}")
return return
self.known_hosts[hostname] = ip self.known_hosts[hostname] = ip
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.") logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
elif payload.get("action") == "offline": elif payload.get("action") == "offline":
hostname = payload.get("hostname") hostname = payload.get("hostname")
if hostname is None: ip = payload.get("ip")
logger.warning(f"Received invalid offline heartbeet, missing hostname: {payload}") if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
if ip is None:
logger.warning(f"Received invalid offline heartbeet, missing hostname and ip: {payload}")
return return
del self.known_hosts[hostname] self.cleanup_offline_host(ip)
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e: except Exception as e:
# This catch-all is the same as the one above. asyncio will eat the exception # This catch-all is the same as the one above. asyncio will eat the exception
# but we want to know about it. # but we want to know about it.
logger.exception(f"on_heartbeet exception: {e}") logger.exception(f"on_heartbeet exception: {e}")
def cleanup_offline_host(self, hostname):
"""
Given a hostname, try to cancel its task/connection and remove it from
the list of hosts we know about.
If the host isn't in the list, assume that it was already deleted and
don't error.
"""
if hostname in self.relay_connections:
self.relay_connections[hostname].cancel()
del self.relay_connections[hostname]
if hostname in self.known_hosts:
del self.known_hosts[hostname]
try:
self.stats_mgr.delete_remote_host_stats(hostname)
except KeyError:
pass
async def run(self): async def run(self):
event_loop = asyncio.get_running_loop() event_loop = asyncio.get_running_loop()
stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname) self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname)
stats_mgr.start() self.stats_mgr.start()
# Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully. # Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully.
database_conf = settings.DATABASES['default'] database_conf = settings.DATABASES['default']
@@ -268,7 +295,6 @@ class WebSocketRelayManager(object):
# Establishes a websocket connection to /websocket/relay on all API servers # Establishes a websocket connection to /websocket/relay on all API servers
while True: while True:
# logger.info("Current known hosts: {}".format(self.known_hosts))
future_remote_hosts = self.known_hosts.keys() future_remote_hosts = self.known_hosts.keys()
current_remote_hosts = self.relay_connections.keys() current_remote_hosts = self.relay_connections.keys()
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
@@ -297,13 +323,10 @@ class WebSocketRelayManager(object):
logger.info(f"Adding {new_remote_hosts} to websocket broadcast list") logger.info(f"Adding {new_remote_hosts} to websocket broadcast list")
for h in deleted_remote_hosts: for h in deleted_remote_hosts:
self.relay_connections[h].cancel() self.cleanup_offline_host(h)
del self.relay_connections[h]
del self.known_hosts[h]
stats_mgr.delete_remote_host_stats(h)
for h in new_remote_hosts: for h in new_remote_hosts:
stats = stats_mgr.new_remote_host_stats(h) stats = self.stats_mgr.new_remote_host_stats(h)
relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])
relay_connection.start() relay_connection.start()
self.relay_connections[h] = relay_connection self.relay_connections[h] = relay_connection