From 33f070081cb3963beb80df6b07e3c9b2fa3213ad Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Wed, 14 Dec 2022 12:33:54 -0500 Subject: [PATCH] Send subsystem metrics via wsrelay (#13333) Works by adding a dedicated producer in wsrelay that looks for local django channels messages with the group "metrics". The producer sends each message to the consumer running in the web container. The consumer running in the web container handles the message by pushing it into the local redis instance. The django view that handles a request at the /api/v2/metrics endpoint will load this data from redis, format it, and return the response. --- awx/main/analytics/subsystem_metrics.py | 18 ++---------------- awx/main/consumers.py | 8 +++++++- awx/main/wsrelay.py | 6 ++++++ awx/settings/defaults.py | 3 +++ 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 439f084c5b..d8836b332f 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -9,7 +9,7 @@ from django.apps import apps from awx.main.consumers import emit_channel_notification from awx.main.utils import is_testing -root_key = 'awx_metrics' +root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX logger = logging.getLogger('awx.main.analytics') @@ -264,13 +264,6 @@ class Metrics: data[field] = self.METRICS[field].decode(self.conn) return data - def store_metrics(self, data_json): - # called when receiving metrics from other instances - data = json.loads(data_json) - if self.instance_name != data['instance']: - logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}") - self.conn.set(root_key + "_instance_" + data['instance'], data['metrics']) - def should_pipe_execute(self): if self.metrics_have_changed is False: return False @@ -309,15 +302,8 @@ class Metrics: 'instance': self.instance_name, 'metrics': self.serialize_local_metrics(), } - # store a local copy as well - 
self.store_metrics(json.dumps(payload)) - # 🚨🚨🚨🚨🚨🚨🚨🚨 - # TODO: rework how metrics are emitted and recorded. we used to exploit wsbroadcast's behavior of - # sending the same data out to every other node. - # Should we increment this data in redis but ultimately just store it in the database? - # emit_channel_notification("metrics", payload) - # 🚨🚨🚨🚨🚨🚨🚨🚨 + emit_channel_notification("metrics", payload) self.previous_send_metrics.set(current_time) self.previous_send_metrics.store_value(self.conn) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index e2d47a96fb..f856ca915e 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -3,6 +3,7 @@ import logging import time import hmac import asyncio +import redis from django.core.serializers.json import DjangoJSONEncoder from django.conf import settings @@ -102,7 +103,12 @@ class RelayConsumer(AsyncJsonWebsocketConsumer): async def receive_json(self, data): (group, message) = unwrap_broadcast_msg(data) - await self.channel_layer.group_send(group, message) + if group == "metrics": + message = json.loads(message['text']) + conn = redis.Redis.from_url(settings.BROKER_URL) + conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics']) + else: + await self.channel_layer.group_send(group, message) async def consumer_subscribe(self, event): await self.send_json(event) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 43840e1691..be0ea04162 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -107,6 +107,12 @@ class WebsocketRelayConnection: self.async_task.cancel() async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse): + # create a dedicated subsystem metric producer to handle local subsystem + # metrics messages + # the "metrics" group is not subscribed to in the typical fashion, so we + # just explicitly create it + producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics")) + 
self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}} async for msg in websocket: self.stats.record_message_received() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 085309de13..61fb2df764 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -228,6 +228,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000 # The number of job events to migrate per-transaction when moving from int -> bigint JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000 +# The prefix of the redis key that stores metrics +SUBSYSTEM_METRICS_REDIS_KEY_PREFIX = "awx_metrics" + # Histogram buckets for the callback_receiver_batch_events_insert_db metric SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000]