Start of heartbeet daemon

Signed-off-by: Rick Elrod <rick@elrod.me>
This commit is contained in:
Rick Elrod 2022-12-09 00:17:31 -06:00 committed by Hao Liu
parent 4f5bc992a0
commit 2e0f25150c
5 changed files with 104 additions and 10 deletions

View File

@ -229,6 +229,12 @@ wsrelay:
fi; \
$(PYTHON) manage.py run_wsrelay
heartbeet:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py run_heartbeet
## Run to start the background task dispatcher for development.
dispatcher:
@if [ "$(VENV_BASE)" ]; then \

View File

@ -0,0 +1,70 @@
import json
import logging
import os
import time
from django.core.management.base import BaseCommand
from django.conf import settings
from awx.main.dispatch import pg_bus_conn
logger = logging.getLogger('awx.main.commands.run_heartbeet')
class Command(BaseCommand):
help = 'Launch the web server beacon (heartbeet)'
# def add_arguments(self, parser):
# parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running broadcast websocket')
def print_banner(self):
heartbeet = """
********** **********
************* *************
*****************************
***********HEART***********
*************************
*******************
*************** _._
*********** /`._ `'. __
******* \ .\| \ _'` `)
*** (``_) \| ).'` /`- /
* `\ `;\_ `\\//`-'` /
\ `'.'.| / __/`
`'--v_|/`'`
__||-._
/'` `-`` `'\\
/ .'` )
\ BEET ' )
\. /
'. /'`
`) |
//
'(.
`\`.
``"""
print(heartbeet)
def construct_payload(self, action='online'):
payload = {
'hostname': settings.CLUSTER_HOST_ID,
'ip': os.environ.get('MY_POD_IP'),
'action': action,
}
return json.dumps(payload)
def do_hearbeat_loop(self):
with pg_bus_conn(new_connection=True) as conn:
while True:
logger.debug('Sending heartbeat')
conn.notify('web_heartbeet', self.construct_payload())
time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS)
# TODO: Send a message with action=offline if we notice a SIGTERM or SIGINT
# (wsrelay can use this to remove the node quicker)
def handle(self, *arg, **options):
self.print_banner()
# Note: We don't really try any reconnect logic to pg_notify here,
# just let supervisor restart if we fail.
self.do_hearbeat_loop()

View File

@ -22,6 +22,7 @@ import awx.main.analytics.subsystem_metrics as s_metrics
logger = logging.getLogger('awx.main.wsrelay')
def wrap_broadcast_msg(group, message: str):
# TODO: Maybe wrap as "group","message" so that we don't need to
# encode/decode as json.
@ -192,27 +193,35 @@ class WebSocketRelayManager(object):
async def pg_consumer(self, conn):
try:
await conn.execute("LISTEN wsrelay_rx_from_web")
await conn.execute("LISTEN web_heartbeet")
async for notif in conn.notifies():
if notif is not None and notif.channel == "wsrelay_rx_from_web":
if notif is not None and notif.channel == "web_heartbeet":
try:
payload = json.loads(notif.payload)
except json.JSONDecodeError:
logmsg = "Failed to decode message from pg_notify channel `wsrelay_rx_from_web`"
logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
continue
# Skip if the message comes from the same host we are running on
# In this case, we'll be sharing a redis, no need to relay.
if payload.get("hostname") == self.local_hostname:
continue
if payload.get("action") == "online":
hostname = payload["hostname"]
ip = payload["ip"]
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
self.known_hosts[hostname] = ip
logger.info(f"Web host {hostname} ({ip}) is online.")
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
elif payload.get("action") == "offline":
hostname = payload["hostname"]
del self.known_hosts[hostname]
logger.info(f"Web host {host} ({ip}) is offline.")
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e:
# This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves.
logger.exception(f"pg_consumer exception")
@ -261,12 +270,9 @@ class WebSocketRelayManager(object):
del self.relay_connections[h]
stats_mgr.delete_remote_host_stats(h)
logger.error(f"New remote hosts: {new_remote_hosts}")
for h in new_remote_hosts:
logger.error("we are here once")
stats = stats_mgr.new_remote_host_stats(h)
logger.error("but now we are not?")
logger.info(f"Starting relay connection to {h}")
logger.debug(f"Starting relay connection to {h}")
relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h])
relay_connection.start()
self.relay_connections[h] = relay_connection

View File

@ -1011,6 +1011,9 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10
# How often websocket process will generate stats
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5
# How often should web instances advertise themselves?
BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS = 15
DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}
# Name of the default task queue

View File

@ -28,6 +28,15 @@ killasgroup=true
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-heartbeet]
command = make heartbeet
autorestart = true
autorestart = true
stopasgroup=true
killasgroup=true
stdout_events_enabled = true
stderr_events_enabled = true
[program:awx-rsyslog-configurer]
command = make rsyslog-configurer
autorestart = true
@ -92,7 +101,7 @@ stdout_events_enabled = true
stderr_events_enabled = true
[group:tower-processes]
programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-cache-clear
programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-heartbeet
priority=5
[program:awx-autoreload]