work around a memory leak in channels_redis

see: https://github.com/django/channels_redis/issues/212
see: https://github.com/django/channels_redis/pull/219/
This commit is contained in:
Ryan Petrello
2020-07-31 15:37:44 -04:00
parent 35efbb043d
commit e25da217e8
2 changed files with 31 additions and 1 deletions

View File

@@ -1,3 +1,5 @@
import collections
import functools
import json import json
import logging import logging
import time import time
@@ -12,12 +14,40 @@ from django.contrib.auth.models import User
from channels.generic.websocket import AsyncJsonWebsocketConsumer from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.layers import get_channel_layer from channels.layers import get_channel_layer
from channels.db import database_sync_to_async from channels.db import database_sync_to_async
from channels_redis.core import RedisChannelLayer
logger = logging.getLogger('awx.main.consumers') logger = logging.getLogger('awx.main.consumers')
XRF_KEY = '_auth_user_xrf' XRF_KEY = '_auth_user_xrf'
class BoundedQueue(asyncio.Queue):
def put_nowait(self, item):
if self.full():
# dispose the oldest item
# if we actually get into this code block, it likely means that
# this specific consumer has stopped reading
# unfortunately, channels_redis will just happily continue to
# queue messages specific to their channel until the heat death
# of the sun: https://github.com/django/channels_redis/issues/212
# this isn't a huge deal for browser clients that disconnect,
# but it *does* cause a problem for our global broadcast topic
# that's used to broadcast messages to peers in a cluster
# if we get into this code block, it's better to drop messages
# than to continue to malloc() forever
self.get_nowait()
return super(BoundedQueue, self).put_nowait(item)
class ExpiringRedisChannelLayer(RedisChannelLayer):
def __init__(self, *args, **kw):
super(ExpiringRedisChannelLayer, self).__init__(*args, **kw)
self.receive_buffer = collections.defaultdict(
functools.partial(BoundedQueue, self.capacity)
)
class WebsocketSecretAuthHelper: class WebsocketSecretAuthHelper:
""" """
Middlewareish for websockets to verify node websocket broadcast interconnect. Middlewareish for websockets to verify node websocket broadcast interconnect.

View File

@@ -916,7 +916,7 @@ ASGI_APPLICATION = "awx.main.routing.application"
CHANNEL_LAYERS = { CHANNEL_LAYERS = {
"default": { "default": {
"BACKEND": "channels_redis.core.RedisChannelLayer", "BACKEND": "awx.main.consumers.ExpiringRedisChannelLayer",
"CONFIG": { "CONFIG": {
"hosts": [BROKER_URL], "hosts": [BROKER_URL],
"capacity": 10000, "capacity": 10000,