Removing psycopg2 references

This commit is contained in:
John Westcott IV
2023-05-09 09:26:46 -04:00
committed by John Westcott IV
parent 2b8ed66f3e
commit e47d30974c
7 changed files with 19 additions and 22 deletions

View File

@@ -1,5 +1,5 @@
import os
import psycopg2
import psycopg
import select
from contextlib import contextmanager
@@ -64,9 +64,9 @@ class PubSub(object):
if yield_timeouts:
yield None
else:
self.conn.poll()
while self.conn.notifies:
yield self.conn.notifies.pop(0)
notification_generator = self.conn.notifies()
for notification in notification_generator:
yield notification
def close(self):
self.conn.close()
@@ -89,9 +89,8 @@ def pg_bus_conn(new_connection=False):
conf['OPTIONS'] = conf.get('OPTIONS', {}).copy()
# Modify the application name to distinguish from other connections the process might use
conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener')
conn = psycopg2.connect(dbname=conf['NAME'], host=conf['HOST'], user=conf['USER'], password=conf['PASSWORD'], port=conf['PORT'], **conf['OPTIONS'])
# Django connection.cursor().connection doesn't have autocommit=True on by default
conn.set_session(autocommit=True)
connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}"
conn = psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS'])
else:
if pg_connection.connection is None:
pg_connection.connect()

View File

@@ -7,7 +7,7 @@ import signal
import sys
import redis
import json
import psycopg2
import psycopg
import time
from uuid import UUID
from queue import Empty as QueueEmpty
@@ -205,10 +205,10 @@ class AWXConsumerPG(AWXConsumerBase):
self.listen_start = time.time()
if self.should_stop:
return
except psycopg2.InterfaceError:
except psycopg.InterfaceError:
logger.warning("Stale Postgres message bus connection, reconnecting")
continue
except (db.DatabaseError, psycopg2.OperationalError):
except (db.DatabaseError, psycopg.OperationalError):
# If we have attained stady state operation, tolerate short-term database hickups
if not self.pg_is_down:
logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s")