From c8eeacacca6ae252954faedc997ec4fd1262edda Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 8 Nov 2019 10:36:39 -0500 Subject: [PATCH] POC channels 2 --- awx/asgi.py | 10 +- awx/main/channels.py | 90 ++++++ awx/main/consumers.py | 271 +++++++++++++----- awx/main/dispatch/control.py | 7 +- awx/main/dispatch/kombu.py | 42 --- awx/main/dispatch/publish.py | 10 +- .../commands/run_callback_receiver.py | 5 +- .../management/commands/run_dispatcher.py | 3 +- awx/main/queue.py | 8 +- awx/main/routing.py | 20 +- awx/main/signals.py | 10 - awx/settings/defaults.py | 30 +- awx/settings/local_settings.py.docker_compose | 23 +- requirements/requirements.in | 8 +- requirements/requirements.txt | 76 ++--- tools/docker-compose-cluster.yml | 95 +++--- tools/docker-compose.yml | 15 +- tools/docker-compose/bootstrap_development.sh | 1 - tools/docker-compose/haproxy.cfg | 29 +- tools/docker-compose/supervisor.conf | 10 +- tools/redis/redis_1.conf | 4 + tools/redis/redis_2.conf | 4 + tools/redis/redis_3.conf | 4 + 23 files changed, 497 insertions(+), 278 deletions(-) create mode 100644 awx/main/channels.py delete mode 100644 awx/main/dispatch/kombu.py create mode 100644 tools/redis/redis_1.conf create mode 100644 tools/redis/redis_2.conf create mode 100644 tools/redis/redis_3.conf diff --git a/awx/asgi.py b/awx/asgi.py index 3190a7032c..40640a4a19 100644 --- a/awx/asgi.py +++ b/awx/asgi.py @@ -2,14 +2,15 @@ # All Rights Reserved. import os import logging +import django from awx import __version__ as tower_version # Prepare the AWX environment. from awx import prepare_env, MODE prepare_env() # NOQA -from django.core.wsgi import get_wsgi_application # NOQA -from channels.asgi import get_channel_layer +from channels.routing import get_default_application + """ ASGI config for AWX project. @@ -32,6 +33,5 @@ if MODE == 'production': os.environ.setdefault("DJANGO_SETTINGS_MODULE", "awx.settings") - - -channel_layer = get_channel_layer() +django.setup() +channel_layer = get_default_application() diff --git a/awx/main/channels.py b/awx/main/channels.py new file mode 100644 index 0000000000..e76b06cf1b --- /dev/null +++ b/awx/main/channels.py @@ -0,0 +1,90 @@ + +import os +import json +import logging +import aiohttp +import asyncio + +from channels_redis.core import RedisChannelLayer +from channels.layers import get_channel_layer + +from django.utils.encoding import force_bytes +from django.conf import settings +from django.apps import apps +from django.core.serializers.json import DjangoJSONEncoder + + +logger = logging.getLogger('awx.main') + + +def wrap_broadcast_msg(group, message): + # TODO: Maybe wrap as "group","message" so that we don't need to + # encode/decode as json. 
+    return json.dumps(dict(group=group, message=message), cls=DjangoJSONEncoder)
+
+
+def unwrap_broadcast_msg(payload):
+    return (payload['group'], payload['message'])
+
+
+def get_broadcast_hosts():
+    Instance = apps.get_model('main', 'Instance')
+    return [h[0] for h in Instance.objects.filter(rampart_groups__controller__isnull=True)
+            .exclude(hostname=Instance.objects.me().hostname)
+            .order_by('hostname')
+            .values_list('hostname')
+            .distinct()]
+
+
+class RedisGroupBroadcastChannelLayer(RedisChannelLayer):
+    def __init__(self, *args, **kwargs):
+        super(RedisGroupBroadcastChannelLayer, self).__init__(*args, **kwargs)
+
+        self.broadcast_hosts = get_broadcast_hosts()
+        self.broadcast_websockets = set()
+
+        loop = asyncio.get_event_loop()
+        for host in self.broadcast_hosts:
+            loop.create_task(self.connect(host, settings.BROADCAST_WEBSOCKETS_PORT))
+
+    async def connect(self, host, port, secret='abc123', attempt=0):
+        from awx.main.consumers import WebsocketSecretAuthHelper  # noqa
+
+        if attempt > 0:
+            await asyncio.sleep(5)
+        channel_layer = get_channel_layer()
+        uri = f"{settings.BROADCAST_WEBSOCKETS_PROTOCOL}://{host}:{port}/websocket/broadcast/"
+        timeout = aiohttp.ClientTimeout(total=10)
+
+        secret_val = WebsocketSecretAuthHelper.construct_secret()
+        try:
+            async with aiohttp.ClientSession(headers={'secret': secret_val},
+                                             timeout=timeout) as session:
+                async with session.ws_connect(uri, ssl=settings.BROADCAST_WEBSOCKETS_VERIFY_CERT) as websocket:
+                    # TODO: Surface a health status of the broadcast interconnect
+                    async for msg in websocket:
+                        if msg.type == aiohttp.WSMsgType.ERROR:
+                            break
+                        elif msg.type == aiohttp.WSMsgType.TEXT:
+                            try:
+                                payload = json.loads(msg.data)
+                            except json.JSONDecodeError:
+                                logmsg = "Failed to decode broadcast message"
+                                if logger.isEnabledFor(logging.DEBUG):
+                                    logmsg = "{} {}".format(logmsg, msg.data)
+                                logger.warning(logmsg)
+                                continue
+
+                            (group, message) = unwrap_broadcast_msg(payload)
+
+                            await channel_layer.group_send(group, {"type": "internal.message", "text": message})
+        except Exception as e:
+            # Early on, this is our canary. I'm not sure what exceptions we can really encounter.
+            # Does aiohttp throw an exception if a disconnect happens?
+            logger.warning("Websocket broadcast client exception {}".format(e))
+        finally:
+            # Reconnect
+            loop = asyncio.get_event_loop()
+            loop.create_task(self.connect(host, port, secret, attempt=attempt+1))
+
+
diff --git a/awx/main/consumers.py b/awx/main/consumers.py
index a4fcdc96a6..c81085c988 100644
--- a/awx/main/consumers.py
+++ b/awx/main/consumers.py
@@ -1,97 +1,222 @@
 import json
 import logging
+import datetime
+import hmac
 
-from channels import Group
-from channels.auth import channel_session_user_from_http, channel_session_user
-
+from django.utils.encoding import force_bytes
 from django.utils.encoding import smart_str
 from django.http.cookie import parse_cookie
 from django.core.serializers.json import DjangoJSONEncoder
+from django.conf import settings
+
+from channels.generic.websocket import AsyncJsonWebsocketConsumer
+from channels.layers import get_channel_layer
+from channels.db import database_sync_to_async
+
+from asgiref.sync import async_to_sync
+
+from awx.main.channels import wrap_broadcast_msg
 
 logger = logging.getLogger('awx.main.consumers')
 XRF_KEY = '_auth_user_xrf'
 
+BROADCAST_GROUP = 'broadcast-group_send'
+
 
-def discard_groups(message):
-    if 'groups' in message.channel_session:
-        for group in message.channel_session['groups']:
-            Group(group).discard(message.reply_channel)
 
+class WebsocketSecretAuthHelper:
+    """
+    Middlewareish for websockets to verify node websocket broadcast interconnect.
+
+    Note: The "ish" is due to the channels routing interface. Routing occurs
+    _after_ authentication; making it hard to apply this auth to _only_ a subset of
+    websocket endpoints.
+    """
+
+    @classmethod
+    def construct_secret(cls):
+        nonce_serialized = "{}".format(int((datetime.datetime.utcnow()-datetime.datetime.utcfromtimestamp(0)).total_seconds()))
+        payload_dict = {
+            'secret': settings.BROADCAST_WEBSOCKETS_SECRET,
+            'nonce': nonce_serialized
+        }
+        payload_serialized = json.dumps(payload_dict)
+
+        secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET),
+                                     msg=force_bytes(payload_serialized),
+                                     digestmod='sha256').hexdigest()
+
+        return 'HMAC-SHA256 {}:{}'.format(nonce_serialized, secret_serialized)
 
-@channel_session_user_from_http
-def ws_connect(message):
-    headers = dict(message.content.get('headers', ''))
-    message.reply_channel.send({"accept": True})
-    message.content['method'] = 'FAKE'
-    if message.user.is_authenticated:
-        message.reply_channel.send(
-            {"text": json.dumps({"accept": True, "user": message.user.id})}
-        )
-        # store the valid CSRF token from the cookie so we can compare it later
-        # on ws_receive
-        cookie_token = parse_cookie(
-            smart_str(headers.get(b'cookie'))
-        ).get('csrftoken')
-        if cookie_token:
-            message.channel_session[XRF_KEY] = cookie_token
-    else:
-        logger.error("Request user is not authenticated to use websocket.")
-        message.reply_channel.send({"close": True})
-    return None
+    @classmethod
+    def verify_secret(cls, s, nonce_tolerance=300):
+        try:
+            (prefix, payload) = s.split(' ')
+            if prefix != 'HMAC-SHA256':
+                raise ValueError('Unsupported encryption algorithm')
+            (nonce_parsed, secret_parsed) = payload.split(':')
+        except Exception:
+            raise ValueError("Failed to parse secret")
+
+        try:
+            payload_expected = {
+                'secret': settings.BROADCAST_WEBSOCKETS_SECRET,
+                'nonce': nonce_parsed,
+            }
+            payload_serialized = json.dumps(payload_expected)
+        except Exception:
+            raise ValueError("Failed to create hash to compare to secret.")
+
+        secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET),
+                                     msg=force_bytes(payload_serialized),
+                                     digestmod='sha256').hexdigest()
+
+        if secret_serialized != secret_parsed:
+            raise ValueError("Invalid secret")
+
+        # Avoid timing attack and check the nonce after all the heavy lifting
+        now = datetime.datetime.utcnow()
+        nonce_parsed = datetime.datetime.utcfromtimestamp(int(nonce_parsed))
+        if (now-nonce_parsed).total_seconds() > nonce_tolerance:
+            raise ValueError("Potential replay attack or machine(s) time out of sync.")
+
+        return True
+
+    @classmethod
+    def is_authorized(cls, scope):
+        secret = ''
+        for k, v in scope['headers']:
+            if k.decode("utf-8") == 'secret':
+                secret = v.decode("utf-8")
+                break
+        WebsocketSecretAuthHelper.verify_secret(secret)
 
-@channel_session_user
-def ws_disconnect(message):
-    discard_groups(message)
 
+class BroadcastConsumer(AsyncJsonWebsocketConsumer):
+
+    async def connect(self):
+        try:
+            WebsocketSecretAuthHelper.is_authorized(self.scope)
+        except Exception:
+            await self.close()
+            return
+
+        # TODO: log ip of connected client
+        logger.info("Client connected")
+        await self.accept()
+        await self.channel_layer.group_add(BROADCAST_GROUP, self.channel_name)
+
+    async def disconnect(self, code):
+        # TODO: log ip of disconnected client
+        logger.info("Client disconnected")
+
+    async def internal_message(self, event):
+        await self.send(event['text'])
 
-@channel_session_user
-def ws_receive(message):
-    from awx.main.access import consumer_access
-    user = message.user
-    raw_data = message.content['text']
-    data = json.loads(raw_data)
 
+class EventConsumer(AsyncJsonWebsocketConsumer):
+    async def connect(self):
+        user = self.scope['user']
+        if user and not user.is_anonymous:
+            await self.accept()
+            await self.send_json({"accept": True, "user": user.id})
+            # store the valid CSRF token from the cookie so we can compare it later
+            # on ws_receive
+            cookie_token = self.scope['cookies'].get('csrftoken')
+            if cookie_token:
+                self.scope['session'][XRF_KEY] = cookie_token
+        else:
+            logger.error("Request user is not authenticated to use websocket.")
+            # TODO: Carried over from the channels 1 implementation: we should never
+            # close the connection without first .accept()ing it and sending a close message
+            await self.accept()
+            await self.send_json({"close": True})
+            await self.close()
 
-    xrftoken = data.get('xrftoken')
-    if (
-        not xrftoken or
-        XRF_KEY not in message.channel_session or
-        xrftoken != message.channel_session[XRF_KEY]
-    ):
-        logger.error(
+    @database_sync_to_async
+    def user_can_see_object_id(self, user_access, oid):
+        return user_access.get_queryset().filter(pk=oid).exists()
+
+    async def receive_json(self, data):
+        from awx.main.access import consumer_access
+        user = self.scope['user']
+        xrftoken = data.get('xrftoken')
+        if (
+            not xrftoken or
+            XRF_KEY not in self.scope["session"] or
+            xrftoken != self.scope["session"][XRF_KEY]
+        ):
+            logger.error(
                 "access denied to channel, XRF mismatch for {}".format(user.username)
-        )
-        message.reply_channel.send({
-            "text": json.dumps({"error": "access denied to channel"})
-        })
-        return
+            )
+            await self.send_json({"error": "access denied to channel"})
+            return
 
-    if 'groups' in data:
-        discard_groups(message)
-        groups = data['groups']
-        current_groups = set(message.channel_session.pop('groups') if 'groups' in message.channel_session else [])
-        for group_name,v in groups.items():
-            if type(v) is list:
-                for oid in v:
-                    name = '{}-{}'.format(group_name, oid)
-                    access_cls = consumer_access(group_name)
-                    if access_cls is not None:
-                        user_access = access_cls(user)
-                        if not user_access.get_queryset().filter(pk=oid).exists():
-                            message.reply_channel.send({"text": json.dumps(
-                                {"error": "access denied to channel {0} for resource id {1}".format(group_name, oid)})})
-                            continue
-                    current_groups.add(name)
-                    Group(name).add(message.reply_channel)
-            else:
-                current_groups.add(group_name)
-                Group(group_name).add(message.reply_channel)
-        message.channel_session['groups'] = list(current_groups)
+        if 'groups' in data:
+            groups = data['groups']
+            new_groups = set()
+            current_groups = set(self.scope['session'].pop('groups') if 'groups' in self.scope['session'] else [])
+            for group_name, v in groups.items():
+                if type(v) is list:
+                    for oid in v:
+                        name = '{}-{}'.format(group_name, oid)
+                        access_cls = consumer_access(group_name)
+                        if access_cls is not None:
+                            user_access = access_cls(user)
+                            if not await self.user_can_see_object_id(user_access, oid):
+                                await self.send_json({"error": "access denied to channel {0} for resource id {1}".format(group_name, oid)})
+                                continue
+
+                        new_groups.add(name)
+                else:
+                    if group_name == BROADCAST_GROUP:
+                        logger.warning("Non-privileged client asked to join broadcast group!")
+                        return
+
+                    new_groups.add(group_name)
+
+            old_groups = current_groups - new_groups
+            for group_name in old_groups:
+                await self.channel_layer.group_discard(
+                    group_name,
+                    self.channel_name,
+                )
+
+            new_groups_exclusive = new_groups - current_groups
+            for group_name in new_groups_exclusive:
+                await self.channel_layer.group_add(
+                    group_name,
+                    self.channel_name
+                )
+            logger.debug(f"Channel {self.channel_name} left groups {old_groups} and joined {new_groups_exclusive}")
+            self.scope['session']['groups'] = list(new_groups)
+
+    async def internal_message(self, event):
+        await self.send(event['text'])
 
 def emit_channel_notification(group, payload):
     try:
-        Group(group).send({"text": json.dumps(payload, cls=DjangoJSONEncoder)})
+        payload = json.dumps(payload, cls=DjangoJSONEncoder)
     except ValueError:
         logger.error("Invalid payload emitting channel {} on topic: {}".format(group, payload))
+        return
+
+    channel_layer = get_channel_layer()
+
+    async_to_sync(channel_layer.group_send)(
+        group,
+        {
+            "type": "internal.message",
+            "text": payload
+        },
+    )
+
+    async_to_sync(channel_layer.group_send)(
+        BROADCAST_GROUP,
+        {
+            "type": "internal.message",
+            "text": wrap_broadcast_msg(group, payload),
+        },
+    )
+
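For reference, the 'secret' header scheme that WebsocketSecretAuthHelper implements above, restated as a self-contained sketch. This does not import AWX; the shared-secret value is a stand-in for settings.BROADCAST_WEBSOCKETS_SECRET:

    import datetime
    import hmac
    import json

    SHARED_SECRET = 'not-the-real-secret'  # stand-in for settings.BROADCAST_WEBSOCKETS_SECRET

    def construct_secret():
        # Unix-timestamp nonce, HMAC-SHA256'd together with the shared secret
        nonce = str(int((datetime.datetime.utcnow() - datetime.datetime.utcfromtimestamp(0)).total_seconds()))
        payload = json.dumps({'secret': SHARED_SECRET, 'nonce': nonce})
        digest = hmac.new(SHARED_SECRET.encode('utf-8'), msg=payload.encode('utf-8'),
                          digestmod='sha256').hexdigest()
        return 'HMAC-SHA256 {}:{}'.format(nonce, digest)

    def verify_secret(header_value, nonce_tolerance=300):
        prefix, _, rest = header_value.partition(' ')
        if prefix != 'HMAC-SHA256':
            raise ValueError('Unsupported encryption algorithm')
        nonce, _, digest = rest.partition(':')
        expected_payload = json.dumps({'secret': SHARED_SECRET, 'nonce': nonce})
        expected = hmac.new(SHARED_SECRET.encode('utf-8'), msg=expected_payload.encode('utf-8'),
                            digestmod='sha256').hexdigest()
        if not hmac.compare_digest(expected, digest):
            raise ValueError('Invalid secret')
        age = (datetime.datetime.utcnow() - datetime.datetime.utcfromtimestamp(int(nonce))).total_seconds()
        if age > nonce_tolerance:
            raise ValueError('Potential replay attack or machine(s) time out of sync.')

    verify_secret(construct_secret())  # round-trips without raising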
diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py
index 5f081e84f2..f938aab6b5 100644
--- a/awx/main/dispatch/control.py
+++ b/awx/main/dispatch/control.py
@@ -4,8 +4,7 @@ import socket
 from django.conf import settings
 
 from awx.main.dispatch import get_local_queuename
-from awx.main.dispatch.kombu import Connection
-from kombu import Queue, Exchange, Producer, Consumer
+from kombu import Queue, Exchange, Producer, Consumer, Connection
 
 logger = logging.getLogger('awx.main.dispatch')
 
@@ -40,7 +39,7 @@ class Control(object):
         logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename))
         reply_queue = Queue(name="amq.rabbitmq.reply-to")
         self.result = None
-        with Connection(settings.BROKER_URL) as conn:
+        with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
             with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True):
                 self.publish({'control': command}, conn, reply_to='amq.rabbitmq.reply-to')
                 try:
@@ -51,7 +50,7 @@ class Control(object):
         return self.result
 
     def control(self, msg, **kwargs):
-        with Connection(settings.BROKER_URL) as conn:
+        with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
             self.publish(msg, conn)
 
     def process_message(self, body, message):
diff --git a/awx/main/dispatch/kombu.py b/awx/main/dispatch/kombu.py
deleted file mode 100644
index 94fc7a035e..0000000000
--- a/awx/main/dispatch/kombu.py
+++ /dev/null
@@ -1,42 +0,0 @@
-from amqp.exceptions import PreconditionFailed
-from django.conf import settings
-from kombu.connection import Connection as KombuConnection
-from kombu.transport import pyamqp
-
-import logging
-
-logger = logging.getLogger('awx.main.dispatch')
-
-
-__all__ = ['Connection']
-
-
-class Connection(KombuConnection):
-
-    def __init__(self, *args, **kwargs):
-        super(Connection, self).__init__(*args, **kwargs)
-        class _Channel(pyamqp.Channel):
-
-            def queue_declare(self, queue, *args, **kwargs):
-                kwargs['durable'] = settings.BROKER_DURABILITY
-                try:
-                    return super(_Channel, self).queue_declare(queue, *args, **kwargs)
-                except PreconditionFailed as e:
-                    if "inequivalent arg 'durable'" in getattr(e, 'reply_text', None):
-                        logger.error(
-                            'queue {} durability is not {}, deleting and recreating'.format(
-
-                                queue,
-                                kwargs['durable']
-                            )
-                        )
-                        self.queue_delete(queue)
-                        return super(_Channel, self).queue_declare(queue, *args, **kwargs)
-
-        class _Connection(pyamqp.Connection):
-            Channel = _Channel
-
-        class _Transport(pyamqp.Transport):
-            Connection = _Connection
-
-        self.transport_cls = _Transport
diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py
index 9bbd7ae45f..be64594ee3 100644
--- a/awx/main/dispatch/publish.py
+++ b/awx/main/dispatch/publish.py
@@ -4,9 +4,8 @@ import sys
 from uuid import uuid4
 
 from django.conf import settings
-from kombu import Exchange, Producer
+from kombu import Exchange, Producer, Connection, Queue, Consumer
 
-from awx.main.dispatch.kombu import Connection
 
 logger = logging.getLogger('awx.main.dispatch')
 
@@ -86,8 +85,13 @@ class task:
             if callable(queue):
                 queue = queue()
             if not settings.IS_TESTING(sys.argv):
-                with Connection(settings.BROKER_URL) as conn:
+                with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
                     exchange = Exchange(queue, type=exchange_type or 'direct')
+
+                    # HACK: With Redis as the broker, declaring an exchange isn't enough to create the queue.
+                    # Creating a Consumer _will_ create a queue, so that publish will succeed. Note that we
+                    # don't call consume() on the consumer, so we don't actually eat any messages.
+                    Consumer(conn, queues=[Queue(queue, exchange, routing_key=queue)], accept=['json'])
                     producer = Producer(conn)
                     logger.debug('publish {}({}, queue={})'.format(
                         cls.name,
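The queue-declaration HACK in publish.py above can be reproduced in isolation with nothing but kombu and a Redis server; the broker URL below is an assumption for a local development box:

    from kombu import Connection, Consumer, Exchange, Producer, Queue

    with Connection('redis://localhost:6379') as conn:
        exchange = Exchange('demo_queue', type='direct')
        queue = Queue('demo_queue', exchange, routing_key='demo_queue')

        # Instantiating the Consumer (auto_declare is on by default) declares the
        # queue on the Redis transport; we never call consume(), so no messages
        # are eaten, mirroring the trick in the hunk above.
        Consumer(conn, queues=[queue], accept=['json'])

        Producer(conn).publish({'msg': 'hello'}, exchange=exchange,
                               routing_key='demo_queue', serializer='json')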
diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index 51608a8b7a..58e311f2bb 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -3,9 +3,8 @@
 
 from django.conf import settings
 from django.core.management.base import BaseCommand
-from kombu import Exchange, Queue
+from kombu import Exchange, Queue, Connection
 
-from awx.main.dispatch.kombu import Connection
 from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
 
 
@@ -18,7 +17,7 @@ class Command(BaseCommand):
     help = 'Launch the job callback receiver'
 
     def handle(self, *arg, **options):
-        with Connection(settings.BROKER_URL) as conn:
+        with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
             consumer = None
             try:
                 consumer = AWXConsumer(
diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py
index 9fd9c3256d..7e69897687 100644
--- a/awx/main/management/commands/run_dispatcher.py
+++ b/awx/main/management/commands/run_dispatcher.py
@@ -11,7 +11,6 @@ from kombu import Exchange, Queue
 from awx.main.utils.handlers import AWXProxyHandler
 from awx.main.dispatch import get_local_queuename, reaper
 from awx.main.dispatch.control import Control
-from awx.main.dispatch.kombu import Connection
 from awx.main.dispatch.pool import AutoscalePool
 from awx.main.dispatch.worker import AWXConsumer, TaskWorker
 from awx.main.dispatch import periodic
@@ -63,7 +62,7 @@ class Command(BaseCommand):
         # in cpython itself:
         # https://bugs.python.org/issue37429
         AWXProxyHandler.disable()
-        with Connection(settings.BROKER_URL) as conn:
+        with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
             try:
                 bcast = 'tower_broadcast_all'
                 queues = [
diff --git a/awx/main/queue.py b/awx/main/queue.py
index 0da0e22e48..3d8a8384eb 100644
--- a/awx/main/queue.py
+++ b/awx/main/queue.py
@@ -10,8 +10,7 @@ import os
 from django.conf import settings
 
 # Kombu
-from awx.main.dispatch.kombu import Connection
-from kombu import Exchange, Producer
+from kombu import Exchange, Producer, Connection
 from kombu.serialization import registry
 
 __all__ = ['CallbackQueueDispatcher']
@@ -41,6 +40,7 @@ class CallbackQueueDispatcher(object):
 
     def __init__(self):
         self.callback_connection = getattr(settings, 'BROKER_URL', None)
+        self.callback_connection_options = getattr(settings, 'BROKER_TRANSPORT_OPTIONS', {})
         self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
         self.connection = None
         self.exchange = None
@@ -57,7 +57,7 @@ class CallbackQueueDispatcher(object):
             if self.connection_pid != active_pid:
                 self.connection = None
             if self.connection is None:
-                self.connection = Connection(self.callback_connection)
+                self.connection = Connection(self.callback_connection, transport_options=self.callback_connection_options)
                 self.exchange = Exchange(self.connection_queue, type='direct')
 
             producer = Producer(self.connection)
@@ -66,7 +66,7 @@ class CallbackQueueDispatcher(object):
                 compression='bzip2',
                 exchange=self.exchange,
                 declare=[self.exchange],
-                delivery_mode="persistent" if settings.PERSISTENT_CALLBACK_MESSAGES else "transient",
+                delivery_mode="transient",
                 routing_key=self.connection_queue)
             return
         except Exception as e:
diff --git a/awx/main/routing.py b/awx/main/routing.py
index 0a49f25c6c..1efb6159d3 100644
--- a/awx/main/routing.py
+++ b/awx/main/routing.py
@@ -1,8 +1,15 @@
-from channels.routing import route
+from django.conf.urls import url
+from channels.auth import AuthMiddlewareStack
+from channels.routing import ProtocolTypeRouter, URLRouter
+from . import consumers
 
-
-channel_routing = [
-    route("websocket.connect", "awx.main.consumers.ws_connect", path=r'^/websocket/$'),
-    route("websocket.disconnect", "awx.main.consumers.ws_disconnect", path=r'^/websocket/$'),
-    route("websocket.receive", "awx.main.consumers.ws_receive", path=r'^/websocket/$'),
+websocket_urlpatterns = [
+    url(r'websocket/$', consumers.EventConsumer),
+    url(r'websocket/broadcast/$', consumers.BroadcastConsumer),
 ]
+
+application = ProtocolTypeRouter({
+    'websocket': AuthMiddlewareStack(
+        URLRouter(websocket_urlpatterns)
+    ),
+})
diff --git a/awx/main/signals.py b/awx/main/signals.py
index 64a35c1f1d..1983b335f1 100644
--- a/awx/main/signals.py
+++ b/awx/main/signals.py
@@ -593,16 +593,6 @@ def deny_orphaned_approvals(sender, instance, **kwargs):
 @receiver(post_save, sender=Session)
 def save_user_session_membership(sender, **kwargs):
     session = kwargs.get('instance', None)
-    if pkg_resources.get_distribution('channels').version >= '2':
-        # If you get into this code block, it means we upgraded channels, but
-        # didn't make the settings.SESSIONS_PER_USER feature work
-        raise RuntimeError(
-            'save_user_session_membership must be updated for channels>=2: '
-            'http://channels.readthedocs.io/en/latest/one-to-two.html#requirements'
-        )
-    if 'runworker' in sys.argv:
-        # don't track user session membership for websocket per-channel sessions
-        return
     if not session:
         return
     user_id = session.get_decoded().get(SESSION_KEY, None)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index c5c0ad5e89..252e39d708 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -421,7 +421,8 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
 
 BROKER_DURABILITY = True
 BROKER_POOL_LIMIT = None
-BROKER_URL = 'amqp://guest:guest@localhost:5672//'
+BROKER_URL = 'redis://localhost:6379'
+BROKER_TRANSPORT_OPTIONS = {}
 CELERY_DEFAULT_QUEUE = 'awx_private_queue'
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {
@@ -929,8 +930,6 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False
 # Internal API URL for use by inventory scripts and callback plugin.
 INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT
 
-PERSISTENT_CALLBACK_MESSAGES = True
-USE_CALLBACK_QUEUE = True
 CALLBACK_QUEUE = "callback_tasks"
 
 SCHEDULER_QUEUE = "scheduler"
@@ -965,6 +964,17 @@ LOG_AGGREGATOR_LEVEL = 'INFO'
 # raising this value can help
 CHANNEL_LAYER_RECEIVE_MAX_RETRY = 10
 
+ASGI_APPLICATION = "awx.main.routing.application"
+
+CHANNEL_LAYERS = {
+    "default": {
+        "BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer",
+        "CONFIG": {
+            "hosts": [("localhost", 6379)],
+        },
+    },
+}
+
 # Logging configuration.
 LOGGING = {
     'version': 1,
@@ -1239,3 +1249,17 @@ MIDDLEWARE = [
     'awx.main.middleware.URLModificationMiddleware',
     'awx.main.middleware.SessionTimeoutMiddleware',
 ]
+
+# Shared secret, sent as a request header, that authenticates the websocket
+# connections used to distribute websocket messages between nodes.
+# This needs to be kept secret and randomly generated
+BROADCAST_WEBSOCKETS_SECRET = ''
+
+# Port for broadcast websockets to connect to
+# Note that clients will follow redirect responses
+BROADCAST_WEBSOCKETS_PORT = 443
+
+# Whether or not broadcast websockets should check nginx certs when interconnecting
+BROADCAST_WEBSOCKETS_VERIFY_CERT = False
+
+# Connect to other AWX nodes using http or https
+BROADCAST_WEBSOCKETS_PROTOCOL = 'https'
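Since the new BROADCAST_WEBSOCKETS_SECRET defaults to an empty string, each deployment has to override it. A production-ish settings override might look like the following; these are illustrative values, not shipped defaults:

    # Example local settings override; values are illustrative.
    import secrets

    # Generate once, persist it, and configure the same value on every node;
    # generating at import time would give each process a different secret.
    BROADCAST_WEBSOCKETS_SECRET = secrets.token_hex(32)

    BROADCAST_WEBSOCKETS_PORT = 443
    BROADCAST_WEBSOCKETS_VERIFY_CERT = True
    BROADCAST_WEBSOCKETS_PROTOCOL = 'https'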
diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose
index 42e5a3cd74..8ebaa8747e 100644
--- a/awx/settings/local_settings.py.docker_compose
+++ b/awx/settings/local_settings.py.docker_compose
@@ -49,16 +49,18 @@ if "pytest" in sys.modules:
     }
 }
 
-# AMQP configuration.
-BROKER_URL = "amqp://{}:{}@{}/{}".format(os.environ.get("RABBITMQ_USER"),
-                                         os.environ.get("RABBITMQ_PASS"),
-                                         os.environ.get("RABBITMQ_HOST"),
-                                         urllib.parse.quote(os.environ.get("RABBITMQ_VHOST", "/"), safe=''))
+# Use Redis as the message bus for now.
+# Defaults to "just works" for the single-node Tower docker-compose setup.
+BROKER_URL = os.environ.get('BROKER_URL', "redis://redis_1:6379")
 
 CHANNEL_LAYERS = {
-    'default': {'BACKEND': 'asgi_amqp.AMQPChannelLayer',
-                'ROUTING': 'awx.main.routing.channel_routing',
-                'CONFIG': {'url': BROKER_URL}}
+    "default": {
+        "BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer",
+        "CONFIG": {
+            "hosts": [(os.environ.get('REDIS_HOST', 'redis_1'),
+                       int(os.environ.get('REDIS_PORT', 6379)))],
+        },
+    },
 }
 
 # Absolute filesystem path to the directory to host projects (with playbooks).
@@ -238,3 +240,8 @@ TEST_OPENSTACK_PROJECT = ''
 # Azure credentials.
 TEST_AZURE_USERNAME = ''
 TEST_AZURE_KEY_DATA = ''
+
+BROADCAST_WEBSOCKETS_SECRET = '🤖starscream🤖'
+BROADCAST_WEBSOCKETS_PORT = 8013
+BROADCAST_WEBSOCKETS_VERIFY_CERT = False
+BROADCAST_WEBSOCKETS_PROTOCOL = 'http'
diff --git a/requirements/requirements.in b/requirements/requirements.in
index 09f08975a2..b6ba519768 100644
--- a/requirements/requirements.in
+++ b/requirements/requirements.in
@@ -1,10 +1,11 @@
+aiohttp
 ansible-runner
 ansiconv==1.0.0           # UPGRADE BLOCKER: from 2013, consider replacing instead of upgrading
-asgi-amqp>=1.1.4          # see library notes, related to channels 2
 azure-keyvault==1.1.0     # see UPGRADE BLOCKERs
 boto                      # replacement candidate https://github.com/ansible/awx/issues/2115
-channels==1.1.8           # UPGRADE BLOCKER: Last before backwards-incompatible channels 2 upgrade
-daphne==1.4.2             # UPGRADE BLOCKER: last before channels 2 but not pinned by other deps
+channels
+channels-redis
+daphne
 django==2.2.10            # see UPGRADE BLOCKERs
 django-auth-ldap
 django-cors-headers
@@ -37,6 +38,7 @@ python3-saml
 schedule==0.6.0
 social-auth-core==3.2.0   # see UPGRADE BLOCKERs
 social-auth-app-django==3.1.0  # see UPGRADE BLOCKERs
+redis
 requests
 requests-futures          # see library notes
 slackclient==1.1.2        # see UPGRADE BLOCKERs
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index a636429641..d778a73922 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -1,33 +1,36 @@
 adal==1.2.2               # via msrestazure
+aiohttp==3.6.2
+aioredis==1.3.1           # via channels-redis
 amqp==2.5.2               # via kombu
 ansible-runner==1.4.4
 ansiconv==1.0.0
-asgi-amqp==1.1.4
-asgiref==1.1.2            # via asgi-amqp, channels, daphne
-attrs==19.3.0             # via automat, jsonschema, twisted
-autobahn==19.11.1         # via daphne
+asgiref==3.2.3            # via channels, channels-redis, daphne
+async-timeout==3.0.1      # via aiohttp, aioredis
+attrs==19.3.0             # via aiohttp, automat, jsonschema, service-identity,
twisted +autobahn==20.1.1 # via daphne automat==0.8.0 # via twisted -azure-common==1.1.23 # via azure-keyvault +azure-common==1.1.24 # via azure-keyvault azure-keyvault==1.1.0 azure-nspkg==3.0.2 # via azure-keyvault boto==2.49.0 -cachetools==3.1.1 # via google-auth +cachetools==4.0.0 # via google-auth certifi==2019.11.28 # via kubernetes, msrest, requests cffi==1.13.2 # via cryptography -channels==1.1.8 -chardet==3.0.4 # via requests +channels-redis==2.4.1 +channels==2.4.0 +chardet==3.0.4 # via aiohttp, requests constantly==15.1.0 # via twisted -cryptography==2.8 # via adal, autobahn, azure-keyvault -daphne==1.4.2 +cryptography==2.8 # via adal, autobahn, azure-keyvault, pyopenssl, service-identity +daphne==2.4.1 defusedxml==0.6.0 # via python3-openid, python3-saml, social-auth-core dictdiffer==0.8.1 # via openshift django-auth-ldap==2.1.0 -django-cors-headers==3.2.0 +django-cors-headers==3.2.1 django-crum==0.7.5 django-extensions==2.2.5 django-jsonfield==1.2.0 django-oauth-toolkit==1.1.3 -django-pglocks==1.0.3 +django-pglocks==1.0.4 django-polymorphic==2.1.2 django-qsstats-magic==1.1.0 django-radius==1.3.3 @@ -41,24 +44,23 @@ docutils==0.15.2 # via python-daemon future==0.16.0 # via django-radius gitdb2==2.0.6 # via gitpython gitpython==3.0.5 -google-auth==1.9.0 # via kubernetes +google-auth==1.10.0 # via kubernetes +hiredis==1.0.1 # via aioredis hyperlink==19.0.0 # via twisted -idna==2.8 # via hyperlink, requests -importlib-metadata==1.3.0 # via inflect, irc, jsonschema, kombu +idna-ssl==1.1.0 # via aiohttp +idna==2.8 # via hyperlink, idna-ssl, requests, twisted, yarl +importlib-metadata==1.4.0 # via irc, jsonschema, kombu importlib-resources==1.0.2 # via jaraco.text incremental==17.5.0 # via twisted -inflect==3.0.2 # via jaraco.itertools -irc==17.1 +irc==18.0.0 isodate==0.6.0 # via msrest, python3-saml -jaraco.classes==2.0 # via jaraco.collections -jaraco.collections==2.1 # via irc -jaraco.functools==2.0 # via irc, jaraco.text, tempora -jaraco.itertools==4.4.2 # via irc -jaraco.logging==2.0 # via irc +jaraco.classes==3.1.0 # via jaraco.collections +jaraco.collections==3.0.0 # via irc +jaraco.functools==3.0.0 # via irc, jaraco.text, tempora +jaraco.logging==3.0.0 # via irc jaraco.stream==3.0.0 # via irc jaraco.text==3.2.0 # via irc, jaraco.collections jinja2==2.10.3 -jsonpickle==1.2 # via asgi-amqp jsonschema==3.2.0 kombu==4.6.7 # via asgi-amqp kubernetes==10.0.1 # via openshift @@ -66,10 +68,11 @@ lockfile==0.12.2 # via python-daemon lxml==4.4.2 # via xmlsec markdown==3.1.1 markupsafe==1.1.1 # via jinja2 -more-itertools==8.0.2 # via irc, jaraco.functools, jaraco.itertools, zipp -msgpack-python==0.5.6 # via asgi-amqp +more-itertools==8.1.0 # via irc, jaraco.classes, jaraco.functools, zipp +msgpack==0.6.2 # via channels-redis msrest==0.6.10 # via azure-keyvault, msrestazure msrestazure==0.6.2 # via azure-keyvault +multidict==4.7.4 # via aiohttp, yarl netaddr==0.7.19 # via pyrad oauthlib==3.1.0 # via django-oauth-toolkit, requests-oauthlib, social-auth-core openshift==0.10.1 @@ -79,15 +82,16 @@ prometheus-client==0.7.1 psutil==5.6.7 # via ansible-runner psycopg2==2.8.4 ptyprocess==0.6.0 # via pexpect -pyasn1-modules==0.2.7 # via google-auth, python-ldap -pyasn1==0.4.8 # via pyasn1-modules, python-ldap, rsa +pyasn1-modules==0.2.8 # via google-auth, python-ldap, service-identity +pyasn1==0.4.8 # via pyasn1-modules, python-ldap, rsa, service-identity pycparser==2.19 # via cffi pygerduty==0.38.2 pyhamcrest==1.9.0 # via twisted pyjwt==1.7.1 # via adal, social-auth-core, twilio -pyparsing==2.4.5 
+pyopenssl==19.1.0         # via twisted
+pyparsing==2.4.6
 pyrad==2.2                # via django-radius
-pyrsistent==0.15.6        # via jsonschema
+pyrsistent==0.15.7        # via jsonschema
 python-daemon==2.2.4      # via ansible-runner
 python-dateutil==2.8.1    # via adal, kubernetes
 python-ldap==3.2.0        # via django-auth-ldap
 python3-openid==3.1.0     # via social-auth-core
 python3-saml==1.9.0
 pytz==2019.3              # via django, irc, tempora, twilio
-pyyaml==5.2               # via ansible-runner, djangorestframework-yaml, kubernetes
+pyyaml==5.3               # via ansible-runner, djangorestframework-yaml, kubernetes
+redis==3.3.11
 requests-futures==1.0.0
 requests-oauthlib==1.3.0  # via kubernetes, msrest, social-auth-core
 requests==2.22.0
 social-auth-app-django==3.1.0
 social-auth-core==3.2.0
 sqlparse==0.3.0           # via django
 tacacs_plus==1.0
-tempora==1.14.1           # via irc, jaraco.logging
-twilio==6.35.1
-twisted==19.10.0          # via daphne
+tempora==2.1.0            # via irc, jaraco.logging
+twilio==6.35.2
+twisted[tls]==19.10.0     # via daphne
 txaio==18.8.1             # via autobahn
+typing-extensions==3.7.4.1  # via aiohttp
 urllib3==1.25.7           # via kubernetes, requests
 uwsgi==2.0.18
 uwsgitop==0.11
 vine==1.3.0               # via amqp
-websocket-client==0.56.0  # via kubernetes, slackclient
+websocket-client==0.57.0  # via kubernetes, slackclient
 xmlsec==1.3.3             # via python3-saml
-zipp==0.6.0               # via importlib-metadata
+yarl==1.4.2               # via aiohttp
+zipp==1.0.0               # via importlib-metadata
 zope.interface==4.7.1     # via twisted
 
 # The following packages are considered to be unsafe in a requirements file:
diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml
index 860a7f481d..caa2a7540f 100644
--- a/tools/docker-compose-cluster.yml
+++ b/tools/docker-compose-cluster.yml
@@ -7,45 +7,49 @@ services:
       dockerfile: Dockerfile-haproxy
     container_name: tools_haproxy_1
     depends_on:
-      - "awx_1"
-      - "awx_2"
-      - "awx_3"
+      - "awx-1"
+      - "awx-2"
+      - "awx-3"
     ports:
       - "8013:8013"
       - "8043:8043"
       - "1936:1936"
      - "15672:15672"
-  awx_1:
+  awx-1:
     user: ${CURRENT_UID}
     container_name: tools_awx_1_1
     privileged: true
     image: ${DEV_DOCKER_TAG_BASE}/awx_devel:${TAG}
-    hostname: awx_1
+    hostname: awx-1
     environment:
       CURRENT_UID:
-      RABBITMQ_HOST: rabbitmq_1
-      RABBITMQ_USER: guest
-      RABBITMQ_PASS: guest
-      RABBITMQ_VHOST: /
+      # BROKER_URL will go away when we use postgres as our message broker
+      BROKER_URL: "redis://redis_1:63791"
+      REDIS_HOST: redis_1
+      REDIS_PORT: 63791
       SDB_HOST: 0.0.0.0
       SDB_PORT: 5899
       AWX_GROUP_QUEUES: alpha,tower
+    command: /start_development.sh
+    working_dir: "/awx_devel"
     volumes:
       - "../:/awx_devel"
     ports:
       - "5899-5999:5899-5999"
-  awx_2:
+  awx-2:
     user: ${CURRENT_UID}
     container_name: tools_awx_2_1
     privileged: true
     image: ${DEV_DOCKER_TAG_BASE}/awx_devel:${TAG}
-    hostname: awx_2
+    hostname: awx-2
+    command: /start_development.sh
+    working_dir: "/awx_devel"
     environment:
       CURRENT_UID:
-      RABBITMQ_HOST: rabbitmq_2
-      RABBITMQ_USER: guest
-      RABBITMQ_PASS: guest
-      RABBITMQ_VHOST: /
+      # BROKER_URL will go away when we use postgres as our message broker
+      BROKER_URL: "redis://redis_1:63791"
+      REDIS_HOST: redis_2
+      REDIS_PORT: 63792
       SDB_HOST: 0.0.0.0
       SDB_PORT: 7899
       AWX_GROUP_QUEUES: bravo,tower
@@ -53,18 +57,20 @@ services:
     volumes:
       - "../:/awx_devel"
     ports:
       - "7899-7999:7899-7999"
-  awx_3:
+  awx-3:
     user: ${CURRENT_UID}
     container_name: tools_awx_3_1
     privileged: true
     image: ${DEV_DOCKER_TAG_BASE}/awx_devel:${TAG}
-    hostname: awx_3
+    hostname: awx-3
+    command: /start_development.sh
+    working_dir: "/awx_devel"
     environment:
       CURRENT_UID:
-      RABBITMQ_HOST: rabbitmq_3
-      RABBITMQ_USER: guest
-      RABBITMQ_PASS: guest
-      RABBITMQ_VHOST: /
+      # BROKER_URL will go away when we use postgres as our message broker
+      BROKER_URL: "redis://redis_1:63791"
+      REDIS_HOST: redis_3
+      REDIS_PORT: 63793
       SDB_HOST: 0.0.0.0
       SDB_PORT: 8899
       AWX_GROUP_QUEUES: charlie,tower
@@ -72,24 +78,33 @@ services:
     volumes:
       - "../:/awx_devel"
     ports:
       - "8899-8999:8899-8999"
-  rabbitmq_1:
-    image: ${DEV_DOCKER_TAG_BASE}/rabbit_cluster_node:latest
-    hostname: rabbitmq_1
-    container_name: tools_rabbitmq_1_1
-  rabbitmq_2:
-    image: ${DEV_DOCKER_TAG_BASE}/rabbit_cluster_node:latest
-    hostname: rabbitmq_2
-    container_name: tools_rabbitmq_2_1
-    environment:
-      - CLUSTERED=true
-      - CLUSTER_WITH=rabbitmq_1
-  rabbitmq_3:
-    image: ${DEV_DOCKER_TAG_BASE}/rabbit_cluster_node:latest
-    hostname: rabbitmq_3
-    container_name: tools_rabbitmq_3_1
-    environment:
-      - CLUSTERED=true
-      - CLUSTER_WITH=rabbitmq_1
+  redis_1:
+    image: redis:latest
+    hostname: redis_1
+    container_name: tools_redis_1_1
+    command: "redis-server /usr/local/etc/redis/redis.conf"
+    volumes:
+      - "./redis/redis_1.conf:/usr/local/etc/redis/redis.conf"
+    ports:
+      - "63791:63791"
+  redis_2:
+    image: redis:latest
+    hostname: redis_2
+    container_name: tools_redis_2_1
+    command: "redis-server /usr/local/etc/redis/redis.conf"
+    volumes:
+      - "./redis/redis_2.conf:/usr/local/etc/redis/redis.conf"
+    ports:
+      - "63792:63792"
+  redis_3:
+    image: redis:latest
+    hostname: redis_3
+    container_name: tools_redis_3_1
+    command: "redis-server /usr/local/etc/redis/redis.conf"
+    volumes:
+      - "./redis/redis_3.conf:/usr/local/etc/redis/redis.conf"
+    ports:
+      - "63793:63793"
   postgres:
     image: postgres:10
     container_name: tools_postgres_1
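For a quick sanity check of the three Redis containers above from the host once the cluster is up, something like this works; the ports mirror the redis_*.conf files added later in this patch, and the redis package is already in requirements:

    import redis

    for port in (63791, 63792, 63793):
        conn = redis.Redis(host='localhost', port=port, socket_connect_timeout=2)
        print(port, 'PING ->', conn.ping())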
diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml
index aa35901257..353510ce84 100644
--- a/tools/docker-compose.yml
+++ b/tools/docker-compose.yml
@@ -11,10 +11,6 @@ services:
     environment:
       CURRENT_UID:
       OS:
-      RABBITMQ_HOST: rabbitmq
-      RABBITMQ_USER: guest
-      RABBITMQ_PASS: guest
-      RABBITMQ_VHOST: /
       SDB_HOST: 0.0.0.0
       SDB_PORT: 7899
       AWX_GROUP_QUEUES: tower
@@ -28,7 +24,7 @@ services:
     links:
       - postgres
       - memcached
-      - rabbitmq
+      - redis
 #    - sync
 #    volumes_from:
 #    - sync
@@ -57,8 +53,8 @@ services:
   memcached:
     image: memcached:alpine
     container_name: tools_memcached_1
     ports:
       - "11211:11211"
-  rabbitmq:
-    image: rabbitmq:3-management
-    container_name: tools_rabbitmq_1
+  redis:
+    image: redis:latest
+    container_name: tools_redis_1
     ports:
-      - "15672:15672"
+      - "6379:6379"
diff --git a/tools/docker-compose/bootstrap_development.sh b/tools/docker-compose/bootstrap_development.sh
index dfcbe11420..cd0ce68e8b 100755
--- a/tools/docker-compose/bootstrap_development.sh
+++ b/tools/docker-compose/bootstrap_development.sh
@@ -4,7 +4,6 @@ set +x
 # Wait for the databases to come up
 ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=postgres port=5432" all
 ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=memcached port=11211" all
-ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=${RABBITMQ_HOST} port=5672" all
 
 # In case AWX in the container wants to connect to itself, use "docker exec" to attach to the container otherwise
 # TODO: FIX
diff --git a/tools/docker-compose/haproxy.cfg b/tools/docker-compose/haproxy.cfg
index 9f4fcfa6e3..d37cbf691a 100644
--- a/tools/docker-compose/haproxy.cfg
+++ b/tools/docker-compose/haproxy.cfg
@@ -22,11 +22,6 @@ frontend localnodes_ssl
     mode tcp
     default_backend nodes_ssl
 
-frontend rabbitctl
-    bind *:15672
-    mode http
-    default_backend rabbitctl_nodes
-
 backend nodes
     mode http
     balance roundrobin
http-request set-header X-Forwarded-Port %[dst_port] http-request add-header X-Forwarded-Proto https if { ssl_fc } option httpchk HEAD / HTTP/1.1\r\nHost:localhost - server awx_1 awx_1:8013 check - server awx_2 awx_2:8013 check - server awx_3 awx_3:8013 check + server awx-1 awx-1:8013 check + server awx-2 awx-2:8013 check + server awx-3 awx-3:8013 check backend nodes_ssl mode tcp balance roundrobin - server awx_1 awx_1:8043 - server awx_2 awx_2:8043 - server awx_3 awx_3:8043 - -backend rabbitctl_nodes - mode http - balance roundrobin - option forwardfor - option http-pretend-keepalive - http-request set-header X-Forwarded-Port %[dst_port] - http-request add-header X-Forwarded-Proto https if { ssl_fc } - #option httpchk HEAD / HTTP/1.1\r\nHost:localhost - server rabbitmq_1 rabbitmq_1:15672 - server rabbitmq_2 rabbitmq_2:15672 - server rabbitmq_3 rabbitmq_3:15672 + server awx-1 awx-1:8043 + server awx-2 awx-2:8043 + server awx-3 awx-3:8043 listen stats bind *:1936 diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 84ebaaf291..c0e649950d 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -27,14 +27,6 @@ redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 -[program:awx-runworker] -command = make runworker -autostart = true -autorestart = true -redirect_stderr=true -stdout_logfile=/dev/fd/1 -stdout_logfile_maxbytes=0 - [program:awx-uwsgi] command = make uwsgi autostart = true @@ -64,7 +56,7 @@ stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 [group:tower-processes] -programs=awx-dispatcher,awx-receiver,awx-runworker,awx-uwsgi,awx-daphne,awx-nginx +programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx priority=5 [unix_http_server] diff --git a/tools/redis/redis_1.conf b/tools/redis/redis_1.conf new file mode 100644 index 0000000000..8b6f3784d7 --- /dev/null +++ b/tools/redis/redis_1.conf @@ -0,0 +1,4 @@ +protected-mode no +port 63791 +dir . +logfile "/tmp/redis.log" diff --git a/tools/redis/redis_2.conf b/tools/redis/redis_2.conf new file mode 100644 index 0000000000..7f02a91f4b --- /dev/null +++ b/tools/redis/redis_2.conf @@ -0,0 +1,4 @@ +protected-mode no +port 63792 +dir . +logfile "/tmp/redis.log" diff --git a/tools/redis/redis_3.conf b/tools/redis/redis_3.conf new file mode 100644 index 0000000000..203d723421 --- /dev/null +++ b/tools/redis/redis_3.conf @@ -0,0 +1,4 @@ +protected-mode no +port 63793 +dir . +logfile "/tmp/redis.log"
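For manually poking at the broadcast interconnect in this development environment, a minimal client along the lines of RedisGroupBroadcastChannelLayer.connect() might look like the following; the host, port, and shared secret are assumptions matching the docker-compose settings in this patch:

    import asyncio
    import datetime
    import hmac
    import json

    import aiohttp

    SHARED_SECRET = '🤖starscream🤖'  # matches local_settings.py.docker_compose above

    def construct_secret():
        # Same header format the consumers verify: HMAC-SHA256 <nonce>:<digest>
        nonce = str(int((datetime.datetime.utcnow() - datetime.datetime.utcfromtimestamp(0)).total_seconds()))
        payload = json.dumps({'secret': SHARED_SECRET, 'nonce': nonce})
        digest = hmac.new(SHARED_SECRET.encode('utf-8'), payload.encode('utf-8'), 'sha256').hexdigest()
        return 'HMAC-SHA256 {}:{}'.format(nonce, digest)

    async def main():
        headers = {'secret': construct_secret()}
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.ws_connect('http://localhost:8013/websocket/broadcast/') as ws:
                async for msg in ws:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        # each frame is a wrapped {"group": ..., "message": ...} payload
                        print(json.loads(msg.data))

    asyncio.get_event_loop().run_until_complete(main())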