mirror of
https://github.com/ansible/awx.git
synced 2026-02-22 05:30:18 -03:30
Revert "work around a memory leak in channels_redis"
This reverts commit e25da217e8.
This commit is contained in:
@@ -1,5 +1,3 @@
|
|||||||
import collections
|
|
||||||
import functools
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
@@ -14,40 +12,12 @@ from django.contrib.auth.models import User
|
|||||||
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
||||||
from channels.layers import get_channel_layer
|
from channels.layers import get_channel_layer
|
||||||
from channels.db import database_sync_to_async
|
from channels.db import database_sync_to_async
|
||||||
from channels_redis.core import RedisChannelLayer
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.consumers')
|
logger = logging.getLogger('awx.main.consumers')
|
||||||
XRF_KEY = '_auth_user_xrf'
|
XRF_KEY = '_auth_user_xrf'
|
||||||
|
|
||||||
|
|
||||||
class BoundedQueue(asyncio.Queue):
|
|
||||||
|
|
||||||
def put_nowait(self, item):
|
|
||||||
if self.full():
|
|
||||||
# dispose the oldest item
|
|
||||||
# if we actually get into this code block, it likely means that
|
|
||||||
# this specific consumer has stopped reading
|
|
||||||
# unfortunately, channels_redis will just happily continue to
|
|
||||||
# queue messages specific to their channel until the heat death
|
|
||||||
# of the sun: https://github.com/django/channels_redis/issues/212
|
|
||||||
# this isn't a huge deal for browser clients that disconnect,
|
|
||||||
# but it *does* cause a problem for our global broadcast topic
|
|
||||||
# that's used to broadcast messages to peers in a cluster
|
|
||||||
# if we get into this code block, it's better to drop messages
|
|
||||||
# than to continue to malloc() forever
|
|
||||||
self.get_nowait()
|
|
||||||
return super(BoundedQueue, self).put_nowait(item)
|
|
||||||
|
|
||||||
|
|
||||||
class ExpiringRedisChannelLayer(RedisChannelLayer):
|
|
||||||
def __init__(self, *args, **kw):
|
|
||||||
super(ExpiringRedisChannelLayer, self).__init__(*args, **kw)
|
|
||||||
self.receive_buffer = collections.defaultdict(
|
|
||||||
functools.partial(BoundedQueue, self.capacity)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class WebsocketSecretAuthHelper:
|
class WebsocketSecretAuthHelper:
|
||||||
"""
|
"""
|
||||||
Middlewareish for websockets to verify node websocket broadcast interconnect.
|
Middlewareish for websockets to verify node websocket broadcast interconnect.
|
||||||
|
|||||||
@@ -790,7 +790,7 @@ ASGI_APPLICATION = "awx.main.routing.application"
|
|||||||
|
|
||||||
CHANNEL_LAYERS = {
|
CHANNEL_LAYERS = {
|
||||||
"default": {
|
"default": {
|
||||||
"BACKEND": "awx.main.consumers.ExpiringRedisChannelLayer",
|
"BACKEND": "channels_redis.core.RedisChannelLayer",
|
||||||
"CONFIG": {
|
"CONFIG": {
|
||||||
"hosts": [BROKER_URL],
|
"hosts": [BROKER_URL],
|
||||||
"capacity": 10000,
|
"capacity": 10000,
|
||||||
|
|||||||
Reference in New Issue
Block a user