From d6594ab60240e058d630d1353bb0e46ab76d3c0c Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 21 Feb 2020 10:21:41 -0500 Subject: [PATCH] add broadcast websocket metrics * Gather brroadcast websocket metrics and push them into redis every configurable seconds. * Pop metrics from redis in web view layer to display via the api on demand --- awx/api/urls/urls.py | 6 +- awx/api/views/metrics.py | 16 ++ awx/api/views/root.py | 1 + awx/main/analytics/broadcast_websocket.py | 165 ++++++++++++++++++ awx/main/consumers.py | 23 +-- .../management/commands/run_wsbroadcast.py | 2 - awx/main/wsbroadcast.py | 40 +++-- awx/settings/defaults.py | 19 +- awx/settings/local_settings.py.docker_compose | 8 +- 9 files changed, 246 insertions(+), 34 deletions(-) create mode 100644 awx/main/analytics/broadcast_websocket.py diff --git a/awx/api/urls/urls.py b/awx/api/urls/urls.py index ab7d61fd23..4e5aa5b2f5 100644 --- a/awx/api/urls/urls.py +++ b/awx/api/urls/urls.py @@ -34,7 +34,10 @@ from awx.api.views import ( OAuth2ApplicationDetail, ) -from awx.api.views.metrics import MetricsView +from awx.api.views.metrics import ( + MetricsView, + BroadcastWebsocketMetricsView, +) from .organization import urls as organization_urls from .user import urls as user_urls @@ -93,6 +96,7 @@ v2_urls = [ url(r'^tokens/$', OAuth2TokenList.as_view(), name='o_auth2_token_list'), url(r'^', include(oauth2_urls)), url(r'^metrics/$', MetricsView.as_view(), name='metrics_view'), + url(r'^broadcast_websocket_metrics/$', BroadcastWebsocketMetricsView.as_view(), name='broadcast_websocket_metrics_view'), url(r'^ping/$', ApiV2PingView.as_view(), name='api_v2_ping_view'), url(r'^config/$', ApiV2ConfigView.as_view(), name='api_v2_config_view'), url(r'^config/subscriptions/$', ApiV2SubscriptionView.as_view(), name='api_v2_subscription_view'), diff --git a/awx/api/views/metrics.py b/awx/api/views/metrics.py index 092e36efde..64948e6bd1 100644 --- a/awx/api/views/metrics.py +++ b/awx/api/views/metrics.py @@ -15,6 +15,7 @@ from rest_framework.exceptions import PermissionDenied # AWX # from awx.main.analytics import collectors from awx.main.analytics.metrics import metrics +from awx.main.analytics.broadcast_websocket import BroadcastWebsocketStatsManager from awx.api import renderers from awx.api.generics import ( @@ -39,3 +40,18 @@ class MetricsView(APIView): if (request.user.is_superuser or request.user.is_system_auditor): return Response(metrics().decode('UTF-8')) raise PermissionDenied() + +class BroadcastWebsocketMetricsView(APIView): + name = _('Broadcast Websockets') + swagger_topic = 'Broadcast Websockets' + + renderer_classes = [renderers.PlainTextRenderer, + renderers.PrometheusJSONRenderer, + renderers.BrowsableAPIRenderer,] + + def get(self, request): + ''' Show Metrics Details ''' + if (request.user.is_superuser or request.user.is_system_auditor): + stats_str = BroadcastWebsocketStatsManager.get_stats_sync() or b'' + return Response(stats_str.decode('UTF-8')) + raise PermissionDenied() diff --git a/awx/api/views/root.py b/awx/api/views/root.py index 4a15936e9b..3ac0759530 100644 --- a/awx/api/views/root.py +++ b/awx/api/views/root.py @@ -107,6 +107,7 @@ class ApiVersionRootView(APIView): data['applications'] = reverse('api:o_auth2_application_list', request=request) data['tokens'] = reverse('api:o_auth2_token_list', request=request) data['metrics'] = reverse('api:metrics_view', request=request) + data['broadcast_websocket_metrics'] = reverse('api:broadcast_websocket_metrics_view', request=request) data['inventory'] = reverse('api:inventory_list', request=request) data['inventory_scripts'] = reverse('api:inventory_script_list', request=request) data['inventory_sources'] = reverse('api:inventory_source_list', request=request) diff --git a/awx/main/analytics/broadcast_websocket.py b/awx/main/analytics/broadcast_websocket.py new file mode 100644 index 0000000000..08596eb5b6 --- /dev/null +++ b/awx/main/analytics/broadcast_websocket.py @@ -0,0 +1,165 @@ +import datetime +import os +import asyncio +import logging +import json +import aioredis +import redis + +from prometheus_client import ( + generate_latest, + Gauge, + Counter, + Enum, + Histogram, + Enum, + CollectorRegistry, + parser, +) + +from django.conf import settings + + +BROADCAST_WEBSOCKET_REDIS_KEY_NAME = 'broadcast_websocket_stats' + + +logger = logging.getLogger('awx.main.analytics.broadcast_websocket') + + +def dt_to_seconds(dt): + return int((dt - datetime.datetime(1970,1,1)).total_seconds()) + + +def now_seconds(): + return dt_to_seconds(datetime.datetime.now()) + + +# Second granularity; Per-minute +class FixedSlidingWindow(): + def __init__(self, start_time=None): + self.buckets = dict() + self.start_time = start_time or now_seconds() + + def cleanup(self, now_bucket=now_seconds()): + if self.start_time + 60 >= now_bucket: + self.start_time = now_bucket - 60 + 1 + + # Delete old entries + for k,v in self.buckets.items(): + if k < self.start_time: + del self.buckets[k] + + def record(self, ts=datetime.datetime.now()): + now_bucket = int((ts-datetime.datetime(1970,1,1)).total_seconds()) + + val = self.buckets.get(now_bucket, 0) + self.buckets[now_bucket] = val + 1 + + self.cleanup(now_bucket) + + def render(self): + self.cleanup() + return sum(self.buckets.values()) or 0 + + +class BroadcastWebsocketStatsManager(): + def __init__(self, event_loop, local_hostname): + self._local_hostname = local_hostname + + self._event_loop = event_loop + self._stats = dict() + self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME + + def new_remote_host_stats(self, remote_hostname): + self._stats[remote_hostname] = BroadcastWebsocketStats(self._local_hostname, + remote_hostname) + return self._stats[remote_hostname] + + def delete_remote_host_stats(self, remote_hostname): + del self._stats[remote_hostname] + + async def run_loop(self): + try: + redis_conn = await aioredis.create_redis_pool(settings.BROKER_URL) + while True: + stats_data_str = ''.join(stat.serialize() for stat in self._stats.values()) + await redis_conn.set(self._redis_key, stats_data_str) + + await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS) + except Exception as e: + logger.warn(e) + await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS) + self.start() + + def start(self): + self.async_task = self._event_loop.create_task(self.run_loop()) + return self.async_task + + @classmethod + def get_stats_sync(cls): + ''' + Stringified verion of all the stats + ''' + redis_conn = redis.Redis.from_url(settings.BROKER_URL) + return redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) + + +class BroadcastWebsocketStats(): + def __init__(self, local_hostname, remote_hostname): + self._local_hostname = local_hostname + self._remote_hostname = remote_hostname + self._registry = CollectorRegistry() + + # TODO: More robust replacement + self.name = self._local_hostname.replace('-', '_') + self.remote_name = self._remote_hostname.replace('-', '_') + + self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total', + 'Number of messages received, to be forwarded, by the broadcast websocket system', + registry=self._registry) + self._messages_received = Gauge(f'awx_{self.remote_name}_messages_received', + 'Number of messages received, to be forwarded, by the broadcast websocket system, for the duration of the current connection', + registry=self._registry) + self._connection = Enum(f'awx_{self.remote_name}_connection', + 'Websocket broadcast connection', + states=['disconnected', 'connected'], + registry=self._registry) + self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start', + 'Time the connection was established', + registry=self._registry) + + self._messages_received_per_minute = Gauge(f'awx_{self.remote_name}_messages_received_per_minute', + 'Messages received per minute', + registry=self._registry) + self._internal_messages_received_per_minute = FixedSlidingWindow() + + def unregister(self): + self._registry.unregister(f'awx_{self.remote_name}_messages_received') + self._registry.unregister(f'awx_{self.remote_name}_connection') + + def record_message_received(self): + self._internal_messages_received_per_minute.record() + self._messages_received.inc() + self._messages_received_total.inc() + + def record_connection_established(self): + self._connection.state('connected') + self._connection_start.set_to_current_time() + self._messages_received.set(0) + + def record_connection_lost(self): + self._connection.state('disconnected') + + def get_connection_duration(self): + return (datetime.datetime.now() - self._connection_established_ts).total_seconds() + + def render(self): + msgs_per_min = self._internal_messages_received_per_minute.render() + self._messages_received_per_minute.set(msgs_per_min) + + def serialize(self): + self.render() + + data = {} + registry_data = generate_latest(self._registry).decode('UTF-8') + return registry_data diff --git a/awx/main/consumers.py b/awx/main/consumers.py index 68758210e8..07ee6e042c 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -21,12 +21,9 @@ from channels.db import database_sync_to_async from asgiref.sync import async_to_sync -from awx.main.wsbroadcast import wrap_broadcast_msg - logger = logging.getLogger('awx.main.consumers') XRF_KEY = '_auth_user_xrf' -BROADCAST_GROUP = 'broadcast-group_send' class WebsocketSecretAuthHelper: @@ -42,12 +39,12 @@ class WebsocketSecretAuthHelper: def construct_secret(cls): nonce_serialized = "{}".format(int((datetime.datetime.utcnow()-datetime.datetime.fromtimestamp(0)).total_seconds())) payload_dict = { - 'secret': settings.BROADCAST_WEBSOCKETS_SECRET, + 'secret': settings.BROADCAST_WEBSOCKET_SECRET, 'nonce': nonce_serialized } payload_serialized = json.dumps(payload_dict) - secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET), + secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKET_SECRET), msg=force_bytes(payload_serialized), digestmod='sha256').hexdigest() @@ -68,14 +65,14 @@ class WebsocketSecretAuthHelper: try: payload_expected = { - 'secret': settings.BROADCAST_WEBSOCKETS_SECRET, + 'secret': settings.BROADCAST_WEBSOCKET_SECRET, 'nonce': nonce_parsed, } payload_serialized = json.dumps(payload_expected) except Exception: raise ValueError("Failed to create hash to compare to secret.") - secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET), + secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKET_SECRET), msg=force_bytes(payload_serialized), digestmod='sha256').hexdigest() @@ -114,7 +111,7 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer): # TODO: log ip of connected client logger.info(f"Broadcast client connected.") await self.accept() - await self.channel_layer.group_add(BROADCAST_GROUP, self.channel_name) + await self.channel_layer.group_add(settings.BROADCAST_WEBSOCKET_GROUP_NAME, self.channel_name) async def disconnect(self, code): # TODO: log ip of disconnected client @@ -185,7 +182,7 @@ class EventConsumer(AsyncJsonWebsocketConsumer): continue new_groups.add(name) else: - if group_name == BROADCAST_GROUP: + if group_name == settings.BROADCAST_WEBSOCKET_GROUP_NAME: logger.warn("Non-priveleged client asked to join broadcast group!") return @@ -235,6 +232,8 @@ def _dump_payload(payload): async def emit_channel_notification_async(group, payload): + from awx.main.wsbroadcast import wrap_broadcast_msg # noqa + payload_dumped = _dump_payload(payload) if payload_dumped is None: return @@ -249,7 +248,7 @@ async def emit_channel_notification_async(group, payload): ) await channel_layer.group_send( - BROADCAST_GROUP, + settings.BROADCAST_WEBSOCKET_GROUP_NAME, { "type": "internal.message", "text": wrap_broadcast_msg(group, payload_dumped), @@ -258,6 +257,8 @@ async def emit_channel_notification_async(group, payload): def emit_channel_notification(group, payload): + from awx.main.wsbroadcast import wrap_broadcast_msg # noqa + payload_dumped = _dump_payload(payload) if payload_dumped is None: return @@ -273,7 +274,7 @@ def emit_channel_notification(group, payload): )) run_sync(channel_layer.group_send( - BROADCAST_GROUP, + settings.BROADCAST_WEBSOCKET_GROUP_NAME, { "type": "internal.message", "text": wrap_broadcast_msg(group, payload_dumped), diff --git a/awx/main/management/commands/run_wsbroadcast.py b/awx/main/management/commands/run_wsbroadcast.py index 63e8ed6b94..cb684a3577 100644 --- a/awx/main/management/commands/run_wsbroadcast.py +++ b/awx/main/management/commands/run_wsbroadcast.py @@ -23,5 +23,3 @@ class Command(BaseCommand): loop.run_until_complete(task) except KeyboardInterrupt: logger.debug('Terminating Websocket Broadcaster') - if broadcast_websocket_mgr: - broadcast_websocket_mgr.stop() diff --git a/awx/main/wsbroadcast.py b/awx/main/wsbroadcast.py index d5100793e4..7aef604f52 100644 --- a/awx/main/wsbroadcast.py +++ b/awx/main/wsbroadcast.py @@ -15,6 +15,11 @@ from django.conf import settings from django.apps import apps from django.core.serializers.json import DjangoJSONEncoder +from awx.main.analytics.broadcast_websocket import ( + BroadcastWebsocketStats, + BroadcastWebsocketStatsManager, +) + logger = logging.getLogger('awx.main.wsbroadcast') @@ -48,13 +53,15 @@ class WebsocketTask(): def __init__(self, name, event_loop, + stats: BroadcastWebsocketStats, remote_host: str, - remote_port: int=settings.BROADCAST_WEBSOCKETS_PORT, - protocol: str=settings.BROADCAST_WEBSOCKETS_PROTOCOL, - verify_ssl: bool=settings.BROADCAST_WEBSOCKETS_VERIFY_CERT, + remote_port: int=settings.BROADCAST_WEBSOCKET_PORT, + protocol: str=settings.BROADCAST_WEBSOCKET_PROTOCOL, + verify_ssl: bool=settings.BROADCAST_WEBSOCKET_VERIFY_CERT, endpoint: str='broadcast'): self.name = name self.event_loop = event_loop + self.stats = stats self.remote_host = remote_host self.remote_port = remote_port self.endpoint = endpoint @@ -78,7 +85,7 @@ class WebsocketTask(): try: if attempt > 0: - await asyncio.sleep(5) + await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS) except asyncio.CancelledError: logger.warn(f"{self.name} connection to {self.remote_host} cancelled") raise @@ -91,17 +98,20 @@ class WebsocketTask(): async with aiohttp.ClientSession(headers={'secret': secret_val}, timeout=timeout) as session: async with session.ws_connect(uri, ssl=self.verify_ssl) as websocket: + self.stats.record_connection_established() attempt = 0 await self.run_loop(websocket) except asyncio.CancelledError: # TODO: Check if connected and disconnect # Possibly use run_until_complete() if disconnect is async logger.warn(f"{self.name} connection to {self.remote_host} cancelled") + self.stats.record_connection_lost() raise except Exception as e: # Early on, this is our canary. I'm not sure what exceptions we can really encounter. # Does aiohttp throws an exception if a disconnect happens? logger.warn(f"Websocket broadcast client exception {str(e)}") + self.stats.record_connection_lost() # Reconnect self.start(attempt=attempt+1) @@ -115,6 +125,7 @@ class WebsocketTask(): class BroadcastWebsocketTask(WebsocketTask): async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): async for msg in websocket: + self.stats.record_message_received() if msg.type == aiohttp.WSMsgType.ERROR: break @@ -137,9 +148,11 @@ class BroadcastWebsocketManager(object): def __init__(self): self.event_loop = asyncio.get_event_loop() self.broadcast_tasks = dict() + # parallel dict to broadcast_tasks that tracks stats + self.local_hostname = get_local_host() + self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname) - async def run_loop(self): - local_hostname = get_local_host() + async def run_per_host_websocket(self): while True: future_remote_hosts = get_broadcast_hosts() @@ -148,23 +161,28 @@ class BroadcastWebsocketManager(object): new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) if deleted_remote_hosts: - logger.warn(f"{local_hostname} going to remove {deleted_remote_hosts} from the websocket broadcast list") + logger.warn(f"{self.local_hostname} going to remove {deleted_remote_hosts} from the websocket broadcast list") if new_remote_hosts: - logger.warn(f"{local_hostname} going to add {new_remote_hosts} to the websocket broadcast list") + logger.warn(f"{self.local_hostname} going to add {new_remote_hosts} to the websocket broadcast list") for h in deleted_remote_hosts: self.broadcast_tasks[h].cancel() del self.broadcast_tasks[h] + self.stats_mgr.delete_remote_host_stats(h) for h in new_remote_hosts: - broadcast_task = BroadcastWebsocketTask(name=local_hostname, + stats = self.stats_mgr.new_remote_host_stats(h) + broadcast_task = BroadcastWebsocketTask(name=self.local_hostname, event_loop=self.event_loop, + stats=stats, remote_host=h) broadcast_task.start() self.broadcast_tasks[h] = broadcast_task - await asyncio.sleep(settings.BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS) + await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS) def start(self): - self.async_task = self.event_loop.create_task(self.run_loop()) + self.stats_mgr.start() + + self.async_task = self.event_loop.create_task(self.run_per_host_websocket()) return self.async_task diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index f464369b78..47d6a3522f 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1235,17 +1235,26 @@ MIDDLEWARE = [ # Secret header value to exchange for websockets responsible for distributing websocket messages. # This needs to be kept secret and randomly generated -BROADCAST_WEBSOCKETS_SECRET = '' +BROADCAST_WEBSOCKET_SECRET = '' # Port for broadcast websockets to connect to # Note: that the clients will follow redirect responses -BROADCAST_WEBSOCKETS_PORT = 443 +BROADCAST_WEBSOCKET_PORT = 443 # Whether or not broadcast websockets should check nginx certs when interconnecting -BROADCAST_WEBSOCKETS_VERIFY_CERT = False +BROADCAST_WEBSOCKET_VERIFY_CERT = False # Connect to other AWX nodes using http or https -BROADCAST_WEBSOCKETS_PROTOCOL = 'https' +BROADCAST_WEBSOCKET_PROTOCOL = 'https' + +# All websockets that connect to the broadcast websocket endpoint will be put into this group +BROADCAST_WEBSOCKET_GROUP_NAME = 'broadcast-group_send' + +# Time wait before retrying connecting to a websocket broadcast tower node +BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS = 5 # How often websocket process will look for changes in the Instance table -BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS = 10 +BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10 + +# How often websocket process will generate stats +BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index a22bbda98f..8d785e62ef 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -244,7 +244,7 @@ TEST_OPENSTACK_PROJECT = '' TEST_AZURE_USERNAME = '' TEST_AZURE_KEY_DATA = '' -BROADCAST_WEBSOCKETS_SECRET = '🤖starscream🤖' -BROADCAST_WEBSOCKETS_PORT = 8013 -BROADCAST_WEBSOCKETS_VERIFY_CERT = False -BROADCAST_WEBSOCKETS_PROTOCOL = 'http' +BROADCAST_WEBSOCKET_SECRET = '🤖starscream🤖' +BROADCAST_WEBSOCKET_PORT = 8013 +BROADCAST_WEBSOCKET_VERIFY_CERT = False +BROADCAST_WEBSOCKET_PROTOCOL = 'http'