mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 01:17:37 -02:30
Improve asyncio debugging (#15398)
- use asyncio.get_running_loop() instead of passing around event_loops - add name to all of the asyncio tasks for easier debugging we are trying to figure out which task is ``` Task was destroyed but it is pending! task: <Task pending name='Task-<id>' coro=<RedisConnection._read_data() done, defined at /var/lib/awx/venv/awx/lib64/python3.9/site-packages/aioredis/connection.py:180> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fba77bf1700>()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /var/lib/awx/venv/awx/lib64/python3.9/site-packages/aioredis/connection.py:168]> ``` is referring to
This commit is contained in:
@@ -66,10 +66,8 @@ class FixedSlidingWindow:
|
|||||||
|
|
||||||
|
|
||||||
class RelayWebsocketStatsManager:
|
class RelayWebsocketStatsManager:
|
||||||
def __init__(self, event_loop, local_hostname):
|
def __init__(self, local_hostname):
|
||||||
self._local_hostname = local_hostname
|
self._local_hostname = local_hostname
|
||||||
|
|
||||||
self._event_loop = event_loop
|
|
||||||
self._stats = dict()
|
self._stats = dict()
|
||||||
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME
|
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME
|
||||||
|
|
||||||
@@ -94,7 +92,10 @@ class RelayWebsocketStatsManager:
|
|||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self.async_task = self._event_loop.create_task(self.run_loop())
|
self.async_task = asyncio.get_running_loop().create_task(
|
||||||
|
self.run_loop(),
|
||||||
|
name='RelayWebsocketStatsManager.run_loop',
|
||||||
|
)
|
||||||
return self.async_task
|
return self.async_task
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -47,7 +47,6 @@ class WebsocketRelayConnection:
|
|||||||
verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
|
verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
|
||||||
):
|
):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.event_loop = asyncio.get_event_loop()
|
|
||||||
self.stats = stats
|
self.stats = stats
|
||||||
self.remote_host = remote_host
|
self.remote_host = remote_host
|
||||||
self.remote_port = remote_port
|
self.remote_port = remote_port
|
||||||
@@ -110,7 +109,10 @@ class WebsocketRelayConnection:
|
|||||||
self.stats.record_connection_lost()
|
self.stats.record_connection_lost()
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self.async_task = self.event_loop.create_task(self.connect())
|
self.async_task = asyncio.get_running_loop().create_task(
|
||||||
|
self.connect(),
|
||||||
|
name=f"WebsocketRelayConnection.connect.{self.name}",
|
||||||
|
)
|
||||||
return self.async_task
|
return self.async_task
|
||||||
|
|
||||||
def cancel(self):
|
def cancel(self):
|
||||||
@@ -121,7 +123,10 @@ class WebsocketRelayConnection:
|
|||||||
# metrics messages
|
# metrics messages
|
||||||
# the "metrics" group is not subscribed to in the typical fashion, so we
|
# the "metrics" group is not subscribed to in the typical fashion, so we
|
||||||
# just explicitly create it
|
# just explicitly create it
|
||||||
producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics"))
|
producer = asyncio.get_running_loop().create_task(
|
||||||
|
self.run_producer("metrics", websocket, "metrics"),
|
||||||
|
name="WebsocketRelayConnection.run_producer.metrics",
|
||||||
|
)
|
||||||
self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}}
|
self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}}
|
||||||
async for msg in websocket:
|
async for msg in websocket:
|
||||||
self.stats.record_message_received()
|
self.stats.record_message_received()
|
||||||
@@ -143,7 +148,10 @@ class WebsocketRelayConnection:
|
|||||||
name = f"{self.remote_host}-{group}"
|
name = f"{self.remote_host}-{group}"
|
||||||
origin_channel = payload['origin_channel']
|
origin_channel = payload['origin_channel']
|
||||||
if not self.producers.get(name):
|
if not self.producers.get(name):
|
||||||
producer = self.event_loop.create_task(self.run_producer(name, websocket, group))
|
producer = asyncio.get_running_loop().create_task(
|
||||||
|
self.run_producer(name, websocket, group),
|
||||||
|
name=f"WebsocketRelayConnection.run_producer.{name}",
|
||||||
|
)
|
||||||
self.producers[name] = {"task": producer, "subscriptions": {origin_channel}}
|
self.producers[name] = {"task": producer, "subscriptions": {origin_channel}}
|
||||||
logger.debug(f"Producer {name} started.")
|
logger.debug(f"Producer {name} started.")
|
||||||
else:
|
else:
|
||||||
@@ -297,9 +305,7 @@ class WebSocketRelayManager(object):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
event_loop = asyncio.get_running_loop()
|
self.stats_mgr = RelayWebsocketStatsManager(self.local_hostname)
|
||||||
|
|
||||||
self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname)
|
|
||||||
self.stats_mgr.start()
|
self.stats_mgr.start()
|
||||||
|
|
||||||
database_conf = deepcopy(settings.DATABASES['default'])
|
database_conf = deepcopy(settings.DATABASES['default'])
|
||||||
@@ -323,7 +329,10 @@ class WebSocketRelayManager(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
await async_conn.set_autocommit(True)
|
await async_conn.set_autocommit(True)
|
||||||
on_ws_heartbeat_task = event_loop.create_task(self.on_ws_heartbeat(async_conn))
|
on_ws_heartbeat_task = asyncio.get_running_loop().create_task(
|
||||||
|
self.on_ws_heartbeat(async_conn),
|
||||||
|
name="WebSocketRelayManager.on_ws_heartbeat",
|
||||||
|
)
|
||||||
|
|
||||||
# Establishes a websocket connection to /websocket/relay on all API servers
|
# Establishes a websocket connection to /websocket/relay on all API servers
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
Reference in New Issue
Block a user