Send subsystem metrics via wsrelay (#13333)

Works by adding a dedicated producer in wsrelay that watches the
local django channels layer for messages sent to the "metrics" group.
The producer forwards each of these messages to the relay consumer
running in the web container.

The consumer running in the web container handles the message by
pushing it into the local redis instance.

The django view that handles requests to the /api/v2/metrics endpoint
loads this data from redis, formats it, and returns the response.
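
As a rough illustration of that read path (not part of this commit; the
helper name, the use of BROKER_URL, and scan_iter are assumptions), the
view could gather every per-instance blob that the relay consumer stores
under the key prefix shown in the diff below:

import redis
from django.conf import settings

def load_all_instance_metrics():
    # Hypothetical helper: collect the JSON metrics blob that the relay
    # consumer stored for each instance in this node's local redis.
    conn = redis.Redis.from_url(settings.BROKER_URL)
    prefix = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_"
    metrics = {}
    for key in conn.scan_iter(prefix + "*"):
        hostname = key.decode()[len(prefix):]
        metrics[hostname] = conn.get(key)  # JSON string of metric name -> value
    return metrics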
Seth Foster
2022-12-14 12:33:54 -05:00
committed by Hao Liu
parent 44463402a8
commit 33f070081c
4 changed files with 18 additions and 17 deletions

View File

@@ -9,7 +9,7 @@ from django.apps import apps
 from awx.main.consumers import emit_channel_notification
 from awx.main.utils import is_testing
 
-root_key = 'awx_metrics'
+root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
 
 logger = logging.getLogger('awx.main.analytics')
@@ -264,13 +264,6 @@ class Metrics:
             data[field] = self.METRICS[field].decode(self.conn)
         return data
 
-    def store_metrics(self, data_json):
-        # called when receiving metrics from other instances
-        data = json.loads(data_json)
-        if self.instance_name != data['instance']:
-            logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}")
-        self.conn.set(root_key + "_instance_" + data['instance'], data['metrics'])
-
     def should_pipe_execute(self):
         if self.metrics_have_changed is False:
             return False
@@ -309,15 +302,8 @@ class Metrics:
                     'instance': self.instance_name,
                     'metrics': self.serialize_local_metrics(),
                 }
-                # store a local copy as well
-                self.store_metrics(json.dumps(payload))
-                # 🚨🚨🚨🚨🚨🚨🚨🚨
-                # TODO: rework how metrics are emitted and recorded. we used to exploit wsbroadcast's behavior of
-                # sending the same data out to every other node.
-                # Should we increment this data in redis but ultimately just store it in the database?
-                # emit_channel_notification("metrics", payload)
-                # 🚨🚨🚨🚨🚨🚨🚨🚨
+                emit_channel_notification("metrics", payload)
                 self.previous_send_metrics.set(current_time)
                 self.previous_send_metrics.store_value(self.conn)
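
For context, emit_channel_notification() lives in awx.main.consumers and,
roughly, serializes the payload and publishes it to the named channels
group; that is why the relay consumer further down reads the JSON out of
message['text']. A simplified sketch, not the actual implementation (the
message "type" shown here is an assumption):

import json
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.core.serializers.json import DjangoJSONEncoder

def emit_channel_notification(group, payload):
    # Simplified: serialize the payload and hand it to the channels layer;
    # the dedicated wsrelay producer for the "metrics" group picks it up.
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        group,
        {"type": "internal.message", "text": json.dumps(payload, cls=DjangoJSONEncoder)},
    )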

View File

@@ -3,6 +3,7 @@ import logging
 import time
 import hmac
 import asyncio
+import redis
 
 from django.core.serializers.json import DjangoJSONEncoder
 from django.conf import settings
@@ -102,7 +103,12 @@ class RelayConsumer(AsyncJsonWebsocketConsumer):
     async def receive_json(self, data):
         (group, message) = unwrap_broadcast_msg(data)
-        await self.channel_layer.group_send(group, message)
+        if group == "metrics":
+            message = json.loads(message['text'])
+            conn = redis.Redis.from_url(settings.BROKER_URL)
+            conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics'])
+        else:
+            await self.channel_layer.group_send(group, message)
 
     async def consumer_subscribe(self, event):
         await self.send_json(event)
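
Once the handler above has run, the web node's local redis holds one key
per reporting instance. A minimal read-back (not part of this commit; the
hostname is hypothetical):

import json
import redis
from django.conf import settings

conn = redis.Redis.from_url(settings.BROKER_URL)
key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + "node1.example.org"  # hypothetical instance
raw = conn.get(key)  # JSON string produced by serialize_local_metrics()
if raw is not None:
    print(json.loads(raw))  # dict of metric name -> value for that instance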

View File

@@ -107,6 +107,12 @@ class WebsocketRelayConnection:
         self.async_task.cancel()
 
     async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse):
+        # create a dedicated subsystem metric producer to handle local subsystem
+        # metrics messages
+        # the "metrics" group is not subscribed to in the typical fashion, so we
+        # just explicitly create it
+        producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics"))
+        self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}}
         async for msg in websocket:
             self.stats.record_message_received()
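
run_producer() itself is unchanged by this commit and is not shown here.
Roughly, a producer for a group joins the local channels group and forwards
whatever arrives over the relay websocket to the remote node; the sketch
below is only an assumption about that shape (the real wire format is
defined by wrap_broadcast_msg/unwrap_broadcast_msg), not the actual
implementation:

from channels.layers import get_channel_layer

async def forward_group_to_websocket(group, websocket):
    # Hypothetical stand-in for run_producer: join the local channels group,
    # then relay every message it receives over the websocket connection.
    channel_layer = get_channel_layer()
    channel_name = await channel_layer.new_channel()
    await channel_layer.group_add(group, channel_name)
    while True:
        message = await channel_layer.receive(channel_name)
        await websocket.send_json({"group": group, "message": message})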

View File

@@ -228,6 +228,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000
 # The number of job events to migrate per-transaction when moving from int -> bigint
 JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000
 
+# The prefix of the redis key that stores metrics
+SUBSYSTEM_METRICS_REDIS_KEY_PREFIX = "awx_metrics"
+
 # Histogram buckets for the callback_receiver_batch_events_insert_db metric
 SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000]
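
With this default, the per-instance keys written by RelayConsumer above take
the form awx_metrics_instance_<instance hostname>.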