Prevent looping issue when task/web share a Redis

Signed-off-by: Rick Elrod <rick@elrod.me>
This commit is contained in:
Rick Elrod 2022-12-10 02:40:48 -06:00 committed by Hao Liu
parent 2e0f25150c
commit 5f41003fb1
2 changed files with 9 additions and 1 deletions

View File

@ -240,6 +240,6 @@ def emit_channel_notification(group, payload):
run_sync(
channel_layer.group_send(
group,
{"type": "internal.message", "text": payload_dumped},
{"type": "internal.message", "text": payload_dumped, "needs_relay": True},
)
)

View File

@ -164,6 +164,14 @@ class WebsocketRelayConnection:
while True:
try:
msg = await asyncio.wait_for(self.channel_layer.receive(consumer_channel), timeout=10)
if not msg.get("needs_relay"):
# This is added in by emit_channel_notification(). It prevents us from looping
# in the event that we are sharing a redis with a web instance. We'll see the
# message once (it'll have needs_relay=True), we'll delete that, and then forward
# the message along. The web instance will add it back to the same channels group,
# but it won't have needs_relay=True, so we'll ignore it.
continue
del msg["needs_relay"]
except asyncio.TimeoutError:
current_subscriptions = self.producers[name]["subscriptions"]
if len(current_subscriptions) == 0: