From 255c2e4172d5187b706c29d9fbaf564b5954df59 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Thu, 22 Jun 2023 10:48:48 -0500 Subject: [PATCH] [wsrelay] Give connection tasks time to clean up When we close/cancel a connection to a web node, give the task time to clean up after itself and cleanly exit. Otherwise, the Python GC might clean up the task too early and this leads to ugly log messages like this: "Task was destroyed but it is pending!" Signed-off-by: Rick Elrod --- awx/main/wsrelay.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index c3ab95836e..09d7dfe92b 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -245,13 +245,13 @@ class WebSocketRelayManager(object): if action == "online": self.known_hosts[hostname] = ip elif action == "offline": - self.cleanup_offline_host(hostname) + await self.cleanup_offline_host(hostname) except Exception as e: # This catch-all is the same as the one above. asyncio will eat the exception # but we want to know about it. logger.exception(f"on_ws_heartbeat exception: {e}") - def cleanup_offline_host(self, hostname): + async def cleanup_offline_host(self, hostname): """ Given a hostname, try to cancel its task/connection and remove it from the list of hosts we know about. @@ -260,6 +260,19 @@ class WebSocketRelayManager(object): """ if hostname in self.relay_connections: self.relay_connections[hostname].cancel() + + # Wait for the task to actually run its cancel/completion logic + # otherwise it might get GC'd too early when we del it below. + # Being GC'd too early could generate a scary message in logs: + # "Task was destroyed but it is pending!" 
+ try: + await asyncio.wait_for(self.relay_connections[hostname].async_task, timeout=10) + except asyncio.TimeoutError: + logger.warning(f"Tried to cancel relay connection for {hostname} but it timed out during cleanup.") + except asyncio.CancelledError: + # Handle the case where the task was already cancelled by the time we got here. + pass + del self.relay_connections[hostname] if hostname in self.known_hosts: @@ -314,13 +327,11 @@ class WebSocketRelayManager(object): if deleted_remote_hosts: logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") + await asyncio.gather(*[self.cleanup_offline_host(h) for h in deleted_remote_hosts]) if new_remote_hosts: logger.info(f"Adding {new_remote_hosts} to websocket broadcast list") - for h in deleted_remote_hosts: - self.cleanup_offline_host(h) - for h in new_remote_hosts: stats = self.stats_mgr.new_remote_host_stats(h) relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])