Rename heartbeet daemon to ws_heartbeat (#14041)

Signed-off-by: Rick Elrod <rick@elrod.me>
This commit is contained in:
Rick Elrod
2023-05-24 13:27:55 -05:00
committed by GitHub
parent f46c7452d1
commit bac124004f
7 changed files with 26 additions and 55 deletions

View File

@@ -267,11 +267,11 @@ run-wsrelay:
$(PYTHON) manage.py run_wsrelay $(PYTHON) manage.py run_wsrelay
## Start the heartbeat process in the background in the development environment. ## Start the heartbeat process in the background in the development environment.
run-heartbeet: run-ws-heartbeat:
@if [ "$(VENV_BASE)" ]; then \ @if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \ . $(VENV_BASE)/awx/bin/activate; \
fi; \ fi; \
$(PYTHON) manage.py run_heartbeet $(PYTHON) manage.py run_ws_heartbeat
reports: reports:
mkdir -p $@ mkdir -p $@

View File

@@ -10,39 +10,11 @@ from django.conf import settings
from awx.main.dispatch import pg_bus_conn from awx.main.dispatch import pg_bus_conn
logger = logging.getLogger('awx.main.commands.run_heartbeet') logger = logging.getLogger('awx.main.commands.run_ws_heartbeat')
class Command(BaseCommand): class Command(BaseCommand):
help = 'Launch the web server beacon (heartbeet)' help = 'Launch the web server beacon (ws_heartbeat)'
def print_banner(self):
heartbeet = r"""
********** **********
************* *************
*****************************
***********HEART***********
*************************
*******************
*************** _._
*********** /`._ `'. __
******* \ .\| \ _'` `)
*** (``_) \| ).'` /`- /
* `\ `;\_ `\\//`-'` /
\ `'.'.| / __/`
`'--v_|/`'`
__||-._
/'` `-`` `'\\
/ .'` )
\ BEET ' )
\. /
'. /'`
`) |
//
'(.
`\`.
``"""
print(heartbeet)
def construct_payload(self, action='online'): def construct_payload(self, action='online'):
payload = { payload = {
@@ -54,18 +26,17 @@ class Command(BaseCommand):
def notify_listener_and_exit(self, *args): def notify_listener_and_exit(self, *args):
with pg_bus_conn(new_connection=False) as conn: with pg_bus_conn(new_connection=False) as conn:
conn.notify('web_heartbeet', self.construct_payload(action='offline')) conn.notify('web_ws_heartbeat', self.construct_payload(action='offline'))
sys.exit(0) sys.exit(0)
def do_hearbeat_loop(self): def do_hearbeat_loop(self):
with pg_bus_conn(new_connection=True) as conn: with pg_bus_conn(new_connection=True) as conn:
while True: while True:
logger.debug('Sending heartbeat') logger.debug('Sending heartbeat')
conn.notify('web_heartbeet', self.construct_payload()) conn.notify('web_ws_heartbeat', self.construct_payload())
time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS) time.sleep(settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS)
def handle(self, *arg, **options): def handle(self, *arg, **options):
self.print_banner()
signal.signal(signal.SIGTERM, self.notify_listener_and_exit) signal.signal(signal.SIGTERM, self.notify_listener_and_exit)
signal.signal(signal.SIGINT, self.notify_listener_and_exit) signal.signal(signal.SIGINT, self.notify_listener_and_exit)

View File

@@ -209,15 +209,15 @@ class WebSocketRelayManager(object):
# hostname -> ip # hostname -> ip
self.known_hosts: Dict[str, str] = dict() self.known_hosts: Dict[str, str] = dict()
async def on_heartbeet(self, conn, pid, channel, payload): async def on_ws_heartbeat(self, conn, pid, channel, payload):
try: try:
if not payload or channel != "web_heartbeet": if not payload or channel != "web_ws_heartbeat":
return return
try: try:
payload = json.loads(payload) payload = json.loads(payload)
except json.JSONDecodeError: except json.JSONDecodeError:
logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`" logmsg = "Failed to decode message from pg_notify channel `web_ws_heartbeat`"
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload) logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg) logger.warning(logmsg)
@@ -235,7 +235,7 @@ class WebSocketRelayManager(object):
# If we don't get an IP, just try the hostname, maybe it resolves # If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname ip = hostname
if ip is None: if ip is None:
logger.warning(f"Received invalid online heartbeet, missing hostname and ip: {payload}") logger.warning(f"Received invalid online ws_heartbeat, missing hostname and ip: {payload}")
return return
self.known_hosts[hostname] = ip self.known_hosts[hostname] = ip
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.") logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
@@ -246,14 +246,14 @@ class WebSocketRelayManager(object):
# If we don't get an IP, just try the hostname, maybe it resolves # If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname ip = hostname
if ip is None: if ip is None:
logger.warning(f"Received invalid offline heartbeet, missing hostname and ip: {payload}") logger.warning(f"Received invalid offline ws_heartbeat, missing hostname and ip: {payload}")
return return
self.cleanup_offline_host(ip) self.cleanup_offline_host(ip)
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e: except Exception as e:
# This catch-all is the same as the one above. asyncio will eat the exception # This catch-all is the same as the one above. asyncio will eat the exception
# but we want to know about it. # but we want to know about it.
logger.exception(f"on_heartbeet exception: {e}") logger.exception(f"on_ws_heartbeat exception: {e}")
def cleanup_offline_host(self, hostname): def cleanup_offline_host(self, hostname):
""" """
@@ -291,7 +291,7 @@ class WebSocketRelayManager(object):
# We cannot include these because asyncpg doesn't allow all the options that psycopg does. # We cannot include these because asyncpg doesn't allow all the options that psycopg does.
# **database_conf.get("OPTIONS", {}), # **database_conf.get("OPTIONS", {}),
) )
await async_conn.add_listener("web_heartbeet", self.on_heartbeet) await async_conn.add_listener("web_ws_heartbeat", self.on_ws_heartbeat)
# Establishes a websocket connection to /websocket/relay on all API servers # Establishes a websocket connection to /websocket/relay on all API servers
while True: while True:

View File

@@ -856,7 +856,7 @@ LOGGING = {
'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'}, 'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'},
'awx.main.rsyslog_configurer': {'handlers': ['rsyslog_configurer']}, 'awx.main.rsyslog_configurer': {'handlers': ['rsyslog_configurer']},
'awx.main.cache_clear': {'handlers': ['cache_clear']}, 'awx.main.cache_clear': {'handlers': ['cache_clear']},
'awx.main.heartbeet': {'handlers': ['heartbeet']}, 'awx.main.ws_heartbeat': {'handlers': ['ws_heartbeat']},
'awx.main.wsrelay': {'handlers': ['wsrelay']}, 'awx.main.wsrelay': {'handlers': ['wsrelay']},
'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False}, 'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False},
'awx.main.tasks': {'handlers': ['task_system', 'external_logger', 'console'], 'propagate': False}, 'awx.main.tasks': {'handlers': ['task_system', 'external_logger', 'console'], 'propagate': False},
@@ -890,7 +890,7 @@ handler_config = {
'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'}, 'job_lifecycle': {'filename': 'job_lifecycle.log', 'formatter': 'job_lifecycle'},
'rsyslog_configurer': {'filename': 'rsyslog_configurer.log'}, 'rsyslog_configurer': {'filename': 'rsyslog_configurer.log'},
'cache_clear': {'filename': 'cache_clear.log'}, 'cache_clear': {'filename': 'cache_clear.log'},
'heartbeet': {'filename': 'heartbeet.log'}, 'ws_heartbeat': {'filename': 'ws_heartbeat.log'},
} }
# If running on a VM, we log to files. When running in a container, we log to stdout. # If running on a VM, we log to files. When running in a container, we log to stdout.

View File

@@ -44,7 +44,7 @@ The notable modules for this component are:
endpoint. This is a daemon. It formerly ran in each web container, but now endpoint. This is a daemon. It formerly ran in each web container, but now
runs in each task container instead. runs in each task container instead.
* `awx/main/management/commands/run_heartbeet.py` - discussed below, used to * `awx/main/management/commands/run_ws_heartbeat.py` - discussed below, used to
send a heartbeat payload to pg_notify every few seconds, so that all task send a heartbeat payload to pg_notify every few seconds, so that all task
pods running `wsrelay.py` (above) know about each web pod. pods running `wsrelay.py` (above) know about each web pod.
@@ -103,7 +103,7 @@ that care about them.
### The Heartbeet ### The Heartbeet
There is also a "heartbeet" system (a play on "heartbeat") that goes along with There is also a "ws_heartbeat" system that goes along with
the above. Remember that `wsrelay` lives in each task pod, and there could be an the above. Remember that `wsrelay` lives in each task pod, and there could be an
arbitrary number of web and task pods (independent of each other). Because of arbitrary number of web and task pods (independent of each other). Because of
this, `wsrelay` (on all task pods) needs to know which web pods are up and need this, `wsrelay` (on all task pods) needs to know which web pods are up and need
@@ -111,7 +111,7 @@ to be connected to (on their "relay" endpoints). To accomplish this, we use
pg_notify, since web and task pods are all able to connect to the database and pg_notify, since web and task pods are all able to connect to the database and
we are safely able to use it as a central communication point. we are safely able to use it as a central communication point.
In each web container, there is a process, `run_heartbeet.py` which will send In each web container, there is a process, `run_ws_heartbeat.py` which will send
out a heartbeat payload to pg_notify, every out a heartbeat payload to pg_notify, every
`settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS` seconds. This is `settings.BROADCAST_WEBSOCKET_BEACON_FROM_WEB_RATE_SECONDS` seconds. This is
done in a broadcast fashion to a specific pg_notify channel, and each `wsrelay` done in a broadcast fashion to a specific pg_notify channel, and each `wsrelay`
@@ -120,7 +120,7 @@ these messages. When `wsrelay` sees this heartbeat packet, it checks to see if
the web node is known already. If not, it creates a connection to it, and adds the web node is known already. If not, it creates a connection to it, and adds
it to its list of nodes to relay websocket messages to. it to its list of nodes to relay websocket messages to.
It can also handle web nodes going offline. If `run_heartbeet.py` detects It can also handle web nodes going offline. If `run_ws_heartbeat.py` detects
SIGTERM or SIGINT, it will send an "offline" heartbeat packet, and `wsrelay` SIGTERM or SIGINT, it will send an "offline" heartbeat packet, and `wsrelay`
will work to *remove* the web node from its list of active connections. will work to *remove* the web node from its list of active connections.

View File

@@ -55,12 +55,12 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0 stderr_logfile_maxbytes=0
[program:heartbeet] [program:ws-heartbeat]
{% if kube_dev | bool %} {% if kube_dev | bool %}
command = make run-heartbeet command = make run-ws-heartbeat
directory = /awx_devel directory = /awx_devel
{% else %} {% else %}
command = awx-manage run_heartbeet command = awx-manage run_ws_heartbeat
directory = /var/lib/awx directory = /var/lib/awx
{% endif %} {% endif %}
autorestart = true autorestart = true
@@ -103,7 +103,7 @@ stderr_logfile_maxbytes=0
{% endif %} {% endif %}
[group:tower-processes] [group:tower-processes]
programs=nginx,uwsgi,daphne,awx-cache-clear,heartbeet programs=nginx,uwsgi,daphne,awx-cache-clear,ws-heartbeat
priority=5 priority=5
[eventlistener:superwatcher] [eventlistener:superwatcher]

View File

@@ -28,8 +28,8 @@ killasgroup=true
stdout_events_enabled = true stdout_events_enabled = true
stderr_events_enabled = true stderr_events_enabled = true
[program:awx-heartbeet] [program:awx-ws-heartbeat]
command = awx-manage run_heartbeet command = awx-manage run_ws_heartbeat
autorestart = true autorestart = true
autorestart = true autorestart = true
stopasgroup=true stopasgroup=true
@@ -101,7 +101,7 @@ stdout_events_enabled = true
stderr_events_enabled = true stderr_events_enabled = true
[group:tower-processes] [group:tower-processes]
programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-heartbeet,awx-rsyslog-configurer,awx-cache-clear programs=awx-dispatcher,awx-receiver,awx-uwsgi,awx-daphne,awx-nginx,awx-wsrelay,awx-rsyslogd,awx-ws-heartbeat,awx-rsyslog-configurer,awx-cache-clear
priority=5 priority=5
[program:awx-autoreload] [program:awx-autoreload]