From 32df114e413e1839111cd81294d0c96aab36bca4 Mon Sep 17 00:00:00 2001 From: Hao Liu <44379968+TheRealHaoLiu@users.noreply.github.com> Date: Wed, 24 Jul 2024 12:05:02 -0400 Subject: [PATCH] Improve asyncio debugging (#15398) - use asyncio.get_running_loop() instead of passing around event_loops - add name to all of the asyncio tasks for easier debugging We are trying to figure out which task the following warning is referring to: ``` Task was destroyed but it is pending! task: wait_for=()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /var/lib/awx/venv/awx/lib64/python3.9/site-packages/aioredis/connection.py:168]> ``` --- awx/main/analytics/broadcast_websocket.py | 9 ++++---- awx/main/wsrelay.py | 25 +++++++++++++++-------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/awx/main/analytics/broadcast_websocket.py b/awx/main/analytics/broadcast_websocket.py index 21567402f3..1191318d1c 100644 --- a/awx/main/analytics/broadcast_websocket.py +++ b/awx/main/analytics/broadcast_websocket.py @@ -66,10 +66,8 @@ class FixedSlidingWindow: class RelayWebsocketStatsManager: - def __init__(self, event_loop, local_hostname): + def __init__(self, local_hostname): self._local_hostname = local_hostname - - self._event_loop = event_loop self._stats = dict() self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME @@ -94,7 +92,10 @@ class RelayWebsocketStatsManager: self.start() def start(self): - self.async_task = self._event_loop.create_task(self.run_loop()) + self.async_task = asyncio.get_running_loop().create_task( + self.run_loop(), + name='RelayWebsocketStatsManager.run_loop', + ) return self.async_task @classmethod diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index dced6c4e83..f6d6aa7335 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -47,7 +47,6 @@ class WebsocketRelayConnection: verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT, ): self.name = name - self.event_loop = asyncio.get_event_loop() self.stats = stats self.remote_host = remote_host self.remote_port
= remote_port @@ -110,7 +109,10 @@ class WebsocketRelayConnection: self.stats.record_connection_lost() def start(self): - self.async_task = self.event_loop.create_task(self.connect()) + self.async_task = asyncio.get_running_loop().create_task( + self.connect(), + name=f"WebsocketRelayConnection.connect.{self.name}", + ) return self.async_task def cancel(self): @@ -121,7 +123,10 @@ class WebsocketRelayConnection: # metrics messages # the "metrics" group is not subscribed to in the typical fashion, so we # just explicitly create it - producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics")) + producer = asyncio.get_running_loop().create_task( + self.run_producer("metrics", websocket, "metrics"), + name="WebsocketRelayConnection.run_producer.metrics", + ) self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}} async for msg in websocket: self.stats.record_message_received() @@ -143,7 +148,10 @@ class WebsocketRelayConnection: name = f"{self.remote_host}-{group}" origin_channel = payload['origin_channel'] if not self.producers.get(name): - producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) + producer = asyncio.get_running_loop().create_task( + self.run_producer(name, websocket, group), + name=f"WebsocketRelayConnection.run_producer.{name}", + ) self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} logger.debug(f"Producer {name} started.") else: @@ -297,9 +305,7 @@ class WebSocketRelayManager(object): pass async def run(self): - event_loop = asyncio.get_running_loop() - - self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname) + self.stats_mgr = RelayWebsocketStatsManager(self.local_hostname) self.stats_mgr.start() database_conf = deepcopy(settings.DATABASES['default']) @@ -323,7 +329,10 @@ class WebSocketRelayManager(object): ) await async_conn.set_autocommit(True) - on_ws_heartbeat_task = 
event_loop.create_task(self.on_ws_heartbeat(async_conn)) + on_ws_heartbeat_task = asyncio.get_running_loop().create_task( + self.on_ws_heartbeat(async_conn), + name="WebSocketRelayManager.on_ws_heartbeat", + ) # Establishes a websocket connection to /websocket/relay on all API servers while True: