diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index 439f084c5b..d8836b332f 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -9,7 +9,7 @@ from django.apps import apps from awx.main.consumers import emit_channel_notification from awx.main.utils import is_testing -root_key = 'awx_metrics' +root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX logger = logging.getLogger('awx.main.analytics') @@ -264,13 +264,6 @@ class Metrics: data[field] = self.METRICS[field].decode(self.conn) return data - def store_metrics(self, data_json): - # called when receiving metrics from other instances - data = json.loads(data_json) - if self.instance_name != data['instance']: - logger.debug(f"{self.instance_name} received subsystem metrics from {data['instance']}") - self.conn.set(root_key + "_instance_" + data['instance'], data['metrics']) - def should_pipe_execute(self): if self.metrics_have_changed is False: return False @@ -309,15 +302,8 @@ class Metrics: 'instance': self.instance_name, 'metrics': self.serialize_local_metrics(), } - # store a local copy as well - self.store_metrics(json.dumps(payload)) - # 🚨🚨🚨🚨🚨🚨🚨🚨 - # TODO: rework how metrics are emitted and recorded. we used to exploit wsbroadcast's behavior of - # sending the same data out to every other node. - # Should we increment this data in redis but ultimately just store it in the database? - # emit_channel_notification("metrics", payload) - # 🚨🚨🚨🚨🚨🚨🚨🚨 + emit_channel_notification("metrics", payload) self.previous_send_metrics.set(current_time) self.previous_send_metrics.store_value(self.conn) diff --git a/awx/main/consumers.py b/awx/main/consumers.py index e2d47a96fb..f856ca915e 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -3,6 +3,7 @@ import logging import time import hmac import asyncio +import redis from django.core.serializers.json import DjangoJSONEncoder from django.conf import settings @@ -102,7 +103,12 @@ class RelayConsumer(AsyncJsonWebsocketConsumer): async def receive_json(self, data): (group, message) = unwrap_broadcast_msg(data) - await self.channel_layer.group_send(group, message) + if group == "metrics": + message = json.loads(message['text']) + conn = redis.Redis.from_url(settings.BROKER_URL) + conn.set(settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX + "_instance_" + message['instance'], message['metrics']) + else: + await self.channel_layer.group_send(group, message) async def consumer_subscribe(self, event): await self.send_json(event) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 43840e1691..be0ea04162 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -107,6 +107,12 @@ class WebsocketRelayConnection: self.async_task.cancel() async def run_connection(self, websocket: aiohttp.ClientWebSocketResponse): + # create a dedicated subsystem metric producer to handle local subsystem + # metrics messages + # the "metrics" group is not subscribed to in the typical fashion, so we + # just explicitly create it + producer = self.event_loop.create_task(self.run_producer("metrics", websocket, "metrics")) + self.producers["metrics"] = {"task": producer, "subscriptions": {"metrics"}} async for msg in websocket: self.stats.record_message_received() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 085309de13..61fb2df764 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -228,6 +228,9 @@ JOB_EVENT_MAX_QUEUE_SIZE = 10000 # The number of job events to migrate per-transaction when moving from int -> bigint JOB_EVENT_MIGRATION_CHUNK_SIZE = 1000000 +# The prefix of the redis key that stores metrics +SUBSYSTEM_METRICS_REDIS_KEY_PREFIX = "awx_metrics" + # Histogram buckets for the callback_receiver_batch_events_insert_db metric SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS = [10, 50, 150, 350, 650, 2000]