mirror of
https://github.com/ansible/awx.git
synced 2026-05-09 10:27:37 -02:30
Allow wsrelay to fail without FATAL (#15191)
We have not identify the root cause of wsrelay failure but attempt to make wsrelay restart itself resulted in postgres and redis connection leak. We were not able to fully identify where the redis connection leak comes from so reverting back to failing and removing startsecs 30 will prevent wsrelay to FATAL
This commit is contained in:
@@ -101,8 +101,9 @@ class Command(BaseCommand):
|
|||||||
migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes()))
|
migrating = bool(executor.migration_plan(executor.loader.graph.leaf_nodes()))
|
||||||
connection.close() # Because of async nature, main loop will use new connection, so close this
|
connection.close() # Because of async nature, main loop will use new connection, so close this
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning(f'Error on startup of run_wsrelay (error: {exc}), retry in 10s...')
|
time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service to enter FATAL state
|
||||||
time.sleep(10)
|
# sleeping before logging because logging rely on setting which require database connection...
|
||||||
|
logger.warning(f'Error on startup of run_wsrelay (error: {exc}), slept for 10s...')
|
||||||
return
|
return
|
||||||
|
|
||||||
# In containerized deployments, migrations happen in the task container,
|
# In containerized deployments, migrations happen in the task container,
|
||||||
@@ -121,13 +122,14 @@ class Command(BaseCommand):
|
|||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
my_hostname = Instance.objects.my_hostname()
|
my_hostname = Instance.objects.my_hostname() # This relies on settings.CLUSTER_HOST_ID which requires database connection
|
||||||
logger.info('Active instance with hostname {} is registered.'.format(my_hostname))
|
logger.info('Active instance with hostname {} is registered.'.format(my_hostname))
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
# the CLUSTER_HOST_ID in the task, and web instance must match and
|
# the CLUSTER_HOST_ID in the task, and web instance must match and
|
||||||
# ensure network connectivity between the task and web instance
|
# ensure network connectivity between the task and web instance
|
||||||
logger.info('Unable to return currently active instance: {}, retry in 5s...'.format(e))
|
time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service to enter FATAL state
|
||||||
time.sleep(5)
|
# sleeping before logging because logging rely on setting which require database connection...
|
||||||
|
logger.warning(f"Unable to return currently active instance: {e}, slept for 10s before return.")
|
||||||
return
|
return
|
||||||
|
|
||||||
if options.get('status'):
|
if options.get('status'):
|
||||||
@@ -166,12 +168,14 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
WebsocketsMetricsServer().start()
|
WebsocketsMetricsServer().start()
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
try:
|
||||||
asyncio.run(WebSocketRelayManager().run())
|
logger.info('Starting Websocket Relayer...')
|
||||||
|
websocket_relay_manager = WebSocketRelayManager()
|
||||||
|
asyncio.run(websocket_relay_manager.run())
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.info('Shutting down Websocket Relayer')
|
logger.info('Terminating Websocket Relayer')
|
||||||
break
|
except BaseException as e: # BaseException is used to catch all exceptions including asyncio.CancelledError
|
||||||
except Exception as e:
|
time.sleep(10) # Prevent supervisor from restarting the service too quickly and the service to enter FATAL state
|
||||||
logger.exception('Error in Websocket Relayer, exception: {}. Restarting in 10 seconds'.format(e))
|
# sleeping before logging because logging rely on setting which require database connection...
|
||||||
time.sleep(10)
|
logger.warning(f"Encounter error while running Websocket Relayer {e}, slept for 10s...")
|
||||||
|
return
|
||||||
|
|||||||
@@ -285,8 +285,6 @@ class WebSocketRelayManager(object):
|
|||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
# Handle the case where the task was already cancelled by the time we got here.
|
# Handle the case where the task was already cancelled by the time we got here.
|
||||||
pass
|
pass
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to cancel relay connection for {hostname}: {e}")
|
|
||||||
|
|
||||||
del self.relay_connections[hostname]
|
del self.relay_connections[hostname]
|
||||||
|
|
||||||
@@ -297,8 +295,6 @@ class WebSocketRelayManager(object):
|
|||||||
self.stats_mgr.delete_remote_host_stats(hostname)
|
self.stats_mgr.delete_remote_host_stats(hostname)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to delete stats for {hostname}: {e}")
|
|
||||||
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
event_loop = asyncio.get_running_loop()
|
event_loop = asyncio.get_running_loop()
|
||||||
@@ -306,7 +302,6 @@ class WebSocketRelayManager(object):
|
|||||||
self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname)
|
self.stats_mgr = RelayWebsocketStatsManager(event_loop, self.local_hostname)
|
||||||
self.stats_mgr.start()
|
self.stats_mgr.start()
|
||||||
|
|
||||||
# Set up a pg_notify consumer for allowing web nodes to "provision" and "deprovision" themselves gracefully.
|
|
||||||
database_conf = deepcopy(settings.DATABASES['default'])
|
database_conf = deepcopy(settings.DATABASES['default'])
|
||||||
database_conf['OPTIONS'] = deepcopy(database_conf.get('OPTIONS', {}))
|
database_conf['OPTIONS'] = deepcopy(database_conf.get('OPTIONS', {}))
|
||||||
|
|
||||||
@@ -318,24 +313,6 @@ class WebSocketRelayManager(object):
|
|||||||
if 'PASSWORD' in database_conf:
|
if 'PASSWORD' in database_conf:
|
||||||
database_conf['OPTIONS']['password'] = database_conf.pop('PASSWORD')
|
database_conf['OPTIONS']['password'] = database_conf.pop('PASSWORD')
|
||||||
|
|
||||||
task = None
|
|
||||||
|
|
||||||
# Managing the async_conn here so that we can close it if we need to restart the connection
|
|
||||||
async_conn = None
|
|
||||||
|
|
||||||
# Establishes a websocket connection to /websocket/relay on all API servers
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
if not task or task.done():
|
|
||||||
try:
|
|
||||||
# Try to close the connection if it's open
|
|
||||||
if async_conn:
|
|
||||||
try:
|
|
||||||
await async_conn.close()
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to close connection to database for pg_notify: {e}")
|
|
||||||
|
|
||||||
# and re-establish the connection
|
|
||||||
async_conn = await psycopg.AsyncConnection.connect(
|
async_conn = await psycopg.AsyncConnection.connect(
|
||||||
dbname=database_conf['NAME'],
|
dbname=database_conf['NAME'],
|
||||||
host=database_conf['HOST'],
|
host=database_conf['HOST'],
|
||||||
@@ -343,14 +320,14 @@ class WebSocketRelayManager(object):
|
|||||||
port=database_conf['PORT'],
|
port=database_conf['PORT'],
|
||||||
**database_conf.get("OPTIONS", {}),
|
**database_conf.get("OPTIONS", {}),
|
||||||
)
|
)
|
||||||
|
|
||||||
await async_conn.set_autocommit(True)
|
await async_conn.set_autocommit(True)
|
||||||
|
on_ws_heartbeat_task = event_loop.create_task(self.on_ws_heartbeat(async_conn))
|
||||||
|
|
||||||
# before creating the task that uses the connection
|
# Establishes a websocket connection to /websocket/relay on all API servers
|
||||||
task = event_loop.create_task(self.on_ws_heartbeat(async_conn), name="on_ws_heartbeat")
|
while True:
|
||||||
logger.info("Creating `on_ws_heartbeat` task in event loop.")
|
if on_ws_heartbeat_task.done():
|
||||||
|
raise Exception("on_ws_heartbeat_task has exited")
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to connect to database for pg_notify: {e}")
|
|
||||||
|
|
||||||
future_remote_hosts = self.known_hosts.keys()
|
future_remote_hosts = self.known_hosts.keys()
|
||||||
current_remote_hosts = self.relay_connections.keys()
|
current_remote_hosts = self.relay_connections.keys()
|
||||||
@@ -387,10 +364,3 @@ class WebSocketRelayManager(object):
|
|||||||
self.relay_connections[h] = relay_connection
|
self.relay_connections[h] = relay_connection
|
||||||
|
|
||||||
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
|
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)
|
||||||
finally:
|
|
||||||
if async_conn:
|
|
||||||
logger.info("Shutting down db connection for wsrelay.")
|
|
||||||
try:
|
|
||||||
await async_conn.close()
|
|
||||||
except Exception as e:
|
|
||||||
logger.info(f"Failed to close connection to database for pg_notify: {e}")
|
|
||||||
|
|||||||
@@ -31,7 +31,6 @@ command = awx-manage run_wsrelay
|
|||||||
directory = /var/lib/awx
|
directory = /var/lib/awx
|
||||||
{% endif %}
|
{% endif %}
|
||||||
autorestart = true
|
autorestart = true
|
||||||
startsecs = 30
|
|
||||||
stopasgroup=true
|
stopasgroup=true
|
||||||
killasgroup=true
|
killasgroup=true
|
||||||
stdout_logfile=/dev/stdout
|
stdout_logfile=/dev/stdout
|
||||||
|
|||||||
Reference in New Issue
Block a user