consistent wsbroadcast log messages

This commit is contained in:
chris meyers
2020-04-17 13:18:29 -04:00
parent 8f5afc83ce
commit 4787e69afb

View File

@@ -70,7 +70,7 @@ class WebsocketTask():
async def connect(self, attempt): async def connect(self, attempt):
from awx.main.consumers import WebsocketSecretAuthHelper # noqa from awx.main.consumers import WebsocketSecretAuthHelper # noqa
logger.debug(f"{self.name} connect attempt {attempt} to {self.remote_host}") logger.debug(f"Connection from {self.name} to {self.remote_host} attempt number {attempt}.")
''' '''
Can not put get_channel_layer() in the init code because it is in the init Can not put get_channel_layer() in the init code because it is in the init
@@ -83,7 +83,7 @@ class WebsocketTask():
if attempt > 0: if attempt > 0:
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS) await asyncio.sleep(settings.BROADCAST_WEBSOCKET_RECONNECT_RETRY_RATE_SECONDS)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.warn(f"{self.name} connection to {self.remote_host} cancelled") logger.warn(f"Connection from {self.name} to {self.remote_host} cancelled")
raise raise
uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/" uri = f"{self.protocol}://{self.remote_host}:{self.remote_port}/websocket/{self.endpoint}/"
@@ -94,22 +94,25 @@ class WebsocketTask():
async with aiohttp.ClientSession(headers={'secret': secret_val}, async with aiohttp.ClientSession(headers={'secret': secret_val},
timeout=timeout) as session: timeout=timeout) as session:
async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket: async with session.ws_connect(uri, ssl=self.verify_ssl, heartbeat=20) as websocket:
logger.info(f"Connection from {self.name} to {self.remote_host} established.")
self.stats.record_connection_established() self.stats.record_connection_established()
attempt = 0 attempt = 0
await self.run_loop(websocket) await self.run_loop(websocket)
except asyncio.CancelledError: except asyncio.CancelledError:
# TODO: Check if connected and disconnect # TODO: Check if connected and disconnect
# Possibly use run_until_complete() if disconnect is async # Possibly use run_until_complete() if disconnect is async
logger.warn(f"{self.name} connection to {self.remote_host} cancelled") logger.warn(f"Connection from {self.name} to {self.remote_host} cancelled.")
self.stats.record_connection_lost() self.stats.record_connection_lost()
raise raise
except client_exceptions.ClientConnectorError as e: except client_exceptions.ClientConnectorError as e:
logger.warn(f"Failed to connect to {self.remote_host}: '{e}'. Reconnecting ...") logger.warn(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.")
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warn(f"Timeout while trying to connect to {self.remote_host}. Reconnecting ...") logger.warn(f"Connection from {self.name} to {self.remote_host} timed out.")
except Exception as e: except Exception as e:
# Early on, this is our canary. I'm not sure what exceptions we can really encounter. # Early on, this is our canary. I'm not sure what exceptions we can really encounter.
logger.warn(f"Websocket broadcast client exception {type(e)} {e}") logger.warn(f"Connection from {self.name} to {self.remote_host} failed for unknown reason: '{e}'.")
else:
logger.warn(f"Connection from {self.name} to {self.remote_host} list.")
self.stats.record_connection_lost() self.stats.record_connection_lost()
self.start(attempt=attempt + 1) self.start(attempt=attempt + 1)
@@ -160,9 +163,9 @@ class BroadcastWebsocketManager(object):
new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts) new_remote_hosts = set(future_remote_hosts) - set(current_remote_hosts)
if deleted_remote_hosts: if deleted_remote_hosts:
logger.warn(f"{self.local_hostname} going to remove {deleted_remote_hosts} from the websocket broadcast list") logger.warn(f"Removing {deleted_remote_hosts} from websocket broadcast list")
if new_remote_hosts: if new_remote_hosts:
logger.warn(f"{self.local_hostname} going to add {new_remote_hosts} to the websocket broadcast list") logger.warn(f"Adding {new_remote_hosts} to websocket broadcast list")
for h in deleted_remote_hosts: for h in deleted_remote_hosts:
self.broadcast_tasks[h].cancel() self.broadcast_tasks[h].cancel()