diff --git a/awx/main/consumers.py b/awx/main/consumers.py index d32219b3ac..b6d8872ebd 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -1,5 +1,3 @@ -import collections -import functools import json import logging import time @@ -14,40 +12,12 @@ from django.contrib.auth.models import User from channels.generic.websocket import AsyncJsonWebsocketConsumer from channels.layers import get_channel_layer from channels.db import database_sync_to_async -from channels_redis.core import RedisChannelLayer logger = logging.getLogger('awx.main.consumers') XRF_KEY = '_auth_user_xrf' -class BoundedQueue(asyncio.Queue): - - def put_nowait(self, item): - if self.full(): - # dispose the oldest item - # if we actually get into this code block, it likely means that - # this specific consumer has stopped reading - # unfortunately, channels_redis will just happily continue to - # queue messages specific to their channel until the heat death - # of the sun: https://github.com/django/channels_redis/issues/212 - # this isn't a huge deal for browser clients that disconnect, - # but it *does* cause a problem for our global broadcast topic - # that's used to broadcast messages to peers in a cluster - # if we get into this code block, it's better to drop messages - # than to continue to malloc() forever - self.get_nowait() - return super(BoundedQueue, self).put_nowait(item) - - -class ExpiringRedisChannelLayer(RedisChannelLayer): - def __init__(self, *args, **kw): - super(ExpiringRedisChannelLayer, self).__init__(*args, **kw) - self.receive_buffer = collections.defaultdict( - functools.partial(BoundedQueue, self.capacity) - ) - - class WebsocketSecretAuthHelper: """ Middlewareish for websockets to verify node websocket broadcast interconnect. diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 8d42af1ae4..7bfcaa8861 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -790,7 +790,7 @@ ASGI_APPLICATION = "awx.main.routing.application" CHANNEL_LAYERS = { "default": { - "BACKEND": "awx.main.consumers.ExpiringRedisChannelLayer", + "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": [BROKER_URL], "capacity": 10000,