mirror of
https://github.com/ansible/awx.git
synced 2026-05-05 16:37:37 -02:30
add broadcast websocket status command
This commit is contained in:
@@ -11,6 +11,7 @@ from prometheus_client import (
|
|||||||
Counter,
|
Counter,
|
||||||
Enum,
|
Enum,
|
||||||
CollectorRegistry,
|
CollectorRegistry,
|
||||||
|
parser,
|
||||||
)
|
)
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -30,6 +31,11 @@ def now_seconds():
|
|||||||
return dt_to_seconds(datetime.datetime.now())
|
return dt_to_seconds(datetime.datetime.now())
|
||||||
|
|
||||||
|
|
||||||
|
def safe_name(s):
|
||||||
|
# Replace all non alpha-numeric characters with _
|
||||||
|
return re.sub('[^0-9a-zA-Z]+', '_', s)
|
||||||
|
|
||||||
|
|
||||||
# Second granularity; Per-minute
|
# Second granularity; Per-minute
|
||||||
class FixedSlidingWindow():
|
class FixedSlidingWindow():
|
||||||
def __init__(self, start_time=None):
|
def __init__(self, start_time=None):
|
||||||
@@ -99,7 +105,8 @@ class BroadcastWebsocketStatsManager():
|
|||||||
Stringified verion of all the stats
|
Stringified verion of all the stats
|
||||||
'''
|
'''
|
||||||
redis_conn = redis.Redis.from_url(settings.BROKER_URL)
|
redis_conn = redis.Redis.from_url(settings.BROKER_URL)
|
||||||
return redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME)
|
stats_str = redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) or b''
|
||||||
|
return parser.text_string_to_metric_families(stats_str.decode('UTF-8'))
|
||||||
|
|
||||||
|
|
||||||
class BroadcastWebsocketStats():
|
class BroadcastWebsocketStats():
|
||||||
@@ -109,8 +116,8 @@ class BroadcastWebsocketStats():
|
|||||||
self._registry = CollectorRegistry()
|
self._registry = CollectorRegistry()
|
||||||
|
|
||||||
# TODO: More robust replacement
|
# TODO: More robust replacement
|
||||||
self.name = self.safe_name(self._local_hostname)
|
self.name = safe_name(self._local_hostname)
|
||||||
self.remote_name = self.safe_name(self._remote_hostname)
|
self.remote_name = safe_name(self._remote_hostname)
|
||||||
|
|
||||||
self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total',
|
self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total',
|
||||||
'Number of messages received, to be forwarded, by the broadcast websocket system',
|
'Number of messages received, to be forwarded, by the broadcast websocket system',
|
||||||
@@ -122,6 +129,7 @@ class BroadcastWebsocketStats():
|
|||||||
'Websocket broadcast connection',
|
'Websocket broadcast connection',
|
||||||
states=['disconnected', 'connected'],
|
states=['disconnected', 'connected'],
|
||||||
registry=self._registry)
|
registry=self._registry)
|
||||||
|
self._connection.state('disconnected')
|
||||||
self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start',
|
self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start',
|
||||||
'Time the connection was established',
|
'Time the connection was established',
|
||||||
registry=self._registry)
|
registry=self._registry)
|
||||||
@@ -131,10 +139,6 @@ class BroadcastWebsocketStats():
|
|||||||
registry=self._registry)
|
registry=self._registry)
|
||||||
self._internal_messages_received_per_minute = FixedSlidingWindow()
|
self._internal_messages_received_per_minute = FixedSlidingWindow()
|
||||||
|
|
||||||
def safe_name(self, s):
|
|
||||||
# Replace all non alpha-numeric characters with _
|
|
||||||
return re.sub('[^0-9a-zA-Z]+', '_', s)
|
|
||||||
|
|
||||||
def unregister(self):
|
def unregister(self):
|
||||||
self._registry.unregister(f'awx_{self.remote_name}_messages_received')
|
self._registry.unregister(f'awx_{self.remote_name}_messages_received')
|
||||||
self._registry.unregister(f'awx_{self.remote_name}_connection')
|
self._registry.unregister(f'awx_{self.remote_name}_connection')
|
||||||
|
|||||||
@@ -2,10 +2,17 @@
|
|||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
from datetime import datetime as dt
|
||||||
|
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
|
|
||||||
|
from awx.main.analytics.broadcast_websocket import (
|
||||||
|
BroadcastWebsocketStatsManager,
|
||||||
|
safe_name,
|
||||||
|
)
|
||||||
from awx.main.wsbroadcast import BroadcastWebsocketManager
|
from awx.main.wsbroadcast import BroadcastWebsocketManager
|
||||||
|
from awx.main.models.ha import Instance
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.wsbroadcast')
|
logger = logging.getLogger('awx.main.wsbroadcast')
|
||||||
@@ -14,7 +21,91 @@ logger = logging.getLogger('awx.main.wsbroadcast')
|
|||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
help = 'Launch the websocket broadcaster'
|
help = 'Launch the websocket broadcaster'
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
parser.add_argument('--status', dest='status', action='store_true',
|
||||||
|
help='print the internal state of any running broadcast websocket')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _format_lines(cls, host_stats, padding=5):
|
||||||
|
widths = [0 for i in host_stats[0]]
|
||||||
|
for entry in host_stats:
|
||||||
|
for i, e in enumerate(entry):
|
||||||
|
if len(e) > widths[i]:
|
||||||
|
widths[i] = len(e)
|
||||||
|
paddings = [padding for i in widths]
|
||||||
|
|
||||||
|
lines = []
|
||||||
|
for entry in host_stats:
|
||||||
|
line = ""
|
||||||
|
for pad, width, value in zip(paddings, widths, entry):
|
||||||
|
total_width = width + pad
|
||||||
|
line += f'{value:{total_width}}'
|
||||||
|
lines.append(line)
|
||||||
|
return lines
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_connection_status(cls, me, hostnames, data):
|
||||||
|
host_stats = [('hostame', 'state', 'start time', 'duration (sec)')]
|
||||||
|
for h in hostnames:
|
||||||
|
h = safe_name(h)
|
||||||
|
prefix = f'awx_{h}'
|
||||||
|
connection_state = data.get(f'{prefix}_connection', 'N/A')
|
||||||
|
connection_started = 'N/A'
|
||||||
|
connection_duration = 'N/A'
|
||||||
|
if connection_state is None:
|
||||||
|
connection_state = 'unknown'
|
||||||
|
if connection_state == 'connected':
|
||||||
|
connection_started = data.get(f'{prefix}_connection_start', 'Error')
|
||||||
|
if connection_started != 'Error':
|
||||||
|
connection_started = datetime.datetime.fromtimestamp(connection_started)
|
||||||
|
connection_duration = (dt.now() - connection_started).total_seconds()
|
||||||
|
|
||||||
|
host_stats.append((h, connection_state, str(connection_started), str(connection_duration)))
|
||||||
|
|
||||||
|
return host_stats
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_connection_stats(cls, me, hostnames, data):
|
||||||
|
host_stats = [('hostame', 'total', 'per minute')]
|
||||||
|
for h in hostnames:
|
||||||
|
h = safe_name(h)
|
||||||
|
prefix = f'awx_{h}'
|
||||||
|
messages_total = data.get(f'{prefix}_messages_received', 'N/A')
|
||||||
|
messages_per_minute = data.get(f'{prefix}_messages_received_per_minute', 'N/A')
|
||||||
|
|
||||||
|
host_stats.append((h, str(int(messages_total)), str(int(messages_per_minute))))
|
||||||
|
|
||||||
|
return host_stats
|
||||||
|
|
||||||
def handle(self, *arg, **options):
|
def handle(self, *arg, **options):
|
||||||
|
if options.get('status'):
|
||||||
|
stats_all = BroadcastWebsocketStatsManager.get_stats_sync()
|
||||||
|
data = {}
|
||||||
|
for family in stats_all:
|
||||||
|
if family.type == 'gauge' and len(family.samples) > 1:
|
||||||
|
for sample in family.samples:
|
||||||
|
if sample.value >= 1:
|
||||||
|
data[family.name] = sample.labels[family.name]
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
data[family.name] = family.samples[0].value
|
||||||
|
me = Instance.objects.me()
|
||||||
|
hostnames = [i.hostname for i in Instance.objects.exclude(hostname=me.hostname)]
|
||||||
|
|
||||||
|
host_stats = Command.get_connection_status(me, hostnames, data)
|
||||||
|
lines = Command._format_lines(host_stats)
|
||||||
|
|
||||||
|
print(f'Broadcast websocket connection status from "{me.hostname}" to:')
|
||||||
|
print('\n'.join(lines))
|
||||||
|
|
||||||
|
host_stats = Command.get_connection_stats(me, hostnames, data)
|
||||||
|
lines = Command._format_lines(host_stats)
|
||||||
|
|
||||||
|
print(f'\nBroadcast websocket connection stats from "{me.hostname}" to:')
|
||||||
|
print('\n'.join(lines))
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
broadcast_websocket_mgr = BroadcastWebsocketManager()
|
broadcast_websocket_mgr = BroadcastWebsocketManager()
|
||||||
task = broadcast_websocket_mgr.start()
|
task = broadcast_websocket_mgr.start()
|
||||||
|
|||||||
Reference in New Issue
Block a user