diff --git a/awx/main/dispatch/__init__.py b/awx/main/dispatch/__init__.py index 1c8ea0dbd9..ef63f0b46d 100644 --- a/awx/main/dispatch/__init__.py +++ b/awx/main/dispatch/__init__.py @@ -55,6 +55,23 @@ class PubSub(object): with self.conn.cursor() as cur: cur.execute('SELECT pg_notify(%s, %s);', (channel, payload)) + @staticmethod + def current_notifies(conn): + """ + Altered version of .notifies method from psycopg library + This removes the outer while True loop so that we only process + queued notifications + """ + with conn.lock: + try: + ns = conn.wait(psycopg.generators.notifies(conn.pgconn)) + except psycopg.errors._NO_TRACEBACK as ex: + raise ex.with_traceback(None) + enc = psycopg._encodings.pgconn_encoding(conn.pgconn) + for pgn in ns: + n = psycopg.connection.Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid) + yield n + def events(self, select_timeout=5, yield_timeouts=False): if not self.conn.autocommit: raise RuntimeError('Listening for events can only be done in autocommit mode') @@ -64,7 +81,7 @@ class PubSub(object): if yield_timeouts: yield None else: - notification_generator = self.conn.notifies() + notification_generator = self.current_notifies(self.conn) for notification in notification_generator: yield notification