mirror of
https://github.com/ansible/awx.git
synced 2026-05-13 04:17:36 -02:30
remove health info
* Sending health about websockets over websockets is not a great idea. * I tried sending health data via prometheus and encountered problems that will need PR's to prometheus_client library to solve. Circle back to this later.
This commit is contained in:
committed by
Ryan Petrello
parent
45ce6d794e
commit
c06b6306ab
@@ -18,14 +18,6 @@ from django.core.serializers.json import DjangoJSONEncoder
|
|||||||
logger = logging.getLogger('awx.main')
|
logger = logging.getLogger('awx.main')
|
||||||
|
|
||||||
|
|
||||||
def dt_to_seconds(dt):
|
|
||||||
return int((dt - datetime.datetime(1970,1,1)).total_seconds())
|
|
||||||
|
|
||||||
|
|
||||||
def now_seconds():
|
|
||||||
return dt_to_seconds(datetime.datetime.now())
|
|
||||||
|
|
||||||
|
|
||||||
def wrap_broadcast_msg(group, message: str):
|
def wrap_broadcast_msg(group, message: str):
|
||||||
# TODO: Maybe wrap as "group","message" so that we don't need to
|
# TODO: Maybe wrap as "group","message" so that we don't need to
|
||||||
# encode/decode as json.
|
# encode/decode as json.
|
||||||
@@ -51,64 +43,6 @@ def get_local_host():
|
|||||||
return Instance.objects.me().hostname
|
return Instance.objects.me().hostname
|
||||||
|
|
||||||
|
|
||||||
# Second granularity; Per-minute
|
|
||||||
class FixedSlidingWindow():
|
|
||||||
def __init__(self, start_time=None):
|
|
||||||
self.buckets = dict()
|
|
||||||
self.start_time = start_time or now_seconds()
|
|
||||||
|
|
||||||
def cleanup(self, now_bucket=now_seconds()):
|
|
||||||
if self.start_time + 60 >= now_bucket:
|
|
||||||
self.start_time = now_bucket - 60 + 1
|
|
||||||
|
|
||||||
# Delete old entries
|
|
||||||
for k,v in self.buckets.items():
|
|
||||||
if k < self.start_time:
|
|
||||||
del self.buckets[k]
|
|
||||||
|
|
||||||
def record(self, ts=datetime.datetime.now()):
|
|
||||||
now_bucket = int((ts-datetime.datetime(1970,1,1)).total_seconds())
|
|
||||||
|
|
||||||
val = self.buckets.get(now_bucket, 0)
|
|
||||||
self.buckets[now_bucket] = val + 1
|
|
||||||
|
|
||||||
self.cleanup(now_bucket)
|
|
||||||
|
|
||||||
def sum(self):
|
|
||||||
self.cleanup()
|
|
||||||
return sum(self.buckets.values()) or 0
|
|
||||||
|
|
||||||
|
|
||||||
class Stats():
|
|
||||||
def __init__(self, name):
|
|
||||||
self.name = name
|
|
||||||
self._messages_received_per_minute = FixedSlidingWindow()
|
|
||||||
self._messages_received = 0
|
|
||||||
self._is_connected = False
|
|
||||||
self._connection_established_ts = None
|
|
||||||
|
|
||||||
def record_message_received(self, ts=datetime.datetime.now()):
|
|
||||||
self._messages_received += 1
|
|
||||||
self._messages_received_per_minute.record(ts)
|
|
||||||
|
|
||||||
def get_messages_received_total(self):
|
|
||||||
return self._messages_received
|
|
||||||
|
|
||||||
def get_messages_received_per_minute(self):
|
|
||||||
self._messages_received_per_minute.sum()
|
|
||||||
|
|
||||||
def record_connection_established(self, ts=datetime.datetime.now()):
|
|
||||||
self._connection_established_ts = ts
|
|
||||||
|
|
||||||
def record_connection_lost(self, ts=datetime.datetime.now()):
|
|
||||||
self._connection_established_ts = None
|
|
||||||
self._is_connected = False
|
|
||||||
|
|
||||||
def get_connection_duration(self):
|
|
||||||
return (datetime.datetime.now() - self._connection_established_ts).total_seconds()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class WebsocketTask():
|
class WebsocketTask():
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
name,
|
name,
|
||||||
@@ -119,7 +53,6 @@ class WebsocketTask():
|
|||||||
verify_ssl: bool=settings.BROADCAST_WEBSOCKETS_VERIFY_CERT,
|
verify_ssl: bool=settings.BROADCAST_WEBSOCKETS_VERIFY_CERT,
|
||||||
endpoint: str='broadcast'):
|
endpoint: str='broadcast'):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.stats = Stats(name)
|
|
||||||
self.event_loop = event_loop
|
self.event_loop = event_loop
|
||||||
self.remote_host = remote_host
|
self.remote_host = remote_host
|
||||||
self.remote_port = remote_port
|
self.remote_port = remote_port
|
||||||
@@ -152,7 +85,6 @@ class WebsocketTask():
|
|||||||
async with aiohttp.ClientSession(headers={'secret': secret_val},
|
async with aiohttp.ClientSession(headers={'secret': secret_val},
|
||||||
timeout=timeout) as session:
|
timeout=timeout) as session:
|
||||||
async with session.ws_connect(uri, ssl=self.verify_ssl) as websocket:
|
async with session.ws_connect(uri, ssl=self.verify_ssl) as websocket:
|
||||||
self.stats.record_connection_established()
|
|
||||||
attempt = 0
|
attempt = 0
|
||||||
await self.run_loop(websocket)
|
await self.run_loop(websocket)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -160,7 +92,6 @@ class WebsocketTask():
|
|||||||
# Does aiohttp throws an exception if a disconnect happens?
|
# Does aiohttp throws an exception if a disconnect happens?
|
||||||
logger.warn("Websocket broadcast client exception {}".format(e))
|
logger.warn("Websocket broadcast client exception {}".format(e))
|
||||||
finally:
|
finally:
|
||||||
self.stats.record_connection_lost()
|
|
||||||
# Reconnect
|
# Reconnect
|
||||||
self.start(attempt=attempt+1)
|
self.start(attempt=attempt+1)
|
||||||
|
|
||||||
@@ -168,71 +99,9 @@ class WebsocketTask():
|
|||||||
self.event_loop.create_task(self.connect(attempt=attempt))
|
self.event_loop.create_task(self.connect(attempt=attempt))
|
||||||
|
|
||||||
|
|
||||||
class HealthWebsocketTask(WebsocketTask):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.period = kwargs.pop('period', 10)
|
|
||||||
self.broadcast_stats = kwargs.pop('broadcast_stats', [])
|
|
||||||
|
|
||||||
super().__init__(*args, endpoint='health', **kwargs)
|
|
||||||
|
|
||||||
self.period_abs = None
|
|
||||||
# Ideally, we send a health beat at exactly the period. In reality
|
|
||||||
# there is always jitter due to OS needs, system load, etc.
|
|
||||||
# This variable tracks that offset.
|
|
||||||
self.last_period_offset = 0
|
|
||||||
|
|
||||||
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
|
|
||||||
'''
|
|
||||||
now = datetime.datetime.now()
|
|
||||||
if not self.period_abs:
|
|
||||||
self.period_abs = now
|
|
||||||
|
|
||||||
sleep_time = self.period_abs + self.period
|
|
||||||
|
|
||||||
|
|
||||||
if now <= next_period:
|
|
||||||
logger.warn("Websocket broadcast missed sending health ping.")
|
|
||||||
else:
|
|
||||||
await asyncio.sleep(sleep_time)
|
|
||||||
|
|
||||||
|
|
||||||
sleep_time = datetime.datetime.now() - (self.last_period + datetime.timedelta(seconds=PERIOD))
|
|
||||||
'''
|
|
||||||
|
|
||||||
# Start stats loop
|
|
||||||
self.event_loop.create_task(self.run_calc_stats_loop())
|
|
||||||
|
|
||||||
# Let this task loop be the send loop
|
|
||||||
await self.run_send_stats_loop(websocket)
|
|
||||||
|
|
||||||
async def run_calc_stats_loop(self):
|
|
||||||
"""
|
|
||||||
Do any periodic calculations needed. i.e. sampling
|
|
||||||
"""
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
|
|
||||||
async def run_send_stats_loop(self, websocket: aiohttp.ClientWebSocketResponse):
|
|
||||||
while True:
|
|
||||||
msg = {
|
|
||||||
"sending_host": self.name,
|
|
||||||
"remote_hosts": [],
|
|
||||||
}
|
|
||||||
for s in self.broadcast_stats:
|
|
||||||
msg['remote_hosts'].append({
|
|
||||||
'name': s.name,
|
|
||||||
'messages_received': s.get_messages_received_total(),
|
|
||||||
'messages_received_per_minute': s.get_messages_received_per_minute(),
|
|
||||||
})
|
|
||||||
|
|
||||||
logger.debug(f"Sending health message {msg}")
|
|
||||||
await websocket.send_json(msg)
|
|
||||||
await asyncio.sleep(10)
|
|
||||||
|
|
||||||
|
|
||||||
class BroadcastWebsocketTask(WebsocketTask):
|
class BroadcastWebsocketTask(WebsocketTask):
|
||||||
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
|
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
|
||||||
async for msg in websocket:
|
async for msg in websocket:
|
||||||
self.stats.record_message_received()
|
|
||||||
|
|
||||||
if msg.type == aiohttp.WSMsgType.ERROR:
|
if msg.type == aiohttp.WSMsgType.ERROR:
|
||||||
break
|
break
|
||||||
@@ -260,10 +129,8 @@ class RedisGroupBroadcastChannelLayer(RedisChannelLayer):
|
|||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
local_hostname = get_local_host()
|
local_hostname = get_local_host()
|
||||||
|
|
||||||
broadcast_tasks = [BroadcastWebsocketTask(local_hostname, loop, h) for h in remote_hosts]
|
broadcast_tasks = [BroadcastWebsocketTask(name=local_hostname,
|
||||||
broadcast_stats = [t.stats for t in broadcast_tasks]
|
event_loop=loop,
|
||||||
health_tasks = [HealthWebsocketTask(local_hostname, loop, h, broadcast_stats=broadcast_stats) for h in remote_hosts]
|
remote_host=h) for h in remote_hosts]
|
||||||
|
|
||||||
[t.start() for t in broadcast_tasks]
|
[t.start() for t in broadcast_tasks]
|
||||||
[t.start() for t in health_tasks]
|
|
||||||
|
|
||||||
|
|||||||
@@ -122,29 +122,6 @@ class BroadcastConsumer(AsyncJsonWebsocketConsumer):
|
|||||||
await self.send(event['text'])
|
await self.send(event['text'])
|
||||||
|
|
||||||
|
|
||||||
class HealthConsumer(AsyncJsonWebsocketConsumer):
|
|
||||||
async def connect(self):
|
|
||||||
try:
|
|
||||||
WebsocketSecretAuthHelper.is_authorized(self.scope)
|
|
||||||
except Exception:
|
|
||||||
await self.close()
|
|
||||||
return
|
|
||||||
|
|
||||||
# TODO: log ip of connected client
|
|
||||||
logger.info("Client connected to health endpoint")
|
|
||||||
await self.accept()
|
|
||||||
|
|
||||||
async def disconnect(self, code):
|
|
||||||
# TODO: log ip of disconnected client
|
|
||||||
logger.info("Client disconnected from health endpoint")
|
|
||||||
|
|
||||||
async def receive_json(self, content, **kwargs):
|
|
||||||
logger.debug(f"Got Health status {content}")
|
|
||||||
|
|
||||||
async def internal_message(self, event):
|
|
||||||
logger.info("Got internal message from health endpoint .. can this happen?")
|
|
||||||
|
|
||||||
|
|
||||||
class EventConsumer(AsyncJsonWebsocketConsumer):
|
class EventConsumer(AsyncJsonWebsocketConsumer):
|
||||||
async def connect(self):
|
async def connect(self):
|
||||||
user = self.scope['user']
|
user = self.scope['user']
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ from . import consumers
|
|||||||
websocket_urlpatterns = [
|
websocket_urlpatterns = [
|
||||||
url(r'websocket/$', consumers.EventConsumer),
|
url(r'websocket/$', consumers.EventConsumer),
|
||||||
url(r'websocket/broadcast/$', consumers.BroadcastConsumer),
|
url(r'websocket/broadcast/$', consumers.BroadcastConsumer),
|
||||||
url(r'websocket/health/$', consumers.HealthConsumer),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
application = ProtocolTypeRouter({
|
application = ProtocolTypeRouter({
|
||||||
|
|||||||
Reference in New Issue
Block a user