[wsrelay] Port back to psycopg3

Signed-off-by: Rick Elrod <rick@elrod.me>
This commit is contained in:
Rick Elrod 2023-05-30 11:31:49 -05:00 committed by John Westcott IV
parent a665d96026
commit f9bd780d62

View File

@ -12,7 +12,7 @@ from channels.layers import get_channel_layer
from django.conf import settings
from django.apps import apps
import asyncpg
import psycopg
from awx.main.analytics.broadcast_websocket import (
RelayWebsocketStats,
@ -209,51 +209,55 @@ class WebSocketRelayManager(object):
# hostname -> ip
self.known_hosts: Dict[str, str] = dict()
async def on_ws_heartbeat(self, conn, pid, channel, payload):
try:
if not payload or channel != "web_ws_heartbeat":
return
async def on_ws_heartbeat(self, conn):
await conn.execute("LISTEN web_ws_heartbeat")
async for notif in conn.notifies():
if notif is None:
continue
try:
payload = json.loads(payload)
except json.JSONDecodeError:
logmsg = "Failed to decode message from pg_notify channel `web_ws_heartbeat`"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
return
# Skip if the message comes from the same host we are running on
# In this case, we'll be sharing a redis, no need to relay.
if payload.get("hostname") == self.local_hostname:
return
if payload.get("action") == "online":
hostname = payload.get("hostname")
ip = payload.get("ip")
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
if ip is None:
logger.warning(f"Received invalid online ws_heartbeat, missing hostname and ip: {payload}")
if not notif.payload or notif.channel != "web_ws_heartbeat":
return
self.known_hosts[hostname] = ip
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
elif payload.get("action") == "offline":
hostname = payload.get("hostname")
ip = payload.get("ip")
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
if ip is None:
logger.warning(f"Received invalid offline ws_heartbeat, missing hostname and ip: {payload}")
try:
payload = json.loads(notif.payload)
except json.JSONDecodeError:
logmsg = "Failed to decode message from pg_notify channel `web_ws_heartbeat`"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
return
self.cleanup_offline_host(ip)
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e:
# This catch-all is the same as the one above. asyncio will eat the exception
# but we want to know about it.
logger.exception(f"on_ws_heartbeat exception: {e}")
# Skip if the message comes from the same host we are running on
# In this case, we'll be sharing a redis, no need to relay.
if payload.get("hostname") == self.local_hostname:
return
if payload.get("action") == "online":
hostname = payload.get("hostname")
ip = payload.get("ip")
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
if ip is None:
logger.warning(f"Received invalid online ws_heartbeat, missing hostname and ip: {payload}")
return
self.known_hosts[hostname] = ip
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
elif payload.get("action") == "offline":
hostname = payload.get("hostname")
ip = payload.get("ip")
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
if ip is None:
logger.warning(f"Received invalid offline ws_heartbeat, missing hostname and ip: {payload}")
return
self.cleanup_offline_host(ip)
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e:
# This catch-all is the same as the one above. asyncio will eat the exception
# but we want to know about it.
logger.exception(f"on_ws_heartbeat exception: {e}")
def cleanup_offline_host(self, hostname):
"""
@ -282,16 +286,16 @@ class WebSocketRelayManager(object):
# Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully.
database_conf = settings.DATABASES['default']
async_conn = await asyncpg.connect(
database=database_conf['NAME'],
async_conn = await psycopg.AsyncConnection.connect(
dbname=database_conf['NAME'],
host=database_conf['HOST'],
user=database_conf['USER'],
password=database_conf['PASSWORD'],
port=database_conf['PORT'],
# We cannot include these because asyncpg doesn't allow all the options that psycopg does.
# **database_conf.get("OPTIONS", {}),
**database_conf.get("OPTIONS", {}),
)
await async_conn.add_listener("web_ws_heartbeat", self.on_ws_heartbeat)
await async_conn.set_autocommit(True)
event_loop.create_task(self.on_ws_heartbeat(async_conn))
# Establishes a websocket connection to /websocket/relay on all API servers
while True: