awx/awxkit/awxkit/ws.py
2020-03-19 09:02:39 -04:00

256 lines
8.8 KiB
Python

import threading
import logging
import atexit
import json
import ssl
from queue import Queue, Empty
from urllib.parse import urlparse
from awxkit.config import config
log = logging.getLogger(__name__)
class WSClientException(Exception):
pass
changed = 'changed'
limit_reached = 'limit_reached'
status_changed = 'status_changed'
summary = 'summary'
class WSClient(object):
"""Provides a basic means of testing pub/sub notifications with payloads similar to
'groups': {'jobs': ['status_changed', 'summary'],
'schedules': ['changed'],
'ad_hoc_command_events': [ids...],
'job_events': [ids...],
'workflow_events': [ids...],
'project_update_events': [ids...],
'inventory_update_events': [ids...],
'system_job_events': [ids...],
'control': ['limit_reached']}
e.x:
```
ws = WSClient(token, port=8013, secure=False).connect()
ws.job_details()
... # launch job
job_messages = [msg for msg in ws]
ws.ad_hoc_stdout()
... # launch ad hoc command
ad_hoc_messages = [msg for msg in ws]
ws.close()
```
"""
# Subscription group types
def __init__(self, token=None, hostname='', port=443, secure=True, session_id=None, csrftoken=None):
# delay this import, because this is an optional dependency
import websocket
if not hostname:
result = urlparse(config.base_url)
secure = result.scheme == 'https'
port = result.port
if port is None:
port = 80
if secure:
port = 443
# should we be adding result.path here?
hostname = result.hostname
self.port = port
self._use_ssl = secure
self.hostname = hostname
self.token = token
self.session_id = session_id
self.csrftoken = csrftoken
self._recv_queue = Queue()
self._ws_closed = False
self._ws_connected_flag = threading.Event()
if self.token is not None:
auth_cookie = 'token="{0.token}";'.format(self)
elif self.session_id is not None:
auth_cookie = 'sessionid="{0.session_id}"'.format(self)
if self.csrftoken:
auth_cookie += ';csrftoken={0.csrftoken}'.format(self)
else:
auth_cookie = ''
pref = 'wss://' if self._use_ssl else 'ws://'
url = '{0}{1.hostname}:{1.port}/websocket/'.format(pref, self)
self.ws = websocket.WebSocketApp(url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
cookie=auth_cookie)
self._message_cache = []
self._should_subscribe_to_pending_job = False
self._pending_unsubscribe = threading.Event()
def connect(self):
wst = threading.Thread(target=self._ws_run_forever, args=(self.ws, {"cert_reqs": ssl.CERT_NONE}))
wst.daemon = True
wst.start()
atexit.register(self.close)
if not self._ws_connected_flag.wait(20):
raise WSClientException('Failed to establish channel connection w/ AWX.')
return self
def close(self):
log.info('close method was called, but ignoring')
if not self._ws_closed:
log.info('Closing websocket connection.')
self.ws.close()
def job_details(self, *job_ids):
"""subscribes to job status, summary, and, for the specified ids, job events"""
self.subscribe(jobs=[status_changed, summary], job_events=list(job_ids))
def pending_job_details(self):
"""subscribes to job status and summary, with responsive
job event subscription for an id provided by AWX
"""
self.subscribe_to_pending_events('job_events', [status_changed, summary])
def status_changes(self):
self.subscribe(jobs=[status_changed])
def job_stdout(self, *job_ids):
self.subscribe(jobs=[status_changed], job_events=list(job_ids))
def pending_job_stdout(self):
self.subscribe_to_pending_events('job_events')
# mirror page behavior
def ad_hoc_stdout(self, *ahc_ids):
self.subscribe(jobs=[status_changed], ad_hoc_command_events=list(ahc_ids))
def pending_ad_hoc_stdout(self):
self.subscribe_to_pending_events('ad_hoc_command_events')
def project_update_stdout(self, *project_update_ids):
self.subscribe(jobs=[status_changed], project_update_events=list(project_update_ids))
def pending_project_update_stdout(self):
self.subscribe_to_pending_events('project_update_events')
def inventory_update_stdout(self, *inventory_update_ids):
self.subscribe(jobs=[status_changed], inventory_update_events=list(inventory_update_ids))
def pending_inventory_update_stdout(self):
self.subscribe_to_pending_events('inventory_update_events')
def workflow_events(self, *wfjt_ids):
self.subscribe(jobs=[status_changed], workflow_events=list(wfjt_ids))
def pending_workflow_events(self):
self.subscribe_to_pending_events('workflow_events')
def system_job_events(self, *system_job_ids):
self.subscribe(jobs=[status_changed], system_job_events=list(system_job_ids))
def pending_system_job_events(self):
self.subscribe_to_pending_events('system_job_events')
def subscribe_to_pending_events(self, events, jobs=[status_changed]):
self._should_subscribe_to_pending_job = dict(jobs=jobs, events=events)
self.subscribe(jobs=jobs)
# mirror page behavior
def jobs_list(self):
self.subscribe(jobs=[status_changed, summary], schedules=[changed])
# mirror page behavior
def dashboard(self):
self.subscribe(jobs=[status_changed])
def subscribe(self, **groups):
"""Sends a subscription request for the specified channel groups.
```
ws.subscribe(jobs=[ws.status_changed, ws.summary],
job_events=[1,2,3])
```
"""
self._subscribe(groups=groups)
def _subscribe(self, **payload):
payload['xrftoken'] = self.csrftoken
self._send(json.dumps(payload))
def unsubscribe(self, wait=True, timeout=10):
if wait:
# Other unnsubscribe events could have caused the edge to trigger.
# This way the _next_ event will trigger our waiting.
self._pending_unsubscribe.clear()
self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken)))
if not self._pending_unsubscribe.wait(timeout):
raise RuntimeError("Failed while waiting on unsubscribe reply because timeout of {} seconds was reached.".format(timeout))
else:
self._send(json.dumps(dict(groups={}, xrftoken=self.csrftoken)))
def _on_message(self, message):
message = json.loads(message)
log.debug('received message: {}'.format(message))
if all([message.get('group_name') == 'jobs',
message.get('status') == 'pending',
message.get('unified_job_id'),
self._should_subscribe_to_pending_job]):
if bool(message.get('project_id')) == (
self._should_subscribe_to_pending_job['events'] == 'project_update_events'):
self._update_subscription(message['unified_job_id'])
ret = self._recv_queue.put(message)
# unsubscribe acknowledgement
if 'groups_current' in message:
self._pending_unsubscribe.set()
return ret
def _update_subscription(self, job_id):
subscription = dict(jobs=self._should_subscribe_to_pending_job['jobs'])
events = self._should_subscribe_to_pending_job['events']
subscription[events] = [job_id]
self.subscribe(**subscription)
self._should_subscribe_to_pending_job = False
def _on_open(self):
self._ws_connected_flag.set()
def _on_error(self, error):
log.info('Error received: {}'.format(error))
def _on_close(self):
log.info('Successfully closed ws.')
self._ws_closed = True
def _ws_run_forever(self, sockopt=None, sslopt=None):
self.ws.run_forever(sslopt=sslopt)
log.debug('ws.run_forever finished')
def _recv(self, wait=False, timeout=10):
try:
msg = self._recv_queue.get(wait, timeout)
except Empty:
return None
return msg
def _send(self, data):
self.ws.send(data)
log.debug('successfully sent {}'.format(data))
def __iter__(self):
while True:
val = self._recv()
if not val:
return
yield val