Remove auto-reconnect logic from wsrelay

We no longer need to retry failed connections from within wsrelay itself,
because wsrelay will automatically try to reconnect when it hears the next
beacon from heartbeet.

This also cleans up the logic for deleting a node we previously knew
about.

Signed-off-by: Rick Elrod <rick@elrod.me>
This commit is contained in:
Rick Elrod
2022-12-11 01:34:12 -06:00
committed by Hao Liu
parent 704759d29a
commit 91bf49cdb3

View File

@@ -54,11 +54,12 @@ class WebsocketRelayConnection:
self.channel_layer = None self.channel_layer = None
self.subsystem_metrics = s_metrics.Metrics(instance_name=name) self.subsystem_metrics = s_metrics.Metrics(instance_name=name)
self.producers = dict() self.producers = dict()
self.connected = False
async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse): async def run_loop(self, websocket: aiohttp.ClientWebSocketResponse):
raise RuntimeError("Implement me") raise RuntimeError("Implement me")
async def connect(self, attempt): async def connect(self):
from awx.main.consumers import WebsocketSecretAuthHelper # noqa from awx.main.consumers import WebsocketSecretAuthHelper # noqa
logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.") logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.")
@@ -70,13 +71,6 @@ class WebsocketRelayConnection:
if not self.channel_layer: if not self.channel_layer:
self.channel_layer = get_channel_layer() self.channel_layer = get_channel_layer()
try:
if attempt > 0:
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS)
except asyncio.CancelledError:
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled")
raise
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/" uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/relay/"
timeout = aiohttp.ClientTimeout(total=10) timeout = aiohttp.ClientTimeout(total=10)
@@ -86,14 +80,12 @@ class WebsocketRelayConnection:
async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket: async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket:
logger.info(f"Connection from {self.name} to {self.remote_host} established.") logger.info(f"Connection from {self.name} to {self.remote_host} established.")
self.stats.record_connection_established() self.stats.record_connection_established()
attempt = 0 self.connected = True
await self.run_connection(websocket) await self.run_connection(websocket)
except asyncio.CancelledError: except asyncio.CancelledError:
# TODO: Check if connected and disconnect # TODO: Check if connected and disconnect
# Possibly use run_until_complete() if disconnect is async # Possibly use run_until_complete() if disconnect is async
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.") logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.")
self.stats.record_connection_lost()
raise
except client_exceptions.ClientConnectorError as e: except client_exceptions.ClientConnectorError as e:
logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True) logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True)
except asyncio.TimeoutError: except asyncio.TimeoutError:
@@ -102,14 +94,13 @@ class WebsocketRelayConnection:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter. # Early on, this is our canary. I'm not sure what exceptions we can really encounter.
logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True) logger.warning(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.", exc_info=True)
else: else:
logger.warning(f"Connection from {self.name} to {self.remote_host} list.") logger.info(f"Connection from {self.name} to {self.remote_host} lost, but no exception was raised.")
finally:
self.stats.record_connection_lost() self.connected = False
self.start(attempt=attempt + 1) self.stats.record_connection_lost()
def start(self, attempt=0):
self.async_task = self.event_loop.create_task(self.connect(attempt=attempt))
def start(self):
self.async_task = self.event_loop.create_task(self.connect())
return self.async_task return self.async_task
def cancel(self): def cancel(self):
@@ -137,7 +128,6 @@ class WebsocketRelayConnection:
origin_channel = payload['origin_channel'] origin_channel = payload['origin_channel']
if not self.producers.get(name): if not self.producers.get(name):
producer = self.event_loop.create_task(self.run_producer(name, websocket, group)) producer = self.event_loop.create_task(self.run_producer(name, websocket, group))
self.producers[name] = {"task": producer, "subscriptions": {origin_channel}} self.producers[name] = {"task": producer, "subscriptions": {origin_channel}}
logger.debug(f"Producer {name} started.") logger.debug(f"Producer {name} started.")
else: else:
@@ -181,6 +171,12 @@ class WebsocketRelayConnection:
continue continue
await websocket.send_json(wrap_broadcast_msg(group, msg)) await websocket.send_json(wrap_broadcast_msg(group, msg))
except ConnectionResetError:
# This can be hit when a web node is scaling down and we try to write to it.
# There's really nothing to do in this case and it's a fairly typical thing to happen.
# We'll log it as debug, but it's not really a problem.
logger.debug(f"Producer {name} connection reset.")
pass
except Exception: except Exception:
# Note, this is very intentional and important since we do not otherwise # Note, this is very intentional and important since we do not otherwise
# ever check the result of this future. Without this line you will not see an error if # ever check the result of this future. Without this line you will not see an error if
@@ -231,7 +227,8 @@ class WebSocketRelayManager(object):
del self.known_hosts[hostname] del self.known_hosts[hostname]
logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.") logger.debug(f"Web host {hostname} ({ip}) offline heartbeat received.")
except Exception as e: except Exception as e:
# This catch-all is the same as the one above. asyncio will NOT log exceptions anywhere, so we need to do so ourselves. # This catch-all is the same as the one above. asyncio will eat the exception
# but we want to know about it.
logger.exception(f"pg_consumer exception") logger.exception(f"pg_consumer exception")
async def run(self): async def run(self):
@@ -261,12 +258,22 @@ class WebSocketRelayManager(object):
deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts) deleted_remote_hosts = set(current_remote_hosts) - set(future_remote_hosts)
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
remote_addresses = {k: v.remote_host for k, v in self.relay_connections.items()} # This loop handles if we get an advertisement from a host we already know about but
# the advertisement has a different IP than we are currently connected to.
for hostname, address in self.known_hosts.items(): for hostname, address in self.known_hosts.items():
if hostname in self.relay_connections and address != remote_addresses[hostname]: if hostname not in self.relay_connections:
# We've picked up a new hostname that we don't know about yet.
continue
if address != self.relay_connections[hostname].remote_host:
deleted_remote_hosts.add(hostname) deleted_remote_hosts.add(hostname)
new_remote_hosts.add(hostname) new_remote_hosts.add(hostname)
# Delete any hosts with closed connections
for hostname, relay_conn in self.relay_connections.items():
if not relay_conn.connected:
deleted_remote_hosts.add(hostname)
if deleted_remote_hosts: if deleted_remote_hosts:
logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list") logger.info(f"Removing {deleted_remote_hosts} from websocket broadcast list")
@@ -276,6 +283,7 @@ class WebSocketRelayManager(object):
for h in deleted_remote_hosts: for h in deleted_remote_hosts:
self.relay_connections[h].cancel() self.relay_connections[h].cancel()
del self.relay_connections[h] del self.relay_connections[h]
del self.known_hosts[h]
stats_mgr.delete_remote_host_stats(h) stats_mgr.delete_remote_host_stats(h)
for h in new_remote_hosts: for h in new_remote_hosts:
@@ -285,7 +293,4 @@ class WebSocketRelayManager(object):
relay_connection.start() relay_connection.start()
self.relay_connections[h] = relay_connection self.relay_connections[h] = relay_connection
# for host, conn in self.relay_connections.items():
# logger.info(f"Current producers for {host}: {conn.producers}")
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS) await asyncio.sleep(settings.BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS)