From 5818dcc980be31bd521c2670790956491e076154 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Wed, 22 Jan 2020 16:24:08 -0500 Subject: [PATCH] prefer simple async -> sync * asgiref async_to_sync was causing a Redis connection _for each_ call to emit_channel_notification i.e. every event that the callback receiver processes. This is a "known" issue https://github.com/django/channels_redis/pull/130#issuecomment-424274470 and the advise is to slow downn the rate at which you call async_to_sync. That is not an option for us. Instead, we put the async group_send call onto the event loop for the current thread and wait for it to be processed immediately. The known issue has to do with event loop + socket relationship. Each connection to redis is achieved via a socket. That conection can only be waiting on by the event loop that corresponds to the calling thread. async_to_sync creates a _new thread_ for each invocation. Thus, a new connection to redis is required. Thus, the excess redis connections that can be observed via netstat | grep redis | wc -l. --- awx/main/consumers.py | 59 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index bc7aa47089..03173ceffd 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -5,6 +5,7 @@ import logging import codecs import datetime import hmac +import asyncio from django.utils.encoding import force_bytes from django.utils.encoding import smart_str @@ -200,28 +201,66 @@ class EventConsumer(AsyncJsonWebsocketConsumer): await self.send(event['text']) -def emit_channel_notification(group, payload): +def run_sync(func): + event_loop = None try: - payload = json.dumps(payload, cls=DjangoJSONEncoder) + event_loop = asyncio.get_event_loop() + except RuntimeError: + event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(event_loop) + return event_loop.run_until_complete(func) + + +def _dump_payload(payload): + try: + return json.dumps(payload, cls=DjangoJSONEncoder) except ValueError: - logger.error("Invalid payload emitting channel {} on topic: {}".format(group, payload)) + logger.error("Invalid payload to emit") + return None + + +async def emit_channel_notification_async(group, payload): + payload_dumped = _dump_payload(payload) + if payload_dumped is None: + return + + channel_layer = get_channel_layer() + await channel_layer.group_send( + group, + { + "type": "internal.message", + "text": payload_dumped + }, + ) + + await channel_layer.group_send( + BROADCAST_GROUP, + { + "type": "internal.message", + "text": wrap_broadcast_msg(group, payload_dumped), + }, + ) + + +def emit_channel_notification(group, payload): + payload_dumped = _dump_payload(payload) + if payload_dumped is None: return channel_layer = get_channel_layer() - async_to_sync(channel_layer.group_send)( + run_sync(channel_layer.group_send( group, { "type": "internal.message", - "text": payload + "text": payload_dumped }, - ) + )) - async_to_sync(channel_layer.group_send)( + run_sync(channel_layer.group_send( BROADCAST_GROUP, { "type": "internal.message", - "text": wrap_broadcast_msg(group, payload), + "text": wrap_broadcast_msg(group, payload_dumped), }, - ) - + ))