From 2e0f25150c09f6cf6c653b20b8df2917d772f337 Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Fri, 9 Dec 2022 00:17:31 -0600 Subject: [PATCH] Start of heartbeet daemon Signed-off-by: Rick Elrod --- Makefile | 6 ++ awx/main/management/commands/run_heartbeet.py | 70 +++++++++++++++++++ awx/main/wsrelay.py | 24 ++++--- awx/settings/defaults.py | 3 + tools/docker-compose/supervisor.conf | 11 ++- 5 files changed, 104 insertions(+), 10 deletions(-) create mode 100644 awx/main/management/commands/run_heartbeet.py diff --git a/Makefile b/Makefile index ee434fc0b7..97071b8f0b 100644 --- a/Makefile +++ b/Makefile @@ -229,6 +229,12 @@ wsrelay: fi; \ $(PYTHON) manage.py run_wsrelay +heartbeet: + @if [ "$(VENV_BASE)" ]; then \ + . $(VENV_BASE)/awx/bin/activate; \ + fi; \ + $(PYTHON) manage.py run_heartbeet + ## Run to start the background task dispatcher for development. dispatcher: @if [ "$(VENV_BASE)" ]; then \ diff --git a/awx/main/management/commands/run_heartbeet.py b/awx/main/management/commands/run_heartbeet.py new file mode 100644 index 0000000000..3d6d4aaa82 --- /dev/null +++ b/awx/main/management/commands/run_heartbeet.py @@ -0,0 +1,70 @@ +import json +import logging +import os +import time + +from django.core.management.base import BaseCommand +from django.conf import settings + +from awx.main.dispatch import pg_bus_conn + +logger = logging.getLogger('awx.main.commands.run_heartbeet') + + +class Command(BaseCommand): + help = 'Launch the web server beacon (heartbeet)' + + # def add_arguments(self, parser): + # parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running broadcast websocket') + + def print_banner(self): + heartbeet = """ + ********** ********** + ************* ************* +***************************** + ***********HEART*********** + ************************* + ******************* + *************** _._ + *********** /`._ `'. __ + ******* \ .\| \ _'` `) + *** (``_) \| ).'` /`- / + * `\ `;\_ `\\//`-'` / + \ `'.'.| / __/` + `'--v_|/`'` + __||-._ + /'` `-`` `'\\ + / .'` ) + \ BEET ' ) + \. / + '. /'` + `) | + // + '(. + `\`. + ``""" + print(heartbeet) + + def construct_payload(self, action='online'): + payload = { + 'hostname': settings.CLUSTER_HOST_ID, + 'ip': os.environ.get('MY_POD_IP'), + 'action': action, + } + return json.dumps(payload) + + def do_hearbeat_loop(self): + with pg_bus_conn(new_connection=True) as conn: + while True: + logger.debug('Sending heartbeat') + conn.notify('web_heartbeet', self.construct_payload()) + time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS) + + # TODO: Send a message with action=offline if we notice a SIGTERM or SIGINT + # (wsrelay can use this to remove the node quicker) + def handle(self, *arg, **options): + self.print_banner() + + # Note: We don't really try any reconnect logic to pg_notify here, + # just let supervisor restart if we fail. + self.do_hearbeat_loop() diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 735386a48a..a1abc3ec3d 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -22,6 +22,7 @@ import awx.main.analytics.subsystem_metrics as s_metrics logger = logging.getLogger('awx.main.wsrelay') + def wrap_broadcast_msg(group, message: str): # TODO: Maybe wrap as "group","message" so that we don't need to # encode/decode as json. @@ -192,27 +193,35 @@ class WebSocketRelayManager(object): async def pg_consumer(self, conn): try: - await conn.execute("LISTEN wsrelay_rx_from_web") + await conn.execute("LISTEN web_heartbeet") async for notif in conn.notifies(): - if notif is not None and notif.channel == "wsrelay_rx_from_web": + if notif is not None and notif.channel == "web_heartbeet": try: payload = json.loads(notif.payload) except json.JSONDecodeError: - logmsg = "Failed to decode message from pg_notify channel `wsrelay_rx_from_web`" + logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`" if logger.isEnabledFor(logging.DEBUG): logmsg = "{} {}".format(logmsg, payload) logger.warning(logmsg) continue + # Skip if the message comes from the same host we are running on + # In this case, we'll be sharing a redis, no need to relay. + if payload.get("hostname") == self.local_hostname: + continue + if payload.get("action") == "online": hostname = payload["hostname"] ip = payload["ip"] + if ip is None: + # If we don't get an IP, just try the hostname, maybe it resolves + ip = hostname self.known_hosts[hostname] = ip - logger.info(f"Web host {hostname} ({ip}) is online.") + logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.") elif payload.get("action") == "offline": hostname = payload["hostname"] del self.known_hosts[hostname] - logger.info(f"Web host {host} ({ip}) is offline.") + logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") except Exception as e: # This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves. logger.exception(f"pg_consumer exception") @@ -261,12 +270,9 @@ class WebSocketRelayManager(object): del self.relay_connections[h] stats_mgr.delete_remote_host_stats(h) - logger.error(f"New remote hosts: {new_remote_hosts}") for h in new_remote_hosts: - logger.error("we are here once") stats = stats_mgr.new_remote_host_stats(h) - logger.error("but now we are not?") - logger.info(f"Starting relay connection to {h}") + logger.debug(f"Starting relay connection to {h}") relay_connection = WebsocketRelayConnection(name=self.local_hostname, stats=stats, remote_host=self.known_hosts[h]) relay_connection.start() self.relay_connections[h] = relay_connection diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index a9d9a13925..085309de13 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1011,6 +1011,9 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10 # How often websocket process will generate stats BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 +# How often should web instances advertise themselves? +BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS = 15 + DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'} # Name of the default task queue diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 367a2cef53..a843edf342 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -28,6 +28,15 @@ killasgroup=true stdout_events_enabled = true stderr_events_enabled = true +[program:awx-heartbeet] +command = make heartbeet +autorestart = true +autorestart = true +stopasgroup=true +killasgroup=true +stdout_events_enabled = true +stderr_events_enabled = true + [program:awx-rsyslog-configurer] command = make rsyslog-configurer autorestart = true @@ -92,7 +101,7 @@ stdout_events_enabled = true stderr_events_enabled = true [group:tower-processes] -programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-cache-clear +programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-heartbeet priority=5 [program:awx-autoreload]