Restore pre-upgrade pg_notify notifcation behavior (#14222)

This commit is contained in:
Alan Rominger 2023-07-11 16:23:53 -04:00 committed by GitHub
parent 07e30a3d5f
commit b8c48f7d50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -55,6 +55,23 @@ class PubSub(object):
with self.conn.cursor() as cur:
cur.execute('SELECT pg_notify(%s, %s);', (channel, payload))
@staticmethod
def current_notifies(conn):
"""
Altered version of .notifies method from psycopg library
This removes the outer while True loop so that we only process
queued notifications
"""
with conn.lock:
try:
ns = conn.wait(psycopg.generators.notifies(conn.pgconn))
except psycopg.errors._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
enc = psycopg._encodings.pgconn_encoding(conn.pgconn)
for pgn in ns:
n = psycopg.connection.Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
yield n
def events(self, select_timeout=5, yield_timeouts=False):
if not self.conn.autocommit:
raise RuntimeError('Listening for events can only be done in autocommit mode')
@ -64,7 +81,7 @@ class PubSub(object):
if yield_timeouts:
yield None
else:
notification_generator = self.conn.notifies()
notification_generator = self.current_notifies(self.conn)
for notification in notification_generator:
yield notification