diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index c3ab95836e..09d7dfe92b 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -245,13 +245,13 @@ class WebSocketRelayManager(object): if action == "online": self.known_hosts[hostname] = ip elif action == "offline": - self.cleanup_offline_host(hostname) + await self.cleanup_offline_host(hostname) except Exception as e: # This catch-all is the same as the one above. asyncio will eat the exception # but we want to know about it. logger.exception(f"on_ws_heartbeat exception: {e}") - def cleanup_offline_host(self, hostname): + async def cleanup_offline_host(self, hostname): """ Given a hostname, try to cancel its task/connection and remove it from the list of hosts we know about. @@ -260,6 +260,19 @@ class WebSocketRelayManager(object): """ if hostname in self.relay_connections: self.relay_connections[hostname].cancel() + + # Wait for the task to actually run its cancel/completion logic + # otherwise it might get GC'd too early when we del it below. + # Being GC'd too early could generate a scary message in logs: + # "Task was destroyed but it is pending!" + try: + await asyncio.wait_for(self.relay_connections[hostname].async_task, timeout=10) + except asyncio.TimeoutError: + logger.warning(f"Tried to cancel relay connection for {hostname} but it timed out during cleanup.") + except asyncio.CancelledError: + # Handle the case where the task was already cancelled by the time we got here. + pass + del self.relay_connections[hostname] if hostname in self.known_hosts: @@ -314,13 +327,11 @@ class WebSocketRelayManager(object): if deleted_remote_hosts: logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") + await asyncio.gather(self.cleanup_offline_host(h) for h in deleted_remote_hosts) if new_remote_hosts: logger.info(f"Adding {new_remote_hosts} to websocket broadcast list") - for h in deleted_remote_hosts: - self.cleanup_offline_host(h) - for h in new_remote_hosts: stats = self.stats_mgr.new_remote_host_stats(h) relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])