awx/awx/main/routing.py
chris meyers 216454d298 cleanup channel groups on start
* There are 2 data-structures that django channels redis uses: (1) zset
and (2) list. (1) is used for group membership where the key is the
logic user group and the value(s) are websocket clients. The score of
the zset entry is used for group expiration. We can not rely on group
expiration for clean-up because there is no interface privided by redis
channels to refresh the expiration. Choosing a small value for
group_expiry could result on our websocket backplane group expiring,
which would result in job events not being delivered. Instead, we
increase the group expiration to 5 years and clean up on daphne service
start.
* The list (2) data-structure is used by django channels redis to queue
websocket events per-websocket-client as needed. The need arises to
queue per-websocket-client events when the consumer can not keep up with
the producer. The consumer here is daphne, the producer is AWX.
* When AWX is operating healthy group membership in Redis is reflective
of the real-world. When AWX is unhealthy i.e. daphne cycles, the zset
will contain stale websocket client entries. This can be observed by
running `zrange asgi::group:jobs-status_changed 0 -1`. If the entries
returned look like:
specific.fUkXXpYj!DKOIfwPICNgw
specific.fUkXXpYj!FQcdopZeiRdG
specific.lpTSAgnk!IOKldfzcfdDp
specific.lpTSAgnk!NbvRUZsDpIQx
The entries with `fUkXXpYj` are stale. Note that this changeset fixes
this by removing all `asgi:*` entries on daphne start.
* Also note that individual message themselves have an expiration that
is configurable and defaults to 60.
* Also note that zset's tracking group membership will be deleted by
django channels redis when they are empty.
2020-05-11 11:02:57 -04:00

39 lines
1.0 KiB
Python

import redis
import logging
from django.conf.urls import url
from django.conf import settings
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from . import consumers
logger = logging.getLogger('awx.main.routing')
class AWXProtocolTypeRouter(ProtocolTypeRouter):
def __init__(self, *args, **kwargs):
try:
r = redis.Redis.from_url(settings.BROKER_URL)
for k in r.scan_iter('asgi:*', 500):
logger.debug(f"cleaning up Redis key {k}")
r.delete(k)
except redis.exceptions.RedisError as e:
logger.warn("encountered an error communicating with redis.")
raise e
super().__init__(*args, **kwargs)
websocket_urlpatterns = [
url(r'websocket/$', consumers.EventConsumer),
url(r'websocket/broadcast/$', consumers.BroadcastConsumer),
]
application = AWXProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter(websocket_urlpatterns)
),
})