[wsrelay] switch from psycopg 3 to asyncpg (#13965)

Due to dependency issues specifically around upgrading to Django 4.2, we
cannot feasibly have a dependency on psycopg2 and psycopg3. The only
place that was currently using psycopg3 was wsrelay.

Change wsrelay to use the asyncpg library and psycopg2 instead.

Tested locally on kind with a dev build of awx.

Signed-off-by: Rick Elrod <rick@elrod.me>
This commit is contained in:
Rick Elrod
2023-05-10 09:10:35 -05:00
committed by GitHub
parent 05e9b29460
commit 74c46568c1
6 changed files with 249 additions and 204 deletions

View File

@@ -11,7 +11,7 @@ from channels.layers import get_channel_layer
from django.conf import settings
from django.apps import apps
import psycopg
import asyncpg
from awx.main.analytics.broadcast_websocket import (
RelayWebsocketStats,
@@ -205,41 +205,47 @@ class WebSocketRelayManager(object):
# hostname -> ip
self.known_hosts: Dict[str, str] = dict()
async def pg_consumer(self, conn):
async def on_heartbeet(self, conn, pid, channel, payload):
try:
await conn.execute("LISTEN web_heartbeet")
async for notif in conn.notifies():
if notif is not None and notif.channel == "web_heartbeet":
try:
payload = json.loads(notif.payload)
except json.JSONDecodeError:
logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
continue
if not payload or channel != "web_heartbeet":
return
# Skip if the message comes from the same host we are running on
# In this case, we'll be sharing a redis, no need to relay.
if payload.get("hostname") == self.local_hostname:
continue
try:
payload = json.loads(payload)
except json.JSONDecodeError:
logmsg = "Failed to decode message from pg_notify channel `web_heartbeet`"
if logger.isEnabledFor(logging.DEBUG):
logmsg = "{} {}".format(logmsg, payload)
logger.warning(logmsg)
return
if payload.get("action") == "online":
hostname = payload["hostname"]
ip = payload["ip"]
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
self.known_hosts[hostname] = ip
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
elif payload.get("action") == "offline":
hostname = payload["hostname"]
del self.known_hosts[hostname]
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
# Skip if the message comes from the same host we are running on
# In this case, we'll be sharing a redis, no need to relay.
if payload.get("hostname") == self.local_hostname:
return
if payload.get("action") == "online":
hostname = payload.get("hostname")
ip = payload.get("ip")
if ip is None:
# If we don't get an IP, just try the hostname, maybe it resolves
ip = hostname
if hostname is None or ip is None:
logger.warning(f"Received invalid online heartbeet, missing hostname or ip: {payload}")
return
self.known_hosts[hostname] = ip
logger.debug(f"Web host {hostname} ({ip}) online heartbeat received.")
elif payload.get("action") == "offline":
hostname = payload.get("hostname")
if hostname is None:
logger.warning(f"Received invalid offline heartbeet, missing hostname: {payload}")
return
del self.known_hosts[hostname]
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e:
# This catch-all is the same as the one above. asyncio will eat the exception
# but we want to know about it.
logger.exception(f"pg_consumer exception: {e}")
logger.exception(f"on_heartbeet exception: {e}")
async def run(self):
event_loop = asyncio.get_running_loop()
@@ -249,16 +255,16 @@ class WebSocketRelayManager(object):
# Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully.
database_conf = settings.DATABASES['default']
async_conn = await psycopg.AsyncConnection.connect(
dbname=database_conf['NAME'],
async_conn = await asyncpg.connect(
database=database_conf['NAME'],
host=database_conf['HOST'],
user=database_conf['USER'],
password=database_conf['PASSWORD'],
port=database_conf['PORT'],
**database_conf.get("OPTIONS", {}),
# We cannot include these because asyncpg doesn't allow all the options that psycopg does.
# **database_conf.get("OPTIONS", {}),
)
await async_conn.set_autocommit(True)
event_loop.create_task(self.pg_consumer(async_conn))
await async_conn.add_listener("web_heartbeet", self.on_heartbeet)
# Establishes a websocket connection to /websocket/relay on all API servers
while True: