New setting for pg_notify listener DB settings, add keepalive (#14755)

This commit is contained in:
Alan Rominger 2024-01-17 13:44:04 -05:00 committed by GitHub
parent 5cd029df96
commit d91da39f81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 7 deletions

View File

@ -93,6 +93,22 @@ class PubSub(object):
self.conn.close()
def create_listener_connection():
conf = settings.DATABASES['default'].copy()
conf['OPTIONS'] = conf.get('OPTIONS', {}).copy()
# Modify the application name to distinguish from other connections the process might use
conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener')
# Apply overrides specifically for the listener connection
for k, v in settings.LISTENER_DATABASES.get('default', {}).items():
conf[k] = v
for k, v in settings.LISTENER_DATABASES.get('default', {}).get('OPTIONS', {}).items():
conf['OPTIONS'][k] = v
connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}"
return psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS'])
@contextmanager
def pg_bus_conn(new_connection=False, select_timeout=None):
'''
@ -106,12 +122,7 @@ def pg_bus_conn(new_connection=False, select_timeout=None):
'''
if new_connection:
conf = settings.DATABASES['default'].copy()
conf['OPTIONS'] = conf.get('OPTIONS', {}).copy()
# Modify the application name to distinguish from other connections the process might use
conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener')
connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}"
conn = psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS'])
conn = create_listener_connection()
else:
if pg_connection.connection is None:
pg_connection.connect()

View File

@ -214,7 +214,10 @@ class AWXConsumerPG(AWXConsumerBase):
# bypasses pg_notify for scheduled tasks
self.dispatch_task(body)
self.pg_is_down = False
if self.pg_is_down:
logger.info('Dispatcher listener connection established')
self.pg_is_down = False
self.listen_start = time.time()
return self.scheduler.time_until_next_run()

View File

@ -37,6 +37,18 @@ DATABASES = {
}
}
# Special database overrides for dispatcher connections listening to pg_notify
LISTENER_DATABASES = {
'default': {
'OPTIONS': {
'keepalives': 1,
'keepalives_idle': 5,
'keepalives_interval': 5,
'keepalives_count': 5,
},
}
}
# Whether or not the deployment is a K8S-based deployment
# In K8S-based deployments, instances have zero capacity - all playbook
# automation is intended to flow through defined Container Groups that