prefer simple async -> sync

* asgiref async_to_sync was causing a Redis connection _for each_ call
to emit_channel_notification i.e. every event that the callback receiver
processes. This is a "known" issue
https://github.com/django/channels_redis/pull/130#issuecomment-424274470
and the advise is to slow downn the rate at which you call
async_to_sync. That is not an option for us. Instead, we put the async
group_send call onto the event loop for the current thread and wait for
it to be processed immediately.

The known issue has to do with event loop + socket relationship. Each
connection to redis is achieved via a socket. That conection can only be
waiting on by the event loop that corresponds to the calling thread.
async_to_sync creates a _new thread_ for each invocation. Thus, a new
connection to redis is required. Thus, the excess redis connections that
can be observed via netstat | grep redis | wc -l.
This commit is contained in:
chris meyers
2020-01-22 16:24:08 -05:00
committed by Ryan Petrello
parent dc6c353ecd
commit 5818dcc980

View File

@@ -5,6 +5,7 @@ import logging
import codecs
import datetime
import hmac
import asyncio
from django.utils.encoding import force_bytes
from django.utils.encoding import smart_str
@@ -200,28 +201,66 @@ class EventConsumer(AsyncJsonWebsocketConsumer):
await self.send(event['text'])
def emit_channel_notification(group, payload):
def run_sync(func):
event_loop = None
try:
payload = json.dumps(payload, cls=DjangoJSONEncoder)
event_loop = asyncio.get_event_loop()
except RuntimeError:
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
return event_loop.run_until_complete(func)
def _dump_payload(payload):
try:
return json.dumps(payload, cls=DjangoJSONEncoder)
except ValueError:
logger.error("Invalid payload emitting channel {} on topic: {}".format(group, payload))
logger.error("Invalid payload to emit")
return None
async def emit_channel_notification_async(group, payload):
payload_dumped = _dump_payload(payload)
if payload_dumped is None:
return
channel_layer = get_channel_layer()
await channel_layer.group_send(
group,
{
"type": "internal.message",
"text": payload_dumped
},
)
await channel_layer.group_send(
BROADCAST_GROUP,
{
"type": "internal.message",
"text": wrap_broadcast_msg(group, payload_dumped),
},
)
def emit_channel_notification(group, payload):
payload_dumped = _dump_payload(payload)
if payload_dumped is None:
return
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
run_sync(channel_layer.group_send(
group,
{
"type": "internal.message",
"text": payload
"text": payload_dumped
},
)
))
async_to_sync(channel_layer.group_send)(
run_sync(channel_layer.group_send(
BROADCAST_GROUP,
{
"type": "internal.message",
"text": wrap_broadcast_msg(group, payload),
"text": wrap_broadcast_msg(group, payload_dumped),
},
)
))