Merge pull request #6522 from chrismeyersfsu/feature-wsbroadcast_status

add broadcast websocket status command

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot]
2020-04-03 02:25:25 +00:00
committed by GitHub
3 changed files with 114 additions and 7 deletions

View File

@@ -11,6 +11,7 @@ from prometheus_client import (
Counter, Counter,
Enum, Enum,
CollectorRegistry, CollectorRegistry,
parser,
) )
from django.conf import settings from django.conf import settings
@@ -30,6 +31,11 @@ def now_seconds():
return dt_to_seconds(datetime.datetime.now()) return dt_to_seconds(datetime.datetime.now())
def safe_name(s):
    """Return *s* with every run of non-alphanumeric characters replaced by ``_``.

    Only ASCII letters and digits are kept as-is. A run of several consecutive
    other characters collapses into a single underscore.
    """
    # Splitting on the unwanted runs and re-joining the pieces with "_" gives
    # the same result as substituting each run with one underscore.
    return '_'.join(re.split('[^0-9a-zA-Z]+', s))
# Second granularity; Per-minute # Second granularity; Per-minute
class FixedSlidingWindow(): class FixedSlidingWindow():
def __init__(self, start_time=None): def __init__(self, start_time=None):
@@ -99,7 +105,8 @@ class BroadcastWebsocketStatsManager():
Stringified version of all the stats Stringified version of all the stats
''' '''
redis_conn = redis.Redis.from_url(settings.BROKER_URL) redis_conn = redis.Redis.from_url(settings.BROKER_URL)
return redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) stats_str = redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) or b''
return parser.text_string_to_metric_families(stats_str.decode('UTF-8'))
class BroadcastWebsocketStats(): class BroadcastWebsocketStats():
@@ -109,8 +116,8 @@ class BroadcastWebsocketStats():
self._registry = CollectorRegistry() self._registry = CollectorRegistry()
# TODO: More robust replacement # TODO: More robust replacement
self.name = self.safe_name(self._local_hostname) self.name = safe_name(self._local_hostname)
self.remote_name = self.safe_name(self._remote_hostname) self.remote_name = safe_name(self._remote_hostname)
self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total', self._messages_received_total = Counter(f'awx_{self.remote_name}_messages_received_total',
'Number of messages received, to be forwarded, by the broadcast websocket system', 'Number of messages received, to be forwarded, by the broadcast websocket system',
@@ -122,6 +129,7 @@ class BroadcastWebsocketStats():
'Websocket broadcast connection', 'Websocket broadcast connection',
states=['disconnected', 'connected'], states=['disconnected', 'connected'],
registry=self._registry) registry=self._registry)
self._connection.state('disconnected')
self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start', self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start',
'Time the connection was established', 'Time the connection was established',
registry=self._registry) registry=self._registry)
@@ -131,10 +139,6 @@ class BroadcastWebsocketStats():
registry=self._registry) registry=self._registry)
self._internal_messages_received_per_minute = FixedSlidingWindow() self._internal_messages_received_per_minute = FixedSlidingWindow()
def safe_name(self, s):
    """Return *s* with each run of non-alphanumeric characters replaced by ``_``."""
    # Anything that is not an ASCII letter or digit; "+" collapses a whole run
    # of such characters into one replacement.
    non_alnum_run = '[^0-9a-zA-Z]+'
    return re.sub(non_alnum_run, '_', s)
def unregister(self): def unregister(self):
self._registry.unregister(f'awx_{self.remote_name}_messages_received') self._registry.unregister(f'awx_{self.remote_name}_messages_received')
self._registry.unregister(f'awx_{self.remote_name}_connection') self._registry.unregister(f'awx_{self.remote_name}_connection')

View File

@@ -2,10 +2,18 @@
# All Rights Reserved. # All Rights Reserved.
import logging import logging
import asyncio import asyncio
import datetime
import re
from datetime import datetime as dt
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from awx.main.analytics.broadcast_websocket import (
BroadcastWebsocketStatsManager,
safe_name,
)
from awx.main.wsbroadcast import BroadcastWebsocketManager from awx.main.wsbroadcast import BroadcastWebsocketManager
from awx.main.models.ha import Instance
logger = logging.getLogger('awx.main.wsbroadcast') logger = logging.getLogger('awx.main.wsbroadcast')
@@ -14,7 +22,101 @@ logger = logging.getLogger('awx.main.wsbroadcast')
class Command(BaseCommand): class Command(BaseCommand):
help = 'Launch the websocket broadcaster' help = 'Launch the websocket broadcaster'
def add_arguments(self, parser):
    """Register this command's options on *parser*.

    Adds the boolean ``--status`` flag, stored under ``status`` in the parsed
    options.
    """
    status_help = 'print the internal state of any running broadcast websocket'
    parser.add_argument('--status', action='store_true', dest='status', help=status_help)
@classmethod
def display_len(cls, s):
    """Return the number of characters of *s* that are visible on a terminal.

    ANSI CSI escape sequences (``ESC [`` + parameter bytes + intermediate
    bytes + one final byte), which include the SGR colour codes used by the
    status output, take up no columns and are removed before measuring.

    Only well-formed CSI sequences are stripped, so a non-colour escape such
    as ``ESC[2K`` no longer swallows visible text up to the next ``m``.
    """
    return len(re.sub(r'\x1b\[[0-?]*[ -/]*[@-~]', '', s))
@classmethod
def _format_lines(cls, host_stats, padding=5):
    """Render rows of strings as left-aligned, column-padded text lines.

    :param host_stats: sequence of rows; each row is a sequence of strings.
        The first row determines the number of columns.
    :param padding: number of spaces added after the widest cell of every
        column.
    :returns: list with one formatted string per row; an empty list when
        *host_stats* is empty.

    Column widths are based on the visible length of each cell
    (``display_len``), so cells containing colour escape codes still line up.
    """
    if not host_stats:
        return []

    # Widest visible cell per column.
    column_widths = [0] * len(host_stats[0])
    for row in host_stats:
        for col, cell in enumerate(row):
            column_widths[col] = max(column_widths[col], cls.display_len(cell))

    lines = []
    for row in host_stats:
        cells = []
        for width, cell in zip(column_widths, row):
            # String formatting pads by len(), which also counts invisible
            # escape characters; widen the field by that amount to compensate.
            invisible = len(cell) - cls.display_len(cell)
            cells.append(f'{cell:{width + invisible + padding}}')
        lines.append(''.join(cells))
    return lines
@classmethod
def get_connection_status(cls, me, hostnames, data):
    """Build the connection-status table for the given remote hosts.

    :param me: the local instance; unused here, kept for interface
        compatibility.
    :param hostnames: iterable of remote host names to report on.
    :param data: mapping of metric name to value. The keys read are
        ``awx_<safe host>_connection`` and ``awx_<safe host>_connection_start``.
    :returns: list of 4-tuples of strings; the first tuple is the header row
        and every following tuple describes one host.

    The state cell is wrapped in ANSI colour codes: green when the state is
    ``connected``, red otherwise.
    """
    host_stats = [('hostname', 'state', 'start time', 'duration (sec)')]
    for hostname in hostnames:
        name = safe_name(hostname)
        prefix = f'awx_{name}'

        state = data.get(f'{prefix}_connection', 'N/A')
        if state is None:
            state = 'unknown'

        color = '91'  # red
        started = 'N/A'
        duration = 'N/A'
        if state == 'connected':
            color = '92'  # green
            started = data.get(f'{prefix}_connection_start', 'Error')
            if started != 'Error':
                # The stored value is passed to fromtimestamp(), i.e. it is
                # treated as a POSIX timestamp.
                started = dt.fromtimestamp(started)
                duration = (dt.now() - started).total_seconds()

        state = f'\033[{color}m{state}\033[0m'
        host_stats.append((name, state, str(started), str(duration)))
    return host_stats
@classmethod
def get_connection_stats(cls, me, hostnames, data):
    """Build the message-count table for the given remote hosts.

    :param me: the local instance; unused here, kept for interface
        compatibility.
    :param hostnames: iterable of remote host names to report on.
    :param data: mapping of metric name to value. The keys read are
        ``awx_<safe host>_messages_received`` and
        ``awx_<safe host>_messages_received_per_minute``.
    :returns: list of 3-tuples of strings; the first tuple is the header row
        and every following tuple describes one host.

    A metric that is missing from *data*, or whose value cannot be converted
    to an integer, is shown as ``N/A`` instead of raising.
    """
    host_stats = [('hostname', 'total', 'per minute')]
    for hostname in hostnames:
        name = safe_name(hostname)
        prefix = f'awx_{name}'
        row = [name]
        for key in (f'{prefix}_messages_received',
                    f'{prefix}_messages_received_per_minute'):
            value = data.get(key, 'N/A')
            try:
                row.append(str(int(value)))
            except (TypeError, ValueError, OverflowError):
                # No stats for this host (or a NaN/inf/non-numeric sample).
                row.append('N/A')
        host_stats.append(tuple(row))
    return host_stats
def handle(self, *arg, **options): def handle(self, *arg, **options):
if options.get('status'):
stats_all = BroadcastWebsocketStatsManager.get_stats_sync()
data = {}
for family in stats_all:
if family.type == 'gauge' and len(family.samples) > 1:
for sample in family.samples:
if sample.value >= 1:
data[family.name] = sample.labels[family.name]
break
else:
data[family.name] = family.samples[0].value
me = Instance.objects.me()
hostnames = [i.hostname for i in Instance.objects.exclude(hostname=me.hostname)]
host_stats = Command.get_connection_status(me, hostnames, data)
lines = Command._format_lines(host_stats)
print(f'Broadcast websocket connection status from "{me.hostname}" to:')
print('\n'.join(lines))
host_stats = Command.get_connection_stats(me, hostnames, data)
lines = Command._format_lines(host_stats)
print(f'\nBroadcast websocket connection stats from "{me.hostname}" to:')
print('\n'.join(lines))
return
try: try:
broadcast_websocket_mgr = BroadcastWebsocketManager() broadcast_websocket_mgr = BroadcastWebsocketManager()
task = broadcast_websocket_mgr.start() task = broadcast_websocket_mgr.start()

View File

@@ -8,6 +8,7 @@ SOSREPORT_TOWER_COMMANDS = [
"awx-manage list_instances", # tower cluster configuration "awx-manage list_instances", # tower cluster configuration
"awx-manage run_dispatcher --status", # tower dispatch worker status "awx-manage run_dispatcher --status", # tower dispatch worker status
"awx-manage check_license --data", # tower license status "awx-manage check_license --data", # tower license status
"awx-manage run_wsbroadcast --status", # tower broadcast websocket status
"supervisorctl status", # tower process status "supervisorctl status", # tower process status
"/var/lib/awx/venv/awx/bin/pip freeze", # pip package list "/var/lib/awx/venv/awx/bin/pip freeze", # pip package list
"/var/lib/awx/venv/awx/bin/pip freeze -l", # pip package list without globally-installed packages "/var/lib/awx/venv/awx/bin/pip freeze -l", # pip package list without globally-installed packages