From b8c48f7d500242a759369a44d34d7964df980e99 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 11 Jul 2023 16:23:53 -0400 Subject: [PATCH] Restore pre-upgrade pg_notify notifcation behavior (#14222) --- awx/main/dispatch/__init__.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/awx/main/dispatch/__init__.py b/awx/main/dispatch/__init__.py index 1c8ea0dbd9..ef63f0b46d 100644 --- a/awx/main/dispatch/__init__.py +++ b/awx/main/dispatch/__init__.py @@ -55,6 +55,23 @@ class PubSub(object): with self.conn.cursor() as cur: cur.execute('SELECT pg_notify(%s, %s);', (channel, payload)) + @staticmethod + def current_notifies(conn): + """ + Altered version of .notifies method from psycopg library + This removes the outer while True loop so that we only process + queued notifications + """ + with conn.lock: + try: + ns = conn.wait(psycopg.generators.notifies(conn.pgconn)) + except psycopg.errors._NO_TRACEBACK as ex: + raise ex.with_traceback(None) + enc = psycopg._encodings.pgconn_encoding(conn.pgconn) + for pgn in ns: + n = psycopg.connection.Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid) + yield n + def events(self, select_timeout=5, yield_timeouts=False): if not self.conn.autocommit: raise RuntimeError('Listening for events can only be done in autocommit mode') @@ -64,7 +81,7 @@ class PubSub(object): if yield_timeouts: yield None else: - notification_generator = self.conn.notifies() + notification_generator = self.current_notifies(self.conn) for notification in notification_generator: yield notification