diff --git a/awx/main/management/commands/run_wsrelay.py b/awx/main/management/commands/run_wsrelay.py index 6538a8ac56..bcd64be890 100644 --- a/awx/main/management/commands/run_wsrelay.py +++ b/awx/main/management/commands/run_wsrelay.py @@ -165,9 +165,11 @@ class Command(BaseCommand): return WebsocketsMetricsServer().start() + websocket_relay_manager = WebSocketRelayManager() - try: - websocket_relay_manager = WebSocketRelayManager() - asyncio.run(websocket_relay_manager.run()) - except KeyboardInterrupt: - logger.info('Terminating Websocket Relayer') + while True: + try: + asyncio.run(websocket_relay_manager.run()) + except KeyboardInterrupt: + logger.info('Restarting Websocket Relayer') + time.sleep(10) diff --git a/awx/main/wsrelay.py b/awx/main/wsrelay.py index 651b01ae7e..8c2a8279f9 100644 --- a/awx/main/wsrelay.py +++ b/awx/main/wsrelay.py @@ -310,19 +310,27 @@ class WebSocketRelayManager(object): for k, v in settings.LISTENER_DATABASES.get('default', {}).get('OPTIONS', {}).items(): database_conf['OPTIONS'][k] = v - async_conn = await psycopg.AsyncConnection.connect( - dbname=database_conf['NAME'], - host=database_conf['HOST'], - user=database_conf['USER'], - password=database_conf['PASSWORD'], - port=database_conf['PORT'], - **database_conf.get("OPTIONS", {}), - ) - await async_conn.set_autocommit(True) - event_loop.create_task(self.on_ws_heartbeat(async_conn)) + task = None # Establishes a websocket connection to /websocket/relay on all API servers while True: + if not task or task.done(): + try: + async_conn = await psycopg.AsyncConnection.connect( + dbname=database_conf['NAME'], + host=database_conf['HOST'], + user=database_conf['USER'], + password=database_conf['PASSWORD'], + port=database_conf['PORT'], + **database_conf.get("OPTIONS", {}), + ) + + task = event_loop.create_task(self.on_ws_heartbeat(async_conn), name="on_ws_heartbeat") + logger.info("Creating `on_ws_heartbeat` task in event loop.") + + except Exception as e: + logger.warning(f"Failed to connect to database for pg_notify: {e}") + future_remote_hosts = self.known_hosts.keys() current_remote_hosts = self.relay_connections.keys() deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)