add broadcast websocket metrics

* Gather brroadcast websocket metrics and push them into redis every
configurable seconds.
* Pop metrics from redis in web view layer to display via the api on
demand
This commit is contained in:
chris meyers 2020-02-21 10:21:41 -05:00 committed by Ryan Petrello
parent b6b9802f9e
commit d6594ab602
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
9 changed files with 246 additions and 34 deletions

View File

@ -34,7 +34,10 @@ from awx.api.views import (
OAuth2ApplicationDetail,
)
from awx.api.views.metrics import MetricsView
from awx.api.views.metrics import (
MetricsView,
BroadcastWebsocketMetricsView,
)
from .organization import urls as organization_urls
from .user import urls as user_urls
@ -93,6 +96,7 @@ v2_urls = [
url(r'^tokens/$', OAuth2TokenList.as_view(), name='o_auth2_token_list'),
url(r'^', include(oauth2_urls)),
url(r'^metrics/$', MetricsView.as_view(), name='metrics_view'),
url(r'^broadcast_websocket_metrics/$', BroadcastWebsocketMetricsView.as_view(), name='broadcast_websocket_metrics_view'),
url(r'^ping/$', ApiV2PingView.as_view(), name='api_v2_ping_view'),
url(r'^config/$', ApiV2ConfigView.as_view(), name='api_v2_config_view'),
url(r'^config/subscriptions/$', ApiV2SubscriptionView.as_view(), name='api_v2_subscription_view'),

View File

@ -15,6 +15,7 @@ from rest_framework.exceptions import PermissionDenied
# AWX
# from awx.main.analytics import collectors
from awx.main.analytics.metrics import metrics
from awx.main.analytics.broadcast_websocket import BroadcastWebsocketStatsManager
from awx.api import renderers
from awx.api.generics import (
@ -39,3 +40,18 @@ class MetricsView(APIView):
if (request.user.is_superuser or request.user.is_system_auditor):
return Response(metrics().decode('UTF-8'))
raise PermissionDenied()
class BroadcastWebsocketMetricsView(APIView):
name = _('Broadcast Websockets')
swagger_topic = 'Broadcast Websockets'
renderer_classes = [renderers.PlainTextRenderer,
renderers.PrometheusJSONRenderer,
renderers.BrowsableAPIRenderer,]
def get(self, request):
''' Show Metrics Details '''
if (request.user.is_superuser or request.user.is_system_auditor):
stats_str = BroadcastWebsocketStatsManager.get_stats_sync() or b''
return Response(stats_str.decode('UTF-8'))
raise PermissionDenied()

View File

@ -107,6 +107,7 @@ class ApiVersionRootView(APIView):
data['applications'] = reverse('api:o_auth2_application_list', request=request)
data['tokens'] = reverse('api:o_auth2_token_list', request=request)
data['metrics'] = reverse('api:metrics_view', request=request)
data['broadcast_websocket_metrics'] = reverse('api:broadcast_websocket_metrics_view', request=request)
data['inventory'] = reverse('api:inventory_list', request=request)
data['inventory_scripts'] = reverse('api:inventory_script_list', request=request)
data['inventory_sources'] = reverse('api:inventory_source_list', request=request)

View File

@ -0,0 +1,165 @@
import datetime
import os
import asyncio
import logging
import json
import aioredis
import redis
from prometheus_client import (
generate_latest,
Gauge,
Counter,
Enum,
Histogram,
Enum,
CollectorRegistry,
parser,
)
from django.conf import settings
BROADCAST_WEBSOCKET_REDIS_KEY_NAME = 'broadcast_websocket_stats'
logger = logging.getLogger('awx.main.analytics.broadcast_websocket')
def dt_to_seconds(dt):
return int((dt - datetime.datetime(1970,1,1)).total_seconds())
def now_seconds():
return dt_to_seconds(datetime.datetime.now())
# Second granularity; Per-minute
class FixedSlidingWindow():
def __init__(self, start_time=None):
self.buckets = dict()
self.start_time = start_time or now_seconds()
def cleanup(self, now_bucket=now_seconds()):
if self.start_time + 60 >= now_bucket:
self.start_time = now_bucket - 60 + 1
# Delete old entries
for k,v in self.buckets.items():
if k < self.start_time:
del self.buckets[k]
def record(self, ts=datetime.datetime.now()):
now_bucket = int((ts-datetime.datetime(1970,1,1)).total_seconds())
val = self.buckets.get(now_bucket, 0)
self.buckets[now_bucket] = val + 1
self.cleanup(now_bucket)
def render(self):
self.cleanup()
return sum(self.buckets.values()) or 0
class BroadcastWebsocketStatsManager():
def __init__(self, event_loop, local_hostname):
self._local_hostname = local_hostname
self._event_loop = event_loop
self._stats = dict()
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME
def new_remote_host_stats(self, remote_hostname):
self._stats[remote_hostname] = BroadcastWebsocketStats(self._local_hostname,
remote_hostname)
return self._stats[remote_hostname]
def delete_remote_host_stats(self, remote_hostname):
del self._stats[remote_hostname]
async def run_loop(self):
try:
redis_conn = await aioredis.create_redis_pool(settings.BROKER_URL)
while True:
stats_data_str = ''.join(stat.serialize() for stat in self._stats.values())
await redis_conn.set(self._redis_key, stats_data_str)
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS)
except Exception as e:
logger.warn(e)
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS)
self.start()
def start(self):
self.async_task = self._event_loop.create_task(self.run_loop())
return self.async_task
@classmethod
def get_stats_sync(cls):
'''
Stringified verion of all the stats
'''
redis_conn = redis.Redis.from_url(settings.BROKER_URL)
return redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME)
class BroadcastWebsocketStats():
def __init__(self, local_hostname, remote_hostname):
self._local_hostname = local_hostname
self._remote_hostname = remote_hostname
self._registry = CollectorRegistry()
# TODO: More robust replacement
self.name = self._local_hostname.replace('-', '_')
self.remote_name = self._remote_hostname.replace('-', '_')
self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total',
'Number of messages received, to be forwarded, by the broadcast websocket system',
registry=self._registry)
self._messages_received = Gauge(f'awx_{self.remote_name}_messages_received',
'Number of messages received, to be forwarded, by the broadcast websocket system, for the duration of the current connection',
registry=self._registry)
self._connection = Enum(f'awx_{self.remote_name}_connection',
'Websocket broadcast connection',
states=['disconnected', 'connected'],
registry=self._registry)
self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start',
'Time the connection was established',
registry=self._registry)
self._messages_received_per_minute = Gauge(f'awx_{self.remote_name}_messages_received_per_minute',
'Messages received per minute',
registry=self._registry)
self._internal_messages_received_per_minute = FixedSlidingWindow()
def unregister(self):
self._registry.unregister(f'awx_{self.remote_name}_messages_received')
self._registry.unregister(f'awx_{self.remote_name}_connection')
def record_message_received(self):
self._internal_messages_received_per_minute.record()
self._messages_received.inc()
self._messages_received_total.inc()
def record_connection_established(self):
self._connection.state('connected')
self._connection_start.set_to_current_time()
self._messages_received.set(0)
def record_connection_lost(self):
self._connection.state('disconnected')
def get_connection_duration(self):
return (datetime.datetime.now() - self._connection_established_ts).total_seconds()
def render(self):
msgs_per_min = self._internal_messages_received_per_minute.render()
self._messages_received_per_minute.set(msgs_per_min)
def serialize(self):
self.render()
data = {}
registry_data = generate_latest(self._registry).decode('UTF-8')
return registry_data

View File

@ -21,12 +21,9 @@ from channels.db import database_sync_to_async
from asgiref.sync import async_to_sync
from awx.main.wsbroadcast import wrap_broadcast_msg
logger = logging.getLogger('awx.main.consumers')
XRF_KEY = '_auth_user_xrf'
BROADCAST_GROUP = 'broadcast-group_send'
class WebsocketSecretAuthHelper:
@ -42,12 +39,12 @@ class WebsocketSecretAuthHelper:
def construct_secret(cls):
nonce_serialized = "{}".format(int((datetime.datetime.utcnow()-datetime.datetime.fromtimestamp(0)).total_seconds()))
payload_dict = {
'secret': settings.BROADCAST_WEBSOCKETS_SECRET,
'secret': settings.BROADCAST_WEBSOCKET_SECRET,
'nonce': nonce_serialized
}
payload_serialized = json.dumps(payload_dict)
secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET),
secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKET_SECRET),
msg=force_bytes(payload_serialized),
digestmod='sha256').hexdigest()
@ -68,14 +65,14 @@ class WebsocketSecretAuthHelper:
try:
payload_expected = {
'secret': settings.BROADCAST_WEBSOCKETS_SECRET,
'secret': settings.BROADCAST_WEBSOCKET_SECRET,
'nonce': nonce_parsed,
}
payload_serialized = json.dumps(payload_expected)
except Exception:
raise ValueError("Failed to create hash to compare to secret.")
secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKETS_SECRET),
secret_serialized = hmac.new(force_bytes(settings.BROADCAST_WEBSOCKET_SECRET),
msg=force_bytes(payload_serialized),
digestmod='sha256').hexdigest()
@ -114,7 +111,7 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer):
# TODO: log ip of connected client
logger.info(f"Broadcast client connected.")
await self.accept()
await self.channel_layer.group_add(BROADCAST_GROUP, self.channel_name)
await self.channel_layer.group_add(settings.BROADCAST_WEBSOCKET_GROUP_NAME, self.channel_name)
async def disconnect(self, code):
# TODO: log ip of disconnected client
@ -185,7 +182,7 @@ class EventConsumer(AsyncJsonWebsocketConsumer):
continue
new_groups.add(name)
else:
if group_name == BROADCAST_GROUP:
if group_name == settings.BROADCAST_WEBSOCKET_GROUP_NAME:
logger.warn("Non-priveleged client asked to join broadcast group!")
return
@ -235,6 +232,8 @@ def _dump_payload(payload):
async def emit_channel_notification_async(group, payload):
from awx.main.wsbroadcast import wrap_broadcast_msg # noqa
payload_dumped = _dump_payload(payload)
if payload_dumped is None:
return
@ -249,7 +248,7 @@ async def emit_channel_notification_async(group, payload):
)
await channel_layer.group_send(
BROADCAST_GROUP,
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
{
"type": "internal.message",
"text": wrap_broadcast_msg(group, payload_dumped),
@ -258,6 +257,8 @@ async def emit_channel_notification_async(group, payload):
def emit_channel_notification(group, payload):
from awx.main.wsbroadcast import wrap_broadcast_msg # noqa
payload_dumped = _dump_payload(payload)
if payload_dumped is None:
return
@ -273,7 +274,7 @@ def emit_channel_notification(group, payload):
))
run_sync(channel_layer.group_send(
BROADCAST_GROUP,
settings.BROADCAST_WEBSOCKET_GROUP_NAME,
{
"type": "internal.message",
"text": wrap_broadcast_msg(group, payload_dumped),

View File

@ -23,5 +23,3 @@ class Command(BaseCommand):
loop.run_until_complete(task)
except KeyboardInterrupt:
logger.debug('Terminating Websocket Broadcaster')
if broadcast_websocket_mgr:
broadcast_websocket_mgr.stop()

View File

@ -15,6 +15,11 @@ from django.conf import settings
from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder
from awx.main.analytics.broadcast_websocket import (
BroadcastWebsocketStats,
BroadcastWebsocketStatsManager,
)
logger = logging.getLogger('awx.main.wsbroadcast')
@ -48,13 +53,15 @@ class WebsocketTask():
def __init__(self,
name,
event_loop,
stats: BroadcastWebsocketStats,
remote_host: str,
remote_port: int=settings.BROADCAST_WEBSOCKETS_PORT,
protocol: str=settings.BROADCAST_WEBSOCKETS_PROTOCOL,
verify_ssl: bool=settings.BROADCAST_WEBSOCKETS_VERIFY_CERT,
remote_port: int=settings.BROADCAST_WEBSOCKET_PORT,
protocol: str=settings.BROADCAST_WEBSOCKET_PROTOCOL,
verify_ssl: bool=settings.BROADCAST_WEBSOCKET_VERIFY_CERT,
endpoint: str='broadcast'):
self.name = name
self.event_loop = event_loop
self.stats = stats
self.remote_host = remote_host
self.remote_port = remote_port
self.endpoint = endpoint
@ -78,7 +85,7 @@ class WebsocketTask():
try:
if attempt > 0:
await asyncio.sleep(5)
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS)
except asyncio.CancelledError:
logger.warn(f"{self.name} connection to {self.remote_host} cancelled")
raise
@ -91,17 +98,20 @@ class WebsocketTask():
async with aiohttp.ClientSession(headers={'secret': secret_val},
timeout=timeout) as session:
async with session.ws_connect(uri, ssl=self.verify_ssl) as websocket:
self.stats.record_connection_established()
attempt = 0
await self.run_loop(websocket)
except asyncio.CancelledError:
# TODO: Check if connected and disconnect
# Possibly use run_until_complete() if disconnect is async
logger.warn(f"{self.name} connection to {self.remote_host} cancelled")
self.stats.record_connection_lost()
raise
except Exception as e:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter.
# Does aiohttp throws an exception if a disconnect happens?
logger.warn(f"Websocket broadcast client exception {str(e)}")
self.stats.record_connection_lost()
# Reconnect
self.start(attempt=attempt+1)
@ -115,6 +125,7 @@ class WebsocketTask():
class BroadcastWebsocketTask(WebsocketTask):
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
async for msg in websocket:
self.stats.record_message_received()
if msg.type == aiohttp.WSMsgType.ERROR:
break
@ -137,9 +148,11 @@ class BroadcastWebsocketManager(object):
def __init__(self):
self.event_loop = asyncio.get_event_loop()
self.broadcast_tasks = dict()
# parallel dict to broadcast_tasks that tracks stats
self.local_hostname = get_local_host()
self.stats_mgr = BroadcastWebsocketStatsManager(self.event_loop, self.local_hostname)
async def run_loop(self):
local_hostname = get_local_host()
async def run_per_host_websocket(self):
while True:
future_remote_hosts = get_broadcast_hosts()
@ -148,23 +161,28 @@ class BroadcastWebsocketManager(object):
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
if deleted_remote_hosts:
logger.warn(f"{local_hostname} going to remove {deleted_remote_hosts} from the websocket broadcast list")
logger.warn(f"{self.local_hostname} going to remove {deleted_remote_hosts} from the websocket broadcast list")
if new_remote_hosts:
logger.warn(f"{local_hostname} going to add {new_remote_hosts} to the websocket broadcast list")
logger.warn(f"{self.local_hostname} going to add {new_remote_hosts} to the websocket broadcast list")
for h in deleted_remote_hosts:
self.broadcast_tasks[h].cancel()
del self.broadcast_tasks[h]
self.stats_mgr.delete_remote_host_stats(h)
for h in new_remote_hosts:
broadcast_task = BroadcastWebsocketTask(name=local_hostname,
stats = self.stats_mgr.new_remote_host_stats(h)
broadcast_task = BroadcastWebsocketTask(name=self.local_hostname,
event_loop=self.event_loop,
stats=stats,
remote_host=h)
broadcast_task.start()
self.broadcast_tasks[h] = broadcast_task
await asyncio.sleep(settings.BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS)
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
def start(self):
self.async_task = self.event_loop.create_task(self.run_loop())
self.stats_mgr.start()
self.async_task = self.event_loop.create_task(self.run_per_host_websocket())
return self.async_task

View File

@ -1235,17 +1235,26 @@ MIDDLEWARE = [
# Secret header value to exchange for websockets responsible for distributing websocket messages.
# This needs to be kept secret and randomly generated
BROADCAST_WEBSOCKETS_SECRET = ''
BROADCAST_WEBSOCKET_SECRET = ''
# Port for broadcast websockets to connect to
# Note: that the clients will follow redirect responses
BROADCAST_WEBSOCKETS_PORT = 443
BROADCAST_WEBSOCKET_PORT = 443
# Whether or not broadcast websockets should check nginx certs when interconnecting
BROADCAST_WEBSOCKETS_VERIFY_CERT = False
BROADCAST_WEBSOCKET_VERIFY_CERT = False
# Connect to other AWX nodes using http or https
BROADCAST_WEBSOCKETS_PROTOCOL = 'https'
BROADCAST_WEBSOCKET_PROTOCOL = 'https'
# All websockets that connect to the broadcast websocket endpoint will be put into this group
BROADCAST_WEBSOCKET_GROUP_NAME = 'broadcast-group_send'
# Time wait before retrying connecting to a websocket broadcast tower node
BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS = 5
# How often websocket process will look for changes in the Instance table
BROADCAST_WEBSOCKETS_NEW_INSTANCE_POLL_RATE_SECONDS = 10
BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10
# How often websocket process will generate stats
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5

View File

@ -244,7 +244,7 @@ TEST_OPENSTACK_PROJECT = ''
TEST_AZURE_USERNAME = ''
TEST_AZURE_KEY_DATA = ''
BROADCAST_WEBSOCKETS_SECRET = '🤖starscream🤖'
BROADCAST_WEBSOCKETS_PORT = 8013
BROADCAST_WEBSOCKETS_VERIFY_CERT = False
BROADCAST_WEBSOCKETS_PROTOCOL = 'http'
BROADCAST_WEBSOCKET_SECRET = '🤖starscream🤖'
BROADCAST_WEBSOCKET_PORT = 8013
BROADCAST_WEBSOCKET_VERIFY_CERT = False
BROADCAST_WEBSOCKET_PROTOCOL = 'http'