From 3f2d757f4ec5a1e881b73da15cfa0d487be04106 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 24 Jan 2020 15:19:50 -0500 Subject: [PATCH] update awxkit to use new unsubscribe event * Instead of waiting an arbitrary number of seconds. We can now wait the exact amount of time needed to KNOW that we are unsubscribed. This changeset takes advantage of the new subscribe reply semantic. --- awxkit/awxkit/ws.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/awxkit/awxkit/ws.py b/awxkit/awxkit/ws.py index 8005a8ef66..11668641bd 100644 --- a/awxkit/awxkit/ws.py +++ b/awxkit/awxkit/ws.py @@ -4,6 +4,7 @@ import logging import atexit import json import ssl +from datetime import datetime from six.moves.queue import Queue, Empty from six.moves.urllib.parse import urlparse @@ -93,6 +94,7 @@ class WSClient(object): cookie=auth_cookie) self._message_cache = [] self._should_subscribe_to_pending_job = False + self._pending_unsubscribe = threading.Event() def connect(self): wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE})) @@ -184,11 +186,17 @@ class WSClient(object): payload['xrftoken'] = self.csrftoken self._send(json.dumps(payload)) - def unsubscribe(self): - self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken))) - # it takes time for the unsubscribe event to be recieved and consumed and for - # messages to stop being put on the queue for daphne to send to us - time.sleep(5) + def unsubscribe(self, wait=True, timeout=10): + time_start = datetime.now() + if wait: + # Other unnsubscribe events could have caused the edge to trigger. + # This way the _next_ event will trigger our waiting. + self._pending_unsubscribe.clear() + self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken))) + if not self._pending_unsubscribe.wait(timeout): + raise RuntimeError(f"Failed while waiting on unsubscribe reply because timeout of {timeout} seconds was reached.") + else: + self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken))) def _on_message(self, message): message = json.loads(message) @@ -202,6 +210,10 @@ class WSClient(object): self._should_subscribe_to_pending_job['events'] == 'project_update_events'): self._update_subscription(message['unified_job_id']) + # unsubscribe acknowledgement + if 'groups_current' in message: + self._pending_unsubscribe.set() + return self._recv_queue.put(message) def _update_subscription(self, job_id):