diff --git a/awx/main/analytics/broadcast_websocket.py b/awx/main/analytics/broadcast_websocket.py index 21567402f3..1191318d1c 100644 --- a/awx/main/analytics/broadcast_websocket.py +++ b/awx/main/analytics/broadcast_websocket.py @@ -66,10 +66,8 @@ class FixedSlidingWindow: class RelayWebsocketStatsManager: - def __init__(self, event_loop, local_hostname): + def __init__(self, local_hostname): self._local_hostname = local_hostname - - self._event_loop = event_loop self._stats = dict() self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME @@ -94,7 +92,10 @@ class RelayWebsocketStatsManager: self.start() def start(self): - self.async_task = self._event_loop.create_task(self.run_loop()) + self.async_task = asyncio.get_running_loop().create_task( + self.run_loop(), + name='RelayWebsocketStatsManager.run_loop', + ) return self.async_task @classmethod diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index dced6c4e83..f6d6aa7335 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -47,7 +47,6 @@ class WebsocketRelayConnection: verify_ssl: bool = settings.BROADCAST_WEBSOCKET_VERIFY_CERT, ): self.name = name - self.event_loop = asyncio.get_event_loop() self.stats = stats self.remote_host = remote_host self.remote_port = remote_port @@ -110,7 +109,10 @@ class WebsocketRelayConnection: self.stats.record_connection_lost() def start(self): - self.async_task = self.event_loop.create_task(self.connect()) + self.async_task = asyncio.get_running_loop().create_task( + self.connect(), + name=f"WebsocketRelayConnection.connect.{self.name}", + ) return self.async_task def cancel(self): @@ -121,7 +123,10 @@ class WebsocketRelayConnection: # metrics messages # the "metrics" group is not subscribed to in the typical fashion, so we # just explicitly create it - producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics")) + producer = asyncio.get_running_loop().create_task( + self.run_producer("metrics", websocket, "metrics"), + name="WebsocketRelayConnection.run_producer.metrics", + ) self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}} async for msg in websocket: self.stats.record_message_received() @@ -143,7 +148,10 @@ class WebsocketRelayConnection: name = f"{self.remote_host}-{group}" origin_channel = payload['origin_channel'] if not self.producers.get(name): - producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) + producer = asyncio.get_running_loop().create_task( + self.run_producer(name, websocket, group), + name=f"WebsocketRelayConnection.run_producer.{name}", + ) self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} logger.debug(f"Producer {name} started.") else: @@ -297,9 +305,7 @@ class WebSocketRelayManager(object): pass async def run(self): - event_loop = asyncio.get_running_loop() - - self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname) + self.stats_mgr = RelayWebsocketStatsManager(self.local_hostname) self.stats_mgr.start() database_conf = deepcopy(settings.DATABASES['default']) @@ -323,7 +329,10 @@ class WebSocketRelayManager(object): ) await async_conn.set_autocommit(True) - on_ws_heartbeat_task = event_loop.create_task(self.on_ws_heartbeat(async_conn)) + on_ws_heartbeat_task = asyncio.get_running_loop().create_task( + self.on_ws_heartbeat(async_conn), + name="WebSocketRelayManager.on_ws_heartbeat", + ) # Establishes a websocket connection to /websocket/relay on all API servers while True: