diff --git a/awx/main/analytics/broadcast_websocket.py b/awx/main/analytics/broadcast_websocket.py index 937c101132..449726f3fe 100644 --- a/awx/main/analytics/broadcast_websocket.py +++ b/awx/main/analytics/broadcast_websocket.py @@ -11,6 +11,7 @@ from prometheus_client import ( Counter, Enum, CollectorRegistry, + parser, ) from django.conf import settings @@ -30,6 +31,11 @@ def now_seconds(): return dt_to_seconds(datetime.datetime.now()) +def safe_name(s): + # Replace all non alpha-numeric characters with _ + return re.sub('[^0-9a-zA-Z]+', '_', s) + + # Second granularity; Per-minute class FixedSlidingWindow(): def __init__(self, start_time=None): @@ -99,7 +105,8 @@ class BroadcastWebsocketStatsManager(): Stringified verion of all the stats ''' redis_conn = redis.Redis.from_url(settings.BROKER_URL) - return redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) + stats_str = redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) or b'' + return parser.text_string_to_metric_families(stats_str.decode('UTF-8')) class BroadcastWebsocketStats(): @@ -109,8 +116,8 @@ class BroadcastWebsocketStats(): self._registry = CollectorRegistry() # TODO: More robust replacement - self.name = self.safe_name(self._local_hostname) - self.remote_name = self.safe_name(self._remote_hostname) + self.name = safe_name(self._local_hostname) + self.remote_name = safe_name(self._remote_hostname) self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total', 'Number of messages received, to be forwarded, by the broadcast websocket system', @@ -122,6 +129,7 @@ class BroadcastWebsocketStats(): 'Websocket broadcast connection', states=['disconnected', 'connected'], registry=self._registry) + self._connection.state('disconnected') self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start', 'Time the connection was established', registry=self._registry) @@ -131,10 +139,6 @@ class BroadcastWebsocketStats(): registry=self._registry) self._internal_messages_received_per_minute = FixedSlidingWindow() - def safe_name(self, s): - # Replace all non alpha-numeric characters with _ - return re.sub('[^0-9a-zA-Z]+', '_', s) - def unregister(self): self._registry.unregister(f'awx_{self.remote_name}_messages_received') self._registry.unregister(f'awx_{self.remote_name}_connection') diff --git a/awx/main/management/commands/run_wsbroadcast.py b/awx/main/management/commands/run_wsbroadcast.py index cb684a3577..314623b7e4 100644 --- a/awx/main/management/commands/run_wsbroadcast.py +++ b/awx/main/management/commands/run_wsbroadcast.py @@ -2,10 +2,17 @@ # All Rights Reserved. import logging import asyncio +import datetime +from datetime import datetime as dt from django.core.management.base import BaseCommand +from awx.main.analytics.broadcast_websocket import ( + BroadcastWebsocketStatsManager, + safe_name, +) from awx.main.wsbroadcast import BroadcastWebsocketManager +from awx.main.models.ha import Instance logger = logging.getLogger('awx.main.wsbroadcast') @@ -14,7 +21,91 @@ logger = logging.getLogger('awx.main.wsbroadcast') class Command(BaseCommand): help = 'Launch the websocket broadcaster' + def add_arguments(self, parser): + parser.add_argument('--status', dest='status', action='store_true', + help='print the internal state of any running broadcast websocket') + + @classmethod + def _format_lines(cls, host_stats, padding=5): + widths = [0 for i in host_stats[0]] + for entry in host_stats: + for i, e in enumerate(entry): + if len(e) > widths[i]: + widths[i] = len(e) + paddings = [padding for i in widths] + + lines = [] + for entry in host_stats: + line = "" + for pad, width, value in zip(paddings, widths, entry): + total_width = width + pad + line += f'{value:{total_width}}' + lines.append(line) + return lines + + @classmethod + def get_connection_status(cls, me, hostnames, data): + host_stats = [('hostame', 'state', 'start time', 'duration (sec)')] + for h in hostnames: + h = safe_name(h) + prefix = f'awx_{h}' + connection_state = data.get(f'{prefix}_connection', 'N/A') + connection_started = 'N/A' + connection_duration = 'N/A' + if connection_state is None: + connection_state = 'unknown' + if connection_state == 'connected': + connection_started = data.get(f'{prefix}_connection_start', 'Error') + if connection_started != 'Error': + connection_started = datetime.datetime.fromtimestamp(connection_started) + connection_duration = (dt.now() - connection_started).total_seconds() + + host_stats.append((h, connection_state, str(connection_started), str(connection_duration))) + + return host_stats + + @classmethod + def get_connection_stats(cls, me, hostnames, data): + host_stats = [('hostame', 'total', 'per minute')] + for h in hostnames: + h = safe_name(h) + prefix = f'awx_{h}' + messages_total = data.get(f'{prefix}_messages_received', 'N/A') + messages_per_minute = data.get(f'{prefix}_messages_received_per_minute', 'N/A') + + host_stats.append((h, str(int(messages_total)), str(int(messages_per_minute)))) + + return host_stats + def handle(self, *arg, **options): + if options.get('status'): + stats_all = BroadcastWebsocketStatsManager.get_stats_sync() + data = {} + for family in stats_all: + if family.type == 'gauge' and len(family.samples) > 1: + for sample in family.samples: + if sample.value >= 1: + data[family.name] = sample.labels[family.name] + break + else: + data[family.name] = family.samples[0].value + me = Instance.objects.me() + hostnames = [i.hostname for i in Instance.objects.exclude(hostname=me.hostname)] + + host_stats = Command.get_connection_status(me, hostnames, data) + lines = Command._format_lines(host_stats) + + print(f'Broadcast websocket connection status from "{me.hostname}" to:') + print('\n'.join(lines)) + + host_stats = Command.get_connection_stats(me, hostnames, data) + lines = Command._format_lines(host_stats) + + print(f'\nBroadcast websocket connection stats from "{me.hostname}" to:') + print('\n'.join(lines)) + + return + try: broadcast_websocket_mgr = BroadcastWebsocketManager() task = broadcast_websocket_mgr.start()