From 3c5c9c6fde893327fb52614994b0d14d775b4ac1 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 14 Feb 2020 16:12:04 -0500 Subject: [PATCH] move broadcast websocket out into its own process --- Makefile | 33 ++----------------- awx/main/consumers.py | 6 ++-- awx/main/db/profiled_pg/base.py | 2 +- .../management/commands/run_wsbroadcast.py | 27 +++++++++++++++ awx/main/{channels.py => wsbroadcast.py} | 15 +++------ awx/settings/defaults.py | 2 +- awx/settings/local_settings.py.docker_compose | 2 +- docs/websockets.md | 6 ++-- installer/roles/image_build/files/settings.py | 5 ++- .../roles/image_build/files/supervisor.conf | 13 +++++++- .../kubernetes/templates/credentials.py.j2 | 4 +-- .../local_docker/templates/credentials.py.j2 | 2 +- tools/docker-compose/supervisor.conf | 16 +++++++++ 13 files changed, 76 insertions(+), 57 deletions(-) create mode 100644 awx/main/management/commands/run_wsbroadcast.py rename awx/main/{channels.py => wsbroadcast.py} (93%) diff --git a/Makefile b/Makefile index 16225f0454..dbf278f30d 100644 --- a/Makefile +++ b/Makefile @@ -265,28 +265,6 @@ migrate: dbchange: $(MANAGEMENT_COMMAND) makemigrations -server_noattach: - tmux new-session -d -s awx 'exec make uwsgi' - tmux rename-window 'AWX' - tmux select-window -t awx:0 - tmux split-window -v 'exec make dispatcher' - tmux new-window 'exec make daphne' - tmux select-window -t awx:1 - tmux rename-window 'WebSockets' - tmux split-window -h 'exec make runworker' - tmux split-window -v 'exec make nginx' - tmux new-window 'exec make receiver' - tmux select-window -t awx:2 - tmux rename-window 'Extra Services' - tmux select-window -t awx:0 - -server: server_noattach - tmux -2 attach-session -t awx - -# Use with iterm2's native tmux protocol support -servercc: server_noattach - tmux -2 -CC attach-session -t awx - supervisor: @if [ "$(VENV_BASE)" ]; then \ . 
$(VENV_BASE)/awx/bin/activate; \ @@ -311,18 +289,11 @@ daphne: fi; \ daphne -b 127.0.0.1 -p 8051 awx.asgi:channel_layer -runworker: +wsbroadcast: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ fi; \ - $(PYTHON) manage.py runworker --only-channels websocket.* - -# Run the built-in development webserver (by default on http://localhost:8013). -runserver: - @if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/awx/bin/activate; \ - fi; \ - $(PYTHON) manage.py runserver + $(PYTHON) manage.py run_wsbroadcast # Run to start the background task dispatcher for development. dispatcher: diff --git a/awx/main/consumers.py b/awx/main/consumers.py index e11a20bc99..68758210e8 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -21,7 +21,7 @@ from channels.db import database_sync_to_async from asgiref.sync import async_to_sync -from awx.main.channels import wrap_broadcast_msg +from awx.main.wsbroadcast import wrap_broadcast_msg logger = logging.getLogger('awx.main.consumers') @@ -106,11 +106,13 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer): try: WebsocketSecretAuthHelper.is_authorized(self.scope) except Exception: + # TODO: log ip of connected client + logger.warn("Broadcast client failed to authorize.") await self.close() return # TODO: log ip of connected client - logger.info("Client connected") + logger.info(f"Broadcast client connected.") await self.accept() await self.channel_layer.group_add(BROADCAST_GROUP, self.channel_name) diff --git a/awx/main/db/profiled_pg/base.py b/awx/main/db/profiled_pg/base.py index 820e867508..ab1f9a7c93 100644 --- a/awx/main/db/profiled_pg/base.py +++ b/awx/main/db/profiled_pg/base.py @@ -64,7 +64,7 @@ class RecordedQueryLog(object): if not os.path.isdir(self.dest): os.makedirs(self.dest) progname = ' '.join(sys.argv) - for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'runworker'): + for match in ('uwsgi', 'dispatcher', 'callback_receiver', 'wsbroadcast'): if match in progname: progname = match break 
diff --git a/awx/main/management/commands/run_wsbroadcast.py b/awx/main/management/commands/run_wsbroadcast.py new file mode 100644 index 0000000000..63e8ed6b94 --- /dev/null +++ b/awx/main/management/commands/run_wsbroadcast.py @@ -0,0 +1,27 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved. +import logging +import asyncio + +from django.core.management.base import BaseCommand + +from awx.main.wsbroadcast import BroadcastWebsocketManager + + +logger = logging.getLogger('awx.main.wsbroadcast') + + +class Command(BaseCommand): + help = 'Launch the websocket broadcaster' + + def handle(self, *arg, **options): + try: + broadcast_websocket_mgr = BroadcastWebsocketManager() + task = broadcast_websocket_mgr.start() + + loop = asyncio.get_event_loop() + loop.run_until_complete(task) + except KeyboardInterrupt: + logger.debug('Terminating Websocket Broadcaster') + if broadcast_websocket_mgr: + broadcast_websocket_mgr.stop() diff --git a/awx/main/channels.py b/awx/main/wsbroadcast.py similarity index 93% rename from awx/main/channels.py rename to awx/main/wsbroadcast.py index 3d287d6622..d5100793e4 100644 --- a/awx/main/channels.py +++ b/awx/main/wsbroadcast.py @@ -5,6 +5,7 @@ import logging import aiohttp import asyncio import datetime +import sys from channels_redis.core import RedisChannelLayer from channels.layers import get_channel_layer @@ -15,7 +16,7 @@ from django.apps import apps from django.core.serializers.json import DjangoJSONEncoder -logger = logging.getLogger('awx.main') +logger = logging.getLogger('awx.main.wsbroadcast') def wrap_broadcast_msg(group, message: str): @@ -100,7 +101,7 @@ class WebsocketTask(): except Exception as e: # Early on, this is our canary. I'm not sure what exceptions we can really encounter. # Does aiohttp throws an exception if a disconnect happens? 
- logger.warn("Websocket broadcast client exception {}".format(e)) + logger.warn(f"Websocket broadcast client exception {str(e)}") # Reconnect self.start(attempt=attempt+1) @@ -129,7 +130,6 @@ class BroadcastWebsocketTask(WebsocketTask): (group, message) = unwrap_broadcast_msg(payload) - logger.debug(f"{self.name} broadcasting message") await self.channel_layer.group_send(group, {"type": "internal.message", "text": message}) @@ -167,11 +167,4 @@ class BroadcastWebsocketManager(object): def start(self): self.async_task = self.event_loop.create_task(self.run_loop()) - - -class RedisGroupBroadcastChannelLayer(RedisChannelLayer): - def __init__(self, *args, **kwargs): - super(RedisGroupBroadcastChannelLayer, self).__init__(*args, **kwargs) - - broadcast_websocket_mgr = BroadcastWebsocketManager() - broadcast_websocket_mgr.start() + return self.async_task diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index def4b9135e..664259fb2d 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -959,7 +959,7 @@ ASGI_APPLICATION = "awx.main.routing.application" CHANNEL_LAYERS = { "default": { - "BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer", + "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": [("localhost", 6379)], }, diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index e2ae322e3d..9397d3c15e 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -58,7 +58,7 @@ redis_parts = parse.urlparse(BROKER_URL) CHANNEL_LAYERS = { "default": { - "BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer", + "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": [(redis_parts.hostname, redis_parts.port)] }, diff --git a/docs/websockets.md b/docs/websockets.md index 2095905028..a1f4403ee8 100644 --- a/docs/websockets.md +++ b/docs/websockets.md @@ -43,14 +43,14 @@ This section will 
specifically discuss deployment in the context of websockets a | `nginx` | listens on ports 80/443, handles HTTPS proxying, serves static assets, routes requests for `daphne` and `uwsgi` | | `uwsgi` | listens on port 8050, handles API requests | | `daphne` | listens on port 8051, handles websocket requests | -| `runworker` | no listening port, watches and processes the message queue | +| `wsbroadcast` | no listening port, forwards all group messages to all cluster nodes | | `supervisord` | (production-only) handles the process management of all the services except `nginx` | When a request comes in to `nginx` and has the `Upgrade` header and is for the path `/websocket`, then `nginx` knows that it should be routing that request to our `daphne` service. -`daphne` receives the request and generates channel and routing information for the request. The configured event handlers for `daphne` then unpack and parse the request message using the wire protocol mentioned above. This ensures that the connection has its context limited to only receive messages for events it is interested in. `daphne` uses internal events to trigger further behavior, which will generate messages and send them to the queue, which is then processed by the `runworker`. +`daphne` handles websocket connections proxied by `nginx`. -`runworker` processes the messages from the queue. This uses the contextual information of the message provided by the `daphne` server and our `asgi_amqp` implementation to broadcast messages out to each client. +`wsbroadcast` connects every cluster node to every other cluster node and sends a copy of all group websocket messages to all other cluster nodes (i.e. job event type messages). 
### Development - `nginx` listens on 8013/8043 instead of 80/443 diff --git a/installer/roles/image_build/files/settings.py b/installer/roles/image_build/files/settings.py index 6fe4306ec0..f33100fdef 100644 --- a/installer/roles/image_build/files/settings.py +++ b/installer/roles/image_build/files/settings.py @@ -87,11 +87,10 @@ if os.getenv("DATABASE_SSLMODE", False): BROKER_URL = 'redis://{}:{}'.format( os.getenv("REDIS_HOST", None), - os.getenv("REDIS_PORT", "6379"), + os.getenv("REDIS_PORT", "6379"),) CHANNEL_LAYERS = { - 'default': {'BACKEND': 'asgi_amqp.AMQPChannelLayer', - 'ROUTING': 'awx.main.routing.channel_routing', + 'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': {'hosts': [(os.getenv("REDIS_HOST", None), int(os.getenv("REDIS_PORT", 6379)))]}} } diff --git a/installer/roles/image_build/files/supervisor.conf b/installer/roles/image_build/files/supervisor.conf index 1409b8b2c2..acc1af1d6b 100644 --- a/installer/roles/image_build/files/supervisor.conf +++ b/installer/roles/image_build/files/supervisor.conf @@ -35,8 +35,19 @@ stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 +[program:wsbroadcast] +command = awx-manage run_wsbroadcast +directory = /var/lib/awx +autostart = true +autorestart = true +stopwaitsecs = 5 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 + [group:tower-processes] -programs=nginx,uwsgi,daphne +programs=nginx,uwsgi,daphne,wsbroadcast priority=5 # TODO: Exit Handler diff --git a/installer/roles/kubernetes/templates/credentials.py.j2 b/installer/roles/kubernetes/templates/credentials.py.j2 index fd68abc523..c53bacb2ed 100644 --- a/installer/roles/kubernetes/templates/credentials.py.j2 +++ b/installer/roles/kubernetes/templates/credentials.py.j2 @@ -18,6 +18,6 @@ BROKER_URL = 'redis://{}:{}/'.format( "{{ kubernetes_redis_port }}",) CHANNEL_LAYERS = { - 'default': {'BACKEND': 
'awx.main.channels.RedisGroupBroadcastChannelLayer', + 'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': {'hosts': [("{{ kubernetes_redis_hostname }}", {{ kubernetes_redis_port|int }})]}} -} \ No newline at end of file +} diff --git a/installer/roles/local_docker/templates/credentials.py.j2 b/installer/roles/local_docker/templates/credentials.py.j2 index be71a5dc4a..8711785642 100644 --- a/installer/roles/local_docker/templates/credentials.py.j2 +++ b/installer/roles/local_docker/templates/credentials.py.j2 @@ -15,7 +15,7 @@ BROKER_URL = 'redis://{}:{}/'.format( "{{ redis_port }}",) CHANNEL_LAYERS = { - 'default': {'BACKEND': 'awx.main.channels.RedisGroupBroadcastChannelLayer', + 'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': {'hosts': [("{{ redis_hostname }}", {{ redis_port|int }})]}} } diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index c0e649950d..8eee142b79 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -27,6 +27,18 @@ redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 +[program:awx-wsbroadcast] +command = make wsbroadcast +autostart = true +autorestart = true +stopwaitsecs = 1 +stopsignal=KILL +stopasgroup=true +killasgroup=true +redirect_stderr=true +stdout_logfile=/dev/fd/1 +stdout_logfile_maxbytes=0 + [program:awx-uwsgi] command = make uwsgi autostart = true @@ -44,6 +56,10 @@ command = make daphne autostart = true autorestart = true redirect_stderr=true +stopwaitsecs = 1 +stopsignal=KILL +stopasgroup=true +killasgroup=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0