move broadcast websocket out into its own process

This commit is contained in:
chris meyers 2020-02-14 16:12:04 -05:00 committed by Ryan Petrello
parent f5193e5ea5
commit 3c5c9c6fde
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
13 changed files with 76 additions and 57 deletions

View File

@ -265,28 +265,6 @@ migrate:
dbchange:
$(MANAGEMENT_COMMAND) makemigrations
server_noattach:
tmux new-session -d -s awx 'exec make uwsgi'
tmux rename-window 'AWX'
tmux select-window -t awx:0
tmux split-window -v 'exec make dispatcher'
tmux new-window 'exec make daphne'
tmux select-window -t awx:1
tmux rename-window 'WebSockets'
tmux split-window -h 'exec make runworker'
tmux split-window -v 'exec make nginx'
tmux new-window 'exec make receiver'
tmux select-window -t awx:2
tmux rename-window 'Extra Services'
tmux select-window -t awx:0
server: server_noattach
tmux -2 attach-session -t awx
# Use with iterm2's native tmux protocol support
servercc: server_noattach
tmux -2 -CC attach-session -t awx
supervisor:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
@ -311,18 +289,11 @@ daphne:
fi; \
daphne -b 127.0.0.1 -p 8051 awx.asgi:channel_layer
runworker:
wsbroadcast:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py runworker --only-channels websocket.*
# Run the built-in development webserver (by default on http://localhost:8013).
runserver:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py runserver
$(PYTHON) manage.py run_wsbroadcast
# Run to start the background task dispatcher for development.
dispatcher:

View File

@ -21,7 +21,7 @@ from channels.db import database_sync_to_async
from asgiref.sync import async_to_sync
from awx.main.channels import wrap_broadcast_msg
from awx.main.wsbroadcast import wrap_broadcast_msg
logger = logging.getLogger('awx.main.consumers')
@ -106,11 +106,13 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer):
try:
WebsocketSecretAuthHelper.is_authorized(self.scope)
except Exception:
# TODO: log ip of connected client
logger.warn("Broadcast client failed to authorize.")
await self.close()
return
# TODO: log ip of connected client
logger.info("Client connected")
logger.info(f"Broadcast client connected.")
await self.accept()
await self.channel_layer.group_add(BROADCAST_GROUP, self.channel_name)

View File

@ -64,7 +64,7 @@ class RecordedQueryLog(object):
if not os.path.isdir(self.dest):
os.makedirs(self.dest)
progname = ' '.join(sys.argv)
for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'runworker'):
for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsbroadcast'):
if match in progname:
progname = match
break

View File

@ -0,0 +1,27 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import logging
import asyncio
from django.core.management.base import BaseCommand
from awx.main.wsbroadcast import BroadcastWebsocketManager
logger = logging.getLogger('awx.main.wsbroadcast')
class Command(BaseCommand):
help = 'Launch the websocket broadcaster'
def handle(self, *arg, **options):
try:
broadcast_websocket_mgr = BroadcastWebsocketManager()
task = broadcast_websocket_mgr.start()
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
except KeyboardInterrupt:
logger.debug('Terminating Websocket Broadcaster')
if broadcast_websocket_mgr:
broadcast_websocket_mgr.stop()

View File

@ -5,6 +5,7 @@ import logging
import aiohttp
import asyncio
import datetime
import sys
from channels_redis.core import RedisChannelLayer
from channels.layers import get_channel_layer
@ -15,7 +16,7 @@ from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder
logger = logging.getLogger('awx.main')
logger = logging.getLogger('awx.main.wsbroadcast')
def wrap_broadcast_msg(group, message: str):
@ -100,7 +101,7 @@ class WebsocketTask():
except Exception as e:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
# Does aiohttp throws an exception if a disconnect happens?
logger.warn("Websocket broadcast client exception {}".format(e))
logger.warn(f"Websocket broadcast client exception {str(e)}")
# Reconnect
self.start(attempt=attempt+1)
@ -129,7 +130,6 @@ class BroadcastWebsocketTask(WebsocketTask):
(group, message) = unwrap_broadcast_msg(payload)
logger.debug(f"{self.name} broadcasting message")
await self.channel_layer.group_send(group, {"type": "internal.message", "text": message})
@ -167,11 +167,4 @@ class BroadcastWebsocketManager(object):
def start(self):
self.async_task = self.event_loop.create_task(self.run_loop())
class RedisGroupBroadcastChannelLayer(RedisChannelLayer):
def __init__(self, *args, **kwargs):
super(RedisGroupBroadcastChannelLayer, self).__init__(*args, **kwargs)
broadcast_websocket_mgr = BroadcastWebsocketManager()
broadcast_websocket_mgr.start()
return self.async_task

View File

@ -959,7 +959,7 @@ ASGI_APPLICATION = "awx.main.routing.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer",
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("localhost", 6379)],
},

View File

@ -58,7 +58,7 @@ redis_parts = parse.urlparse(BROKER_URL)
CHANNEL_LAYERS = {
"default": {
"BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer",
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [(redis_parts.hostname, redis_parts.port)]
},

View File

@ -43,14 +43,14 @@ This section will specifically discuss deployment in the context of websockets a
| `nginx` | listens on ports 80/443, handles HTTPS proxying, serves static assets, routes requests for `daphne` and `uwsgi` |
| `uwsgi` | listens on port 8050, handles API requests |
| `daphne` | listens on port 8051, handles websocket requests |
| `runworker` | no listening port, watches and processes the message queue |
| `wsbroadcast` | no listening port, forwards all group messages to all cluster nodes |
| `supervisord` | (production-only) handles the process management of all the services except `nginx` |
When a request comes in to `nginx` and has the `Upgrade` header and is for the path `/websocket`, then `nginx` knows that it should be routing that request to our `daphne` service.
`daphne` receives the request and generates channel and routing information for the request. The configured event handlers for `daphne` then unpack and parse the request message using the wire protocol mentioned above. This ensures that the connection has its context limited to only receive messages for events it is interested in. `daphne` uses internal events to trigger further behavior, which will generate messages and send them to the queue, which is then processed by the `runworker`.
`daphne` handles websocket connections proxied by nginx.
`runworker` processes the messages from the queue. This uses the contextual information of the message provided by the `daphne` server and our `asgi_amqp` implementation to broadcast messages out to each client.
`wsbroadcast` fully connects all cluster nodes to every other cluster nodes. Sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages).
### Development
- `nginx` listens on 8013/8043 instead of 80/443

View File

@ -87,11 +87,10 @@ if os.getenv("DATABASE_SSLMODE", False):
BROKER_URL = 'redis://{}:{}'.format(
os.getenv("REDIS_HOST", None),
os.getenv("REDIS_PORT", "6379"),
os.getenv("REDIS_PORT", "6379"),)
CHANNEL_LAYERS = {
'default': {'BACKEND': 'asgi_amqp.AMQPChannelLayer',
'ROUTING': 'awx.main.routing.channel_routing',
'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {'hosts': [(os.getenv("REDIS_HOST", None), int(os.getenv("REDIS_PORT", 6379)))]}}
}

View File

@ -35,8 +35,19 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:wsbroadcast]
command = awx-manage run_wsbroadcast
directory = /var/lib/awx
autostart = true
autorestart = true
stopwaitsecs = 5
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[group:tower-processes]
programs=nginx,uwsgi,daphne
programs=nginx,uwsgi,daphne,wsbroadcast
priority=5
# TODO: Exit Handler

View File

@ -18,6 +18,6 @@ BROKER_URL = 'redis://{}:{}/'.format(
"{{ kubernetes_redis_port }}",)
CHANNEL_LAYERS = {
'default': {'BACKEND': 'awx.main.channels.RedisGroupBroadcastChannelLayer',
'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {'hosts': [("{{ kubernetes_redis_hostname }}", {{ kubernetes_redis_port|int }})]}}
}
}

View File

@ -15,7 +15,7 @@ BROKER_URL = 'redis://{}:{}/'.format(
"{{ redis_port }}",)
CHANNEL_LAYERS = {
'default': {'BACKEND': 'awx.main.channels.RedisGroupBroadcastChannelLayer',
'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {'hosts': [("{{ redis_hostname }}", {{ redis_port|int }})]}}
}

View File

@ -27,6 +27,18 @@ redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:awx-wsbroadcast]
command = make wsbroadcast
autostart = true
autorestart = true
stopwaitsecs = 1
stopsignal=KILL
stopasgroup=true
killasgroup=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:awx-uwsgi]
command = make uwsgi
autostart = true
@ -44,6 +56,10 @@ command = make daphne
autostart = true
autorestart = true
redirect_stderr=true
stopwaitsecs = 1
stopsignal=KILL
stopasgroup=true
killasgroup=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0