From 507d2e158dbe5eae13578e1bc35f25ab374c05ba Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 15 Apr 2014 11:06:39 -0400 Subject: [PATCH] Vendor gevent's socketio and websocket modules --- .../site-packages/geventwebsocket/__init__.py | 21 + .../geventwebsocket/exceptions.py | 19 + .../geventwebsocket/gunicorn/__init__.py | 0 .../geventwebsocket/gunicorn/workers.py | 6 + .../site-packages/geventwebsocket/handler.py | 278 +++++++++ .../site-packages/geventwebsocket/logging.py | 31 + .../geventwebsocket/protocols/__init__.py | 0 .../geventwebsocket/protocols/base.py | 35 ++ .../geventwebsocket/protocols/wamp.py | 229 ++++++++ .../site-packages/geventwebsocket/resource.py | 74 +++ .../site-packages/geventwebsocket/server.py | 34 ++ .../geventwebsocket/utf8validator.py | 128 +++++ .../site-packages/geventwebsocket/utils.py | 45 ++ .../geventwebsocket/websocket.py | 543 ++++++++++++++++++ awx/lib/site-packages/socketio/__init__.py | 76 +++ awx/lib/site-packages/socketio/handler.py | 149 +++++ awx/lib/site-packages/socketio/mixins.py | 73 +++ awx/lib/site-packages/socketio/namespace.py | 468 +++++++++++++++ awx/lib/site-packages/socketio/packet.py | 204 +++++++ .../site-packages/socketio/policyserver.py | 27 + awx/lib/site-packages/socketio/server.py | 83 +++ awx/lib/site-packages/socketio/sgunicorn.py | 57 ++ awx/lib/site-packages/socketio/transports.py | 270 +++++++++ awx/lib/site-packages/socketio/virtsocket.py | 428 ++++++++++++++ 24 files changed, 3278 insertions(+) create mode 100644 awx/lib/site-packages/geventwebsocket/__init__.py create mode 100644 awx/lib/site-packages/geventwebsocket/exceptions.py create mode 100644 awx/lib/site-packages/geventwebsocket/gunicorn/__init__.py create mode 100644 awx/lib/site-packages/geventwebsocket/gunicorn/workers.py create mode 100644 awx/lib/site-packages/geventwebsocket/handler.py create mode 100644 awx/lib/site-packages/geventwebsocket/logging.py create mode 100644 awx/lib/site-packages/geventwebsocket/protocols/__init__.py create mode 100644 awx/lib/site-packages/geventwebsocket/protocols/base.py create mode 100644 awx/lib/site-packages/geventwebsocket/protocols/wamp.py create mode 100644 awx/lib/site-packages/geventwebsocket/resource.py create mode 100644 awx/lib/site-packages/geventwebsocket/server.py create mode 100644 awx/lib/site-packages/geventwebsocket/utf8validator.py create mode 100644 awx/lib/site-packages/geventwebsocket/utils.py create mode 100644 awx/lib/site-packages/geventwebsocket/websocket.py create mode 100644 awx/lib/site-packages/socketio/__init__.py create mode 100644 awx/lib/site-packages/socketio/handler.py create mode 100644 awx/lib/site-packages/socketio/mixins.py create mode 100644 awx/lib/site-packages/socketio/namespace.py create mode 100644 awx/lib/site-packages/socketio/packet.py create mode 100644 awx/lib/site-packages/socketio/policyserver.py create mode 100644 awx/lib/site-packages/socketio/server.py create mode 100644 awx/lib/site-packages/socketio/sgunicorn.py create mode 100644 awx/lib/site-packages/socketio/transports.py create mode 100644 awx/lib/site-packages/socketio/virtsocket.py diff --git a/awx/lib/site-packages/geventwebsocket/__init__.py b/awx/lib/site-packages/geventwebsocket/__init__.py new file mode 100644 index 0000000000..7e2e0167d8 --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/__init__.py @@ -0,0 +1,21 @@ +VERSION = (0, 9, 3, 'final', 0) + +__all__ = [ + 'WebSocketApplication', + 'Resource', + 'WebSocketServer', + 'WebSocketError', + 'get_version' +] + + +def get_version(*args, **kwargs): + from .utils import get_version + return get_version(*args, **kwargs) + +try: + from .resource import WebSocketApplication, Resource + from .server import WebSocketServer + from .exceptions import WebSocketError +except ImportError: + pass diff --git a/awx/lib/site-packages/geventwebsocket/exceptions.py b/awx/lib/site-packages/geventwebsocket/exceptions.py new file mode 100644 index 0000000000..e066727e57 --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/exceptions.py @@ -0,0 +1,19 @@ +from socket import error as socket_error + + +class WebSocketError(socket_error): + """ + Base class for all websocket errors. + """ + + +class ProtocolError(WebSocketError): + """ + Raised if an error occurs when de/encoding the websocket protocol. + """ + + +class FrameTooLargeException(ProtocolError): + """ + Raised if a frame is received that is too large. + """ diff --git a/awx/lib/site-packages/geventwebsocket/gunicorn/__init__.py b/awx/lib/site-packages/geventwebsocket/gunicorn/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/awx/lib/site-packages/geventwebsocket/gunicorn/workers.py b/awx/lib/site-packages/geventwebsocket/gunicorn/workers.py new file mode 100644 index 0000000000..d0aa136943 --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/gunicorn/workers.py @@ -0,0 +1,6 @@ +from geventwebsocket.handler import WebSocketHandler +from gunicorn.workers.ggevent import GeventPyWSGIWorker + + +class GeventWebSocketWorker(GeventPyWSGIWorker): + wsgi_handler = WebSocketHandler diff --git a/awx/lib/site-packages/geventwebsocket/handler.py b/awx/lib/site-packages/geventwebsocket/handler.py new file mode 100644 index 0000000000..a016ae3d74 --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/handler.py @@ -0,0 +1,278 @@ +import base64 +import hashlib +import warnings + +from gevent.pywsgi import WSGIHandler +from .websocket import WebSocket, Stream +from .logging import create_logger + + +class Client(object): + def __init__(self, address, ws): + self.address = address + self.ws = ws + + +class WebSocketHandler(WSGIHandler): + """ + Automatically upgrades the connection to a websocket. + + To prevent the WebSocketHandler to call the underlying WSGI application, + but only setup the WebSocket negotiations, do: + + mywebsockethandler.prevent_wsgi_call = True + + before calling run_application(). This is useful if you want to do more + things before calling the app, and want to off-load the WebSocket + negotiations to this library. Socket.IO needs this for example, to send + the 'ack' before yielding the control to your WSGI app. + """ + + SUPPORTED_VERSIONS = ('13', '8', '7') + GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + + def run_websocket(self): + """ + Called when a websocket has been created successfully. + """ + + if getattr(self, 'prevent_wsgi_call', False): + return + + # In case WebSocketServer is not used + if not hasattr(self.server, 'clients'): + self.server.clients = {} + + # Since we're now a websocket connection, we don't care what the + # application actually responds with for the http response + + try: + self.server.clients[self.client_address] = Client( + self.client_address, self.websocket) + self.application(self.environ, lambda s, h: []) + finally: + del self.server.clients[self.client_address] + if not self.websocket.closed: + self.websocket.close() + self.environ.update({ + 'wsgi.websocket': None + }) + self.websocket = None + + def run_application(self): + if (hasattr(self.server, 'pre_start_hook') + and self.server.pre_start_hook): + self.logger.debug("Calling pre-start hook") + if self.server.pre_start_hook(self): + return super(WebSocketHandler, self).run_application() + + self.logger.debug("Initializing WebSocket") + self.result = self.upgrade_websocket() + + if hasattr(self, 'websocket'): + if self.status and not self.headers_sent: + self.write('') + + self.run_websocket() + else: + if self.status: + # A status was set, likely an error so just send the response + if not self.result: + self.result = [] + + self.process_result() + return + + # This handler did not handle the request, so defer it to the + # underlying application object + return super(WebSocketHandler, self).run_application() + + def upgrade_websocket(self): + """ + Attempt to upgrade the current environ into a websocket enabled + connection. If successful, the environ dict with be updated with two + new entries, `wsgi.websocket` and `wsgi.websocket_version`. + + :returns: Whether the upgrade was successful. + """ + + # Some basic sanity checks first + + self.logger.debug("Validating WebSocket request") + + if self.environ.get('REQUEST_METHOD', '') != 'GET': + # This is not a websocket request, so we must not handle it + self.logger.debug('Can only upgrade connection if using GET method.') + return + + upgrade = self.environ.get('HTTP_UPGRADE', '').lower() + + if upgrade == 'websocket': + connection = self.environ.get('HTTP_CONNECTION', '').lower() + + if 'upgrade' not in connection: + # This is not a websocket request, so we must not handle it + self.logger.warning("Client didn't ask for a connection " + "upgrade") + return + else: + # This is not a websocket request, so we must not handle it + return + + if self.request_version != 'HTTP/1.1': + self.start_response('402 Bad Request', []) + self.logger.warning("Bad server protocol in headers") + + return ['Bad protocol version'] + + if self.environ.get('HTTP_SEC_WEBSOCKET_VERSION'): + return self.upgrade_connection() + else: + self.logger.warning("No protocol defined") + self.start_response('426 Upgrade Required', [ + ('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS))]) + + return ['No Websocket protocol version defined'] + + def upgrade_connection(self): + """ + Validate and 'upgrade' the HTTP request to a WebSocket request. + + If an upgrade succeeded then then handler will have `start_response` + with a status of `101`, the environ will also be updated with + `wsgi.websocket` and `wsgi.websocket_version` keys. + + :param environ: The WSGI environ dict. + :param start_response: The callable used to start the response. + :param stream: File like object that will be read from/written to by + the underlying WebSocket object, if created. + :return: The WSGI response iterator is something went awry. + """ + + self.logger.debug("Attempting to upgrade connection") + + version = self.environ.get("HTTP_SEC_WEBSOCKET_VERSION") + + if version not in self.SUPPORTED_VERSIONS: + msg = "Unsupported WebSocket Version: {0}".format(version) + + self.logger.warning(msg) + self.start_response('400 Bad Request', [ + ('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS)) + ]) + + return [msg] + + key = self.environ.get("HTTP_SEC_WEBSOCKET_KEY", '').strip() + + if not key: + # 5.2.1 (3) + msg = "Sec-WebSocket-Key header is missing/empty" + + self.logger.warning(msg) + self.start_response('400 Bad Request', []) + + return [msg] + + try: + key_len = len(base64.b64decode(key)) + except TypeError: + msg = "Invalid key: {0}".format(key) + + self.logger.warning(msg) + self.start_response('400 Bad Request', []) + + return [msg] + + if key_len != 16: + # 5.2.1 (3) + msg = "Invalid key: {0}".format(key) + + self.logger.warning(msg) + self.start_response('400 Bad Request', []) + + return [msg] + + # Check for WebSocket Protocols + requested_protocols = self.environ.get( + 'HTTP_SEC_WEBSOCKET_PROTOCOL', '') + protocol = None + + if hasattr(self.application, 'app_protocol'): + allowed_protocol = self.application.app_protocol( + self.environ['PATH_INFO']) + + if allowed_protocol and allowed_protocol in requested_protocols: + protocol = allowed_protocol + self.logger.debug("Protocol allowed: {0}".format(protocol)) + + self.websocket = WebSocket(self.environ, Stream(self), self) + self.environ.update({ + 'wsgi.websocket_version': version, + 'wsgi.websocket': self.websocket + }) + + headers = [ + ("Upgrade", "websocket"), + ("Connection", "Upgrade"), + ("Sec-WebSocket-Accept", base64.b64encode( + hashlib.sha1(key + self.GUID).digest())), + ] + + if protocol: + headers.append(("Sec-WebSocket-Protocol", protocol)) + + self.logger.debug("WebSocket request accepted, switching protocols") + self.start_response("101 Switching Protocols", headers) + + @property + def logger(self): + if not hasattr(self.server, 'logger'): + self.server.logger = create_logger(__name__) + + return self.server.logger + + def log_request(self): + if '101' not in self.status: + self.logger.info(self.format_request()) + + @property + def active_client(self): + return self.server.clients[self.client_address] + + def start_response(self, status, headers, exc_info=None): + """ + Called when the handler is ready to send a response back to the remote + endpoint. A websocket connection may have not been created. + """ + writer = super(WebSocketHandler, self).start_response( + status, headers, exc_info=exc_info) + + self._prepare_response() + + return writer + + def _prepare_response(self): + """ + Sets up the ``pywsgi.Handler`` to work with a websocket response. + + This is used by other projects that need to support WebSocket + connections as part of a larger effort. + """ + assert not self.headers_sent + + if not self.environ.get('wsgi.websocket'): + # a WebSocket connection is not established, do nothing + return + + # So that `finalize_headers` doesn't write a Content-Length header + self.provided_content_length = False + + # The websocket is now controlling the response + self.response_use_chunked = False + + # Once the request is over, the connection must be closed + self.close_connection = True + + # Prevents the Date header from being written + self.provided_date = True diff --git a/awx/lib/site-packages/geventwebsocket/logging.py b/awx/lib/site-packages/geventwebsocket/logging.py new file mode 100644 index 0000000000..554ca02d61 --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/logging.py @@ -0,0 +1,31 @@ +from __future__ import absolute_import + +from logging import getLogger, StreamHandler, getLoggerClass, Formatter, DEBUG + + +def create_logger(name, debug=False, format=None): + Logger = getLoggerClass() + + class DebugLogger(Logger): + def getEffectiveLevel(x): + if x.level == 0 and debug: + return DEBUG + else: + return Logger.getEffectiveLevel(x) + + class DebugHandler(StreamHandler): + def emit(x, record): + StreamHandler.emit(x, record) if debug else None + + handler = DebugHandler() + handler.setLevel(DEBUG) + + if format: + handler.setFormatter(Formatter(format)) + + logger = getLogger(name) + del logger.handlers[:] + logger.__class__ = DebugLogger + logger.addHandler(handler) + + return logger diff --git a/awx/lib/site-packages/geventwebsocket/protocols/__init__.py b/awx/lib/site-packages/geventwebsocket/protocols/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/awx/lib/site-packages/geventwebsocket/protocols/base.py b/awx/lib/site-packages/geventwebsocket/protocols/base.py new file mode 100644 index 0000000000..1c05ab620a --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/protocols/base.py @@ -0,0 +1,35 @@ +class BaseProtocol(object): + PROTOCOL_NAME = '' + + def __init__(self, app): + self._app = app + + def on_open(self): + self.app.on_open() + + def on_message(self, message): + self.app.on_message(message) + + def on_close(self, reason=None): + self.app.on_close(reason) + + @property + def app(self): + if self._app: + return self._app + else: + raise Exception("No application coupled") + + @property + def server(self): + if not hasattr(self.app, 'ws'): + return None + + return self.app.ws.handler.server + + @property + def handler(self): + if not hasattr(self.app, 'ws'): + return None + + return self.app.ws.handler diff --git a/awx/lib/site-packages/geventwebsocket/protocols/wamp.py b/awx/lib/site-packages/geventwebsocket/protocols/wamp.py new file mode 100644 index 0000000000..954486549e --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/protocols/wamp.py @@ -0,0 +1,229 @@ +import inspect +import random +import string +import types + +try: + import ujson as json +except ImportError: + try: + import simplejson as json + except ImportError: + import json + +from ..exceptions import WebSocketError +from .base import BaseProtocol + + +def export_rpc(arg=None): + if isinstance(arg, types.FunctionType): + arg._rpc = arg.__name__ + return arg + + +def serialize(data): + return json.dumps(data) + + +class Prefixes(object): + def __init__(self): + self.prefixes = {} + + def add(self, prefix, uri): + self.prefixes[prefix] = uri + + def resolve(self, curie_or_uri): + if "http://" in curie_or_uri: + return curie_or_uri + elif ':' in curie_or_uri: + prefix, proc = curie_or_uri.split(':', 1) + return self.prefixes[prefix] + proc + else: + raise Exception(curie_or_uri) + + +class RemoteProcedures(object): + def __init__(self): + self.calls = {} + + def register_procedure(self, uri, proc): + self.calls[uri] = proc + + def register_object(self, uri, obj): + for k in inspect.getmembers(obj, inspect.ismethod): + if '_rpc' in k[1].__dict__: + proc_uri = uri + k[1]._rpc + self.calls[proc_uri] = (obj, k[1]) + + def call(self, uri, args): + if uri in self.calls: + proc = self.calls[uri] + + # Do the correct call whether it's a function or instance method. + if isinstance(proc, tuple): + if proc[1].__self__ is None: + # Create instance of object and call method + return proc[1](proc[0](), *args) + else: + # Call bound method on instance + return proc[1](*args) + else: + return self.calls[uri](*args) + else: + raise Exception("no such uri '{}'".format(uri)) + + +class Channels(object): + def __init__(self): + self.channels = {} + + def create(self, uri, prefix_matching=False): + if uri not in self.channels: + self.channels[uri] = [] + + # TODO: implement prefix matching + + def subscribe(self, uri, client): + if uri in self.channels: + self.channels[uri].append(client) + + def unsubscribe(self, uri, client): + if uri not in self.channels: + return + + client_index = self.channels[uri].index(client) + self.channels[uri].pop(client_index) + + if len(self.channels[uri]) == 0: + del self.channels[uri] + + def publish(self, uri, event, exclude=None, eligible=None): + if uri not in self.channels: + return + + # TODO: exclude & eligible + + msg = [WampProtocol.MSG_EVENT, uri, event] + + for client in self.channels[uri]: + try: + client.ws.send(serialize(msg)) + except WebSocketError: + # Seems someone didn't unsubscribe before disconnecting + self.channels[uri].remove(client) + + +class WampProtocol(BaseProtocol): + MSG_WELCOME = 0 + MSG_PREFIX = 1 + MSG_CALL = 2 + MSG_CALL_RESULT = 3 + MSG_CALL_ERROR = 4 + MSG_SUBSCRIBE = 5 + MSG_UNSUBSCRIBE = 6 + MSG_PUBLISH = 7 + MSG_EVENT = 8 + + PROTOCOL_NAME = "wamp" + + def __init__(self, *args, **kwargs): + self.procedures = RemoteProcedures() + self.prefixes = Prefixes() + self.session_id = ''.join( + [random.choice(string.digits + string.letters) + for i in xrange(16)]) + + super(WampProtocol, self).__init__(*args, **kwargs) + + def register_procedure(self, *args, **kwargs): + self.procedures.register_procedure(*args, **kwargs) + + def register_object(self, *args, **kwargs): + self.procedures.register_object(*args, **kwargs) + + def register_pubsub(self, *args, **kwargs): + if not hasattr(self.server, 'channels'): + self.server.channels = Channels() + + self.server.channels.create(*args, **kwargs) + + def do_handshake(self): + from geventwebsocket import get_version + + welcome = [ + self.MSG_WELCOME, + self.session_id, + 1, + 'gevent-websocket/' + get_version() + ] + self.app.ws.send(serialize(welcome)) + + def rpc_call(self, data): + call_id, curie_or_uri = data[1:3] + args = data[3:] + + if not isinstance(call_id, (str, unicode)): + raise Exception() + if not isinstance(curie_or_uri, (str, unicode)): + raise Exception() + + uri = self.prefixes.resolve(curie_or_uri) + + try: + result = self.procedures.call(uri, args) + result_msg = [self.MSG_CALL_RESULT, call_id, result] + except Exception, e: + result_msg = [self.MSG_CALL_ERROR, + call_id, 'http://TODO#generic', + str(type(e)), str(e)] + + self.app.on_message(serialize(result_msg)) + + def pubsub_action(self, data): + action = data[0] + curie_or_uri = data[1] + + if not isinstance(action, int): + raise Exception() + if not isinstance(curie_or_uri, (str, unicode)): + raise Exception() + + uri = self.prefixes.resolve(curie_or_uri) + + if action == self.MSG_SUBSCRIBE and len(data) == 2: + self.server.channels.subscribe(data[1], self.handler.active_client) + + elif action == self.MSG_UNSUBSCRIBE and len(data) == 2: + self.server.channels.unsubscribe( + data[1], self.handler.active_client) + + elif action == self.MSG_PUBLISH and len(data) >= 3: + payload = data[2] if len(data) >= 3 else None + exclude = data[3] if len(data) >= 4 else None + eligible = data[4] if len(data) >= 5 else None + + self.server.channels.publish(uri, payload, exclude, eligible) + + def on_open(self): + self.app.on_open() + self.do_handshake() + + def on_message(self, message): + data = json.loads(message) + + if not isinstance(data, list): + raise Exception('incoming data is no list') + + if data[0] == self.MSG_PREFIX and len(data) == 3: + prefix, uri = data[1:3] + self.prefixes.add(prefix, uri) + + elif data[0] == self.MSG_CALL and len(data) >= 3: + return self.rpc_call(data) + + elif data[0] in (self.MSG_SUBSCRIBE, self.MSG_UNSUBSCRIBE, + self.MSG_PUBLISH): + return self.pubsub_action(data) + else: + raise Exception("Unknown call") + diff --git a/awx/lib/site-packages/geventwebsocket/resource.py b/awx/lib/site-packages/geventwebsocket/resource.py new file mode 100644 index 0000000000..36c1fb367c --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/resource.py @@ -0,0 +1,74 @@ +import re + +from .protocols.base import BaseProtocol +from .exceptions import WebSocketError + + +class WebSocketApplication(object): + protocol_class = BaseProtocol + + def __init__(self, ws): + self.protocol = self.protocol_class(self) + self.ws = ws + + def handle(self): + self.protocol.on_open() + + while True: + try: + message = self.ws.receive() + except WebSocketError: + self.protocol.on_close() + break + + self.protocol.on_message(message) + + def on_open(self, *args, **kwargs): + pass + + def on_close(self, *args, **kwargs): + pass + + def on_message(self, message, *args, **kwargs): + self.ws.send(message, **kwargs) + + @classmethod + def protocol_name(cls): + return cls.protocol_class.PROTOCOL_NAME + + +class Resource(object): + def __init__(self, apps=None): + self.apps = apps if apps else [] + + def _app_by_path(self, environ_path): + # Which app matched the current path? + + for path, app in self.apps.iteritems(): + if re.match(path, environ_path): + return app + + def app_protocol(self, path): + app = self._app_by_path(path) + + if hasattr(app, 'protocol_name'): + return app.protocol_name() + else: + return '' + + def __call__(self, environ, start_response): + environ = environ + current_app = self._app_by_path(environ['PATH_INFO']) + + if current_app is None: + raise Exception("No apps defined") + + if 'wsgi.websocket' in environ: + ws = environ['wsgi.websocket'] + current_app = current_app(ws) + current_app.ws = ws # TODO: needed? + current_app.handle() + + return None + else: + return current_app(environ, start_response) diff --git a/awx/lib/site-packages/geventwebsocket/server.py b/awx/lib/site-packages/geventwebsocket/server.py new file mode 100644 index 0000000000..00443b8a32 --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/server.py @@ -0,0 +1,34 @@ +from gevent.pywsgi import WSGIServer + +from .handler import WebSocketHandler +from .logging import create_logger + + +class WebSocketServer(WSGIServer): + debug_log_format = ( + '-' * 80 + '\n' + + '%(levelname)s in %(module)s [%(pathname)s:%(lineno)d]:\n' + + '%(message)s\n' + + '-' * 80 + ) + + def __init__(self, *args, **kwargs): + self.debug = kwargs.pop('debug', False) + self.pre_start_hook = kwargs.pop('pre_start_hook', None) + self._logger = None + self.clients = {} + + kwargs['handler_class'] = WebSocketHandler + super(WebSocketServer, self).__init__(*args, **kwargs) + + def handle(self, socket, address): + handler = self.handler_class(socket, address, self) + handler.handle() + + @property + def logger(self): + if not self._logger: + self._logger = create_logger( + __name__, self.debug, self.debug_log_format) + + return self._logger diff --git a/awx/lib/site-packages/geventwebsocket/utf8validator.py b/awx/lib/site-packages/geventwebsocket/utf8validator.py new file mode 100644 index 0000000000..b8a3e8a5ad --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/utf8validator.py @@ -0,0 +1,128 @@ +############################################################################### +## +## Copyright 2011-2013 Tavendo GmbH +## +## Note: +## +## This code is a Python implementation of the algorithm +## +## "Flexible and Economical UTF-8 Decoder" +## +## by Bjoern Hoehrmann +## +## bjoern@hoehrmann.de +## http://bjoern.hoehrmann.de/utf-8/decoder/dfa/ +## +## Licensed under the Apache License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. +## You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## +############################################################################### + + +## use Cython implementation of UTF8 validator if available +## +try: + from wsaccel.utf8validator import Utf8Validator +except: + ## fallback to pure Python implementation + + class Utf8Validator: + """ + Incremental UTF-8 validator with constant memory consumption (minimal + state). + + Implements the algorithm "Flexible and Economical UTF-8 Decoder" by + Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/). + """ + + ## DFA transitions + UTF8VALIDATOR_DFA = [ + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 00..1f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 20..3f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 40..5f + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 60..7f + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, # 80..9f + 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, # a0..bf + 8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, # c0..df + 0xa,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x4,0x3,0x3, # e0..ef + 0xb,0x6,0x6,0x6,0x5,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8, # f0..ff + 0x0,0x1,0x2,0x3,0x5,0x8,0x7,0x1,0x1,0x1,0x4,0x6,0x1,0x1,0x1,0x1, # s0..s0 + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,1, # s1..s2 + 1,2,1,1,1,1,1,2,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1, # s3..s4 + 1,2,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,1,1, # s5..s6 + 1,3,1,1,1,1,1,3,1,3,1,1,1,1,1,1,1,3,1,1,1,1,1,1,1,1,1,1,1,1,1,1, # s7..s8 + ] + + UTF8_ACCEPT = 0 + UTF8_REJECT = 1 + + def __init__(self): + self.reset() + + def decode(self, b): + """ + Eat one UTF-8 octet, and validate on the fly. + + Returns UTF8_ACCEPT when enough octets have been consumed, in which case + self.codepoint contains the decoded Unicode code point. + + Returns UTF8_REJECT when invalid UTF-8 was encountered. + + Returns some other positive integer when more octets need to be eaten. + """ + type = Utf8Validator.UTF8VALIDATOR_DFA[b] + + if self.state != Utf8Validator.UTF8_ACCEPT: + self.codepoint = (b & 0x3f) | (self.codepoint << 6) + else: + self.codepoint = (0xff >> type) & b + + self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + self.state * 16 + type] + + return self.state + + def reset(self): + """ + Reset validator to start new incremental UTF-8 decode/validation. + """ + self.state = Utf8Validator.UTF8_ACCEPT + self.codepoint = 0 + self.i = 0 + + def validate(self, ba): + """ + Incrementally validate a chunk of bytes provided as string. + + Will return a quad (valid?, endsOnCodePoint?, currentIndex, totalIndex). + + As soon as an octet is encountered which renders the octet sequence + invalid, a quad with valid? == False is returned. currentIndex returns + the index within the currently consumed chunk, and totalIndex the + index within the total consumed sequence that was the point of bail out. + When valid? == True, currentIndex will be len(ba) and totalIndex the + total amount of consumed bytes. + """ + + l = len(ba) + + for i in xrange(l): + ## optimized version of decode(), since we are not interested in actual code points + + self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + (self.state << 4) + Utf8Validator.UTF8VALIDATOR_DFA[ord(ba[i])]] + + if self.state == Utf8Validator.UTF8_REJECT: + self.i += i + return False, False, i, self.i + + self.i += l + + return True, self.state == Utf8Validator.UTF8_ACCEPT, l, self.i diff --git a/awx/lib/site-packages/geventwebsocket/utils.py b/awx/lib/site-packages/geventwebsocket/utils.py new file mode 100644 index 0000000000..2e5bc3b7ee --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/utils.py @@ -0,0 +1,45 @@ +import subprocess + + +def get_version(version=None): + "Returns a PEP 386-compliant version number from VERSION." + + if version is None: + from geventwebsocket import VERSION as version + else: + assert len(version) == 5 + assert version[3] in ('alpha', 'beta', 'rc', 'final') + + # Now build the two parts of the version number: + # main = X.Y[.Z] + # sub = .devN - for pre-alpha releases + # | {a|b|c}N - for alpha, beta and rc releases + + parts = 2 if version[2] == 0 else 3 + main = '.'.join(str(x) for x in version[:parts]) + + sub = '' + if version[3] == 'alpha' and version[4] == 0: + hg_changeset = get_hg_changeset() + if hg_changeset: + sub = '.dev{0}'.format(hg_changeset) + + elif version[3] != 'final': + mapping = {'alpha': 'a', 'beta': 'b', 'rc': 'c'} + sub = mapping[version[3]] + str(version[4]) + + return str(main + sub) + + +def get_hg_changeset(): + rev, err = subprocess.Popen( + 'hg id -i', + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ).communicate() + + if err: + return None + else: + return rev.strip().replace('+', '') diff --git a/awx/lib/site-packages/geventwebsocket/websocket.py b/awx/lib/site-packages/geventwebsocket/websocket.py new file mode 100644 index 0000000000..6d4f76d32f --- /dev/null +++ b/awx/lib/site-packages/geventwebsocket/websocket.py @@ -0,0 +1,543 @@ +import struct + +from socket import error + +from .exceptions import ProtocolError +from .exceptions import WebSocketError +from .exceptions import FrameTooLargeException + +from .utf8validator import Utf8Validator + + +MSG_SOCKET_DEAD = "Socket is dead" +MSG_ALREADY_CLOSED = "Connection is already closed" +MSG_CLOSED = "Connection closed" + + +class WebSocket(object): + """ + Base class for supporting websocket operations. + + :ivar environ: The http environment referenced by this connection. + :ivar closed: Whether this connection is closed/closing. + :ivar stream: The underlying file like object that will be read from / + written to by this WebSocket object. + """ + + __slots__ = ('utf8validator', 'utf8validate_last', 'environ', 'closed', + 'stream', 'raw_write', 'raw_read', 'handler') + + OPCODE_CONTINUATION = 0x00 + OPCODE_TEXT = 0x01 + OPCODE_BINARY = 0x02 + OPCODE_CLOSE = 0x08 + OPCODE_PING = 0x09 + OPCODE_PONG = 0x0a + + def __init__(self, environ, stream, handler): + self.environ = environ + self.closed = False + + self.stream = stream + + self.raw_write = stream.write + self.raw_read = stream.read + + self.utf8validator = Utf8Validator() + self.handler = handler + + def __del__(self): + try: + self.close() + except: + # close() may fail if __init__ didn't complete + pass + + def _decode_bytes(self, bytestring): + """ + Internal method used to convert the utf-8 encoded bytestring into + unicode. + + If the conversion fails, the socket will be closed. + """ + + if not bytestring: + return u'' + + try: + return bytestring.decode('utf-8') + except UnicodeDecodeError: + self.close(1007) + + raise + + def _encode_bytes(self, text): + """ + :returns: The utf-8 byte string equivalent of `text`. + """ + + if isinstance(text, str): + return text + + if not isinstance(text, unicode): + text = unicode(text or '') + + return text.encode('utf-8') + + def _is_valid_close_code(self, code): + """ + :returns: Whether the returned close code is a valid hybi return code. + """ + if code < 1000: + return False + + if 1004 <= code <= 1006: + return False + + if 1012 <= code <= 1016: + return False + + if code == 1100: + # not sure about this one but the autobahn fuzzer requires it. + return False + + if 2000 <= code <= 2999: + return False + + return True + + @property + def current_app(self): + if hasattr(self.handler.server.application, 'current_app'): + return self.handler.server.application.current_app + else: + # For backwards compatibility reasons + class MockApp(): + def on_close(self, *args): + pass + + return MockApp() + + @property + def origin(self): + if not self.environ: + return + + return self.environ.get('HTTP_ORIGIN') + + @property + def protocol(self): + if not self.environ: + return + + return self.environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL') + + @property + def version(self): + if not self.environ: + return + + return self.environ.get('HTTP_SEC_WEBSOCKET_VERSION') + + @property + def path(self): + if not self.environ: + return + + return self.environ.get('PATH_INFO') + + @property + def logger(self): + return self.handler.logger + + def handle_close(self, header, payload): + """ + Called when a close frame has been decoded from the stream. + + :param header: The decoded `Header`. + :param payload: The bytestring payload associated with the close frame. + """ + if not payload: + self.close(1000, None) + + return + + if len(payload) < 2: + raise ProtocolError('Invalid close frame: {0} {1}'.format( + header, payload)) + + code = struct.unpack('!H', str(payload[:2]))[0] + payload = payload[2:] + + if payload: + validator = Utf8Validator() + val = validator.validate(payload) + + if not val[0]: + raise UnicodeError + + if not self._is_valid_close_code(code): + raise ProtocolError('Invalid close code {0}'.format(code)) + + self.close(code, payload) + + def handle_ping(self, header, payload): + self.send_frame(payload, self.OPCODE_PONG) + + def handle_pong(self, header, payload): + pass + + def read_frame(self): + """ + Block until a full frame has been read from the socket. + + This is an internal method as calling this will not cleanup correctly + if an exception is called. Use `receive` instead. + + :return: The header and payload as a tuple. + """ + + header = Header.decode_header(self.stream) + + if header.flags: + raise ProtocolError + + if not header.length: + return header, '' + + try: + payload = self.raw_read(header.length) + except error: + payload = '' + except Exception: + # TODO log out this exception + payload = '' + + if len(payload) != header.length: + raise WebSocketError('Unexpected EOF reading frame payload') + + if header.mask: + payload = header.unmask_payload(payload) + + return header, payload + + def validate_utf8(self, payload): + # Make sure the frames are decodable independently + self.utf8validate_last = self.utf8validator.validate(payload) + + if not self.utf8validate_last[0]: + raise UnicodeError("Encountered invalid UTF-8 while processing " + "text message at payload octet index " + "{0:d}".format(self.utf8validate_last[3])) + + def read_message(self): + """ + Return the next text or binary message from the socket. + + This is an internal method as calling this will not cleanup correctly + if an exception is called. Use `receive` instead. + """ + opcode = None + message = "" + + while True: + header, payload = self.read_frame() + f_opcode = header.opcode + + if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY): + # a new frame + if opcode: + raise ProtocolError("The opcode in non-fin frame is " + "expected to be zero, got " + "{0!r}".format(f_opcode)) + + # Start reading a new message, reset the validator + self.utf8validator.reset() + self.utf8validate_last = (True, True, 0, 0) + + opcode = f_opcode + + elif f_opcode == self.OPCODE_CONTINUATION: + if not opcode: + raise ProtocolError("Unexpected frame with opcode=0") + + elif f_opcode == self.OPCODE_PING: + self.handle_ping(header, payload) + continue + + elif f_opcode == self.OPCODE_PONG: + self.handle_pong(header, payload) + continue + + elif f_opcode == self.OPCODE_CLOSE: + self.handle_close(header, payload) + return + + else: + raise ProtocolError("Unexpected opcode={0!r}".format(f_opcode)) + + if opcode == self.OPCODE_TEXT: + self.validate_utf8(payload) + + message += payload + + if header.fin: + break + + if opcode == self.OPCODE_TEXT: + self.validate_utf8(message) + return message + else: + return bytearray(message) + + def receive(self): + """ + Read and return a message from the stream. If `None` is returned, then + the socket is considered closed/errored. + """ + + if self.closed: + self.current_app.on_close(MSG_ALREADY_CLOSED) + raise WebSocketError(MSG_ALREADY_CLOSED) + + try: + return self.read_message() + except UnicodeError: + self.close(1007) + except ProtocolError: + self.close(1002) + except error: + self.close() + self.current_app.on_close(MSG_CLOSED) + + return None + + def send_frame(self, message, opcode): + """ + Send a frame over the websocket with message as its payload + """ + if self.closed: + self.current_app.on_close(MSG_ALREADY_CLOSED) + raise WebSocketError(MSG_ALREADY_CLOSED) + + if opcode == self.OPCODE_TEXT: + message = self._encode_bytes(message) + elif opcode == self.OPCODE_BINARY: + message = str(message) + + header = Header.encode_header(True, opcode, '', len(message), 0) + + try: + self.raw_write(header + message) + except error: + raise WebSocketError(MSG_SOCKET_DEAD) + + def send(self, message, binary=None): + """ + Send a frame over the websocket with message as its payload + """ + if binary is None: + binary = not isinstance(message, (str, unicode)) + + opcode = self.OPCODE_BINARY if binary else self.OPCODE_TEXT + + try: + self.send_frame(message, opcode) + except WebSocketError: + self.current_app.on_close(MSG_SOCKET_DEAD) + raise WebSocketError(MSG_SOCKET_DEAD) + + def close(self, code=1000, message=''): + """ + Close the websocket and connection, sending the specified code and + message. The underlying socket object is _not_ closed, that is the + responsibility of the initiator. + """ + + if self.closed: + self.current_app.on_close(MSG_ALREADY_CLOSED) + + try: + message = self._encode_bytes(message) + + self.send_frame( + struct.pack('!H%ds' % len(message), code, message), + opcode=self.OPCODE_CLOSE) + except WebSocketError: + # Failed to write the closing frame but it's ok because we're + # closing the socket anyway. + self.logger.debug("Failed to write closing frame -> closing socket") + finally: + self.logger.debug("Closed WebSocket") + self.closed = True + + self.stream = None + self.raw_write = None + self.raw_read = None + + self.environ = None + + #self.current_app.on_close(MSG_ALREADY_CLOSED) + + +class Stream(object): + """ + Wraps the handler's socket/rfile attributes and makes it in to a file like + object that can be read from/written to by the lower level websocket api. + """ + + __slots__ = ('handler', 'read', 'write') + + def __init__(self, handler): + self.handler = handler + self.read = handler.rfile.read + self.write = handler.socket.sendall + + +class Header(object): + __slots__ = ('fin', 'mask', 'opcode', 'flags', 'length') + + FIN_MASK = 0x80 + OPCODE_MASK = 0x0f + MASK_MASK = 0x80 + LENGTH_MASK = 0x7f + + RSV0_MASK = 0x40 + RSV1_MASK = 0x20 + RSV2_MASK = 0x10 + + # bitwise mask that will determine the reserved bits for a frame header + HEADER_FLAG_MASK = RSV0_MASK | RSV1_MASK | RSV2_MASK + + def __init__(self, fin=0, opcode=0, flags=0, length=0): + self.mask = '' + self.fin = fin + self.opcode = opcode + self.flags = flags + self.length = length + + def mask_payload(self, payload): + payload = bytearray(payload) + mask = bytearray(self.mask) + + for i in xrange(self.length): + payload[i] ^= mask[i % 4] + + return str(payload) + + # it's the same operation + unmask_payload = mask_payload + + def __repr__(self): + return ("
").format(self.fin, self.opcode, self.length, + self.flags, id(self)) + + @classmethod + def decode_header(cls, stream): + """ + Decode a WebSocket header. + + :param stream: A file like object that can be 'read' from. + :returns: A `Header` instance. + """ + read = stream.read + data = read(2) + + if len(data) != 2: + raise WebSocketError("Unexpected EOF while decoding header") + + first_byte, second_byte = struct.unpack('!BB', data) + + header = cls( + fin=first_byte & cls.FIN_MASK == cls.FIN_MASK, + opcode=first_byte & cls.OPCODE_MASK, + flags=first_byte & cls.HEADER_FLAG_MASK, + length=second_byte & cls.LENGTH_MASK) + + has_mask = second_byte & cls.MASK_MASK == cls.MASK_MASK + + if header.opcode > 0x07: + if not header.fin: + raise ProtocolError( + "Received fragmented control frame: {0!r}".format(data)) + + # Control frames MUST have a payload length of 125 bytes or less + if header.length > 125: + raise FrameTooLargeException( + "Control frame cannot be larger than 125 bytes: " + "{0!r}".format(data)) + + if header.length == 126: + # 16 bit length + data = read(2) + + if len(data) != 2: + raise WebSocketError('Unexpected EOF while decoding header') + + header.length = struct.unpack('!H', data)[0] + elif header.length == 127: + # 64 bit length + data = read(8) + + if len(data) != 8: + raise WebSocketError('Unexpected EOF while decoding header') + + header.length = struct.unpack('!Q', data)[0] + + if has_mask: + mask = read(4) + + if len(mask) != 4: + raise WebSocketError('Unexpected EOF while decoding header') + + header.mask = mask + + return header + + @classmethod + def encode_header(cls, fin, opcode, mask, length, flags): + """ + Encodes a WebSocket header. + + :param fin: Whether this is the final frame for this opcode. + :param opcode: The opcode of the payload, see `OPCODE_*` + :param mask: Whether the payload is masked. + :param length: The length of the frame. + :param flags: The RSV* flags. + :return: A bytestring encoded header. + """ + first_byte = opcode + second_byte = 0 + extra = '' + + if fin: + first_byte |= cls.FIN_MASK + + if flags & cls.RSV0_MASK: + first_byte |= cls.RSV0_MASK + + if flags & cls.RSV1_MASK: + first_byte |= cls.RSV1_MASK + + if flags & cls.RSV2_MASK: + first_byte |= cls.RSV2_MASK + + # now deal with length complexities + if length < 126: + second_byte += length + elif length <= 0xffff: + second_byte += 126 + extra = struct.pack('!H', length) + elif length <= 0xffffffffffffffff: + second_byte += 127 + extra = struct.pack('!Q', length) + else: + raise FrameTooLargeException + + if mask: + second_byte |= cls.MASK_MASK + + extra += mask + + return chr(first_byte) + chr(second_byte) + extra diff --git a/awx/lib/site-packages/socketio/__init__.py b/awx/lib/site-packages/socketio/__init__.py new file mode 100644 index 0000000000..ee4faba6a9 --- /dev/null +++ b/awx/lib/site-packages/socketio/__init__.py @@ -0,0 +1,76 @@ +__version__ = (0, 3, 5) + +import logging +import gevent + +log = logging.getLogger(__name__) + + +def socketio_manage(environ, namespaces, request=None, error_handler=None): + """Main SocketIO management function, call from within your Framework of + choice's view. + + The ``environ`` variable is the WSGI ``environ``. It is used to extract + Socket object from the underlying server (as the 'socketio' key), and will + be attached to both the ``Socket`` and ``Namespace`` objects. + + The ``namespaces`` parameter is a dictionary of the namespace string + representation as key, and the BaseNamespace namespace class descendant as + a value. The empty string ('') namespace is the global namespace. You can + use Socket.GLOBAL_NS to be more explicit. So it would look like: + + .. code-block:: python + + namespaces={'': GlobalNamespace, + '/chat': ChatNamespace} + + The ``request`` object is not required, but will probably be useful to pass + framework-specific things into your Socket and Namespace functions. It will + simply be attached to the Socket and Namespace object (accessible through + ``self.request`` in both cases), and it is not accessed in any case by the + ``gevent-socketio`` library. + + Pass in an ``error_handler`` if you want to override the default + error_handler (which is :func:`socketio.virtsocket.default_error_handler`. + The callable you pass in should have the same signature as the default + error handler. + + This function will block the current "view" or "controller" in your + framework to do the recv/send on the socket, and dispatch incoming messages + to your namespaces. + + This is a simple example using Pyramid: + + .. code-block:: python + + def my_view(request): + socketio_manage(request.environ, {'': GlobalNamespace}, request) + NOTE: You must understand that this function is going to be called + *only once* per socket opening, *even though* you are using a long + polling mechanism. The subsequent calls (for long polling) will + be hooked directly at the server-level, to interact with the + active ``Socket`` instance. This means you will *not* get access + to the future ``request`` or ``environ`` objects. This is of + particular importance regarding sessions (like Beaker). The + session will be opened once at the opening of the Socket, and not + closed until the socket is closed. You are responsible for + opening and closing the cookie-based session yourself if you want + to keep its data in sync with the rest of your GET/POST calls. + """ + socket = environ['socketio'] + socket._set_environ(environ) + socket._set_namespaces(namespaces) + + if request: + socket._set_request(request) + + if error_handler: + socket._set_error_handler(error_handler) + + receiver_loop = socket._spawn_receiver_loop() + watcher = socket._spawn_watcher() + + gevent.joinall([receiver_loop, watcher]) + + # TODO: double check, what happens to the WSGI request here ? it vanishes ? + return diff --git a/awx/lib/site-packages/socketio/handler.py b/awx/lib/site-packages/socketio/handler.py new file mode 100644 index 0000000000..2bdae0b263 --- /dev/null +++ b/awx/lib/site-packages/socketio/handler.py @@ -0,0 +1,149 @@ +import sys +import re +import gevent +import urlparse + +from gevent.pywsgi import WSGIHandler +from socketio import transports +from geventwebsocket.handler import WebSocketHandler + + +class SocketIOHandler(WSGIHandler): + RE_REQUEST_URL = re.compile(r""" + ^/(?P[^/]+) + /(?P[^/]+) + /(?P[^/]+) + /(?P[^/]+)/?$ + """, re.X) + RE_HANDSHAKE_URL = re.compile(r"^/(?P[^/]+)/1/$", re.X) + + handler_types = { + 'websocket': transports.WebsocketTransport, + 'flashsocket': transports.FlashSocketTransport, + 'htmlfile': transports.HTMLFileTransport, + 'xhr-multipart': transports.XHRMultipartTransport, + 'xhr-polling': transports.XHRPollingTransport, + 'jsonp-polling': transports.JSONPolling, + } + + def __init__(self, *args, **kwargs): + self.socketio_connection = False + self.allowed_paths = None + + super(SocketIOHandler, self).__init__(*args, **kwargs) + + self.transports = self.handler_types.keys() + if self.server.transports: + self.transports = self.server.transports + if not set(self.transports).issubset(set(self.handler_types)): + raise Exception("transports should be elements of: %s" % + (self.handler_types.keys())) + + def _do_handshake(self, tokens): + if tokens["resource"] != self.server.resource: + self.log_error("socket.io URL mismatch") + else: + socket = self.server.get_socket() + data = "%s:15:10:%s" % (socket.sessid, ",".join(self.transports)) + self.write_smart(data) + + def write_jsonp_result(self, data, wrapper="0"): + self.start_response("200 OK", [ + ("Content-Type", "application/javascript"), + ]) + self.result = ['io.j[%s]("%s");' % (wrapper, data)] + + def write_plain_result(self, data): + self.start_response("200 OK", [ + ("Access-Control-Allow-Origin", self.environ.get('HTTP_ORIGIN', '*')), + ("Access-Control-Allow-Credentials", "true"), + ("Access-Control-Allow-Methods", "POST, GET, OPTIONS"), + ("Access-Control-Max-Age", 3600), + ("Content-Type", "text/plain"), + ]) + self.result = [data] + + def write_smart(self, data): + args = urlparse.parse_qs(self.environ.get("QUERY_STRING")) + + if "jsonp" in args: + self.write_jsonp_result(data, args["jsonp"][0]) + else: + self.write_plain_result(data) + + self.process_result() + + def handle_one_response(self): + path = self.environ.get('PATH_INFO') + + # Kick non-socket.io requests to our superclass + if not path.lstrip('/').startswith(self.server.resource): + return super(SocketIOHandler, self).handle_one_response() + + self.status = None + self.headers_sent = False + self.result = None + self.response_length = 0 + self.response_use_chunked = False + request_method = self.environ.get("REQUEST_METHOD") + request_tokens = self.RE_REQUEST_URL.match(path) + + # Parse request URL and QUERY_STRING and do handshake + if request_tokens: + request_tokens = request_tokens.groupdict() + else: + handshake_tokens = self.RE_HANDSHAKE_URL.match(path) + + if handshake_tokens: + return self._do_handshake(handshake_tokens.groupdict()) + else: + # This is no socket.io request. Let the WSGI app handle it. + return super(SocketIOHandler, self).handle_one_response() + + # Setup the transport and socket + transport = self.handler_types.get(request_tokens["transport_id"]) + sessid = request_tokens["sessid"] + socket = self.server.get_socket(sessid) + + # In case this is WebSocket request, switch to the WebSocketHandler + # FIXME: fix this ugly class change + if issubclass(transport, (transports.WebsocketTransport, + transports.FlashSocketTransport)): + self.__class__ = WebSocketHandler + self.prevent_wsgi_call = True # thank you + # TODO: any errors, treat them ?? + self.handle_one_response() + + # Make the socket object available for WSGI apps + self.environ['socketio'] = socket + + # Create a transport and handle the request likewise + self.transport = transport(self) + + jobs = self.transport.connect(socket, request_method) + # Keep track of those jobs (reading, writing and heartbeat jobs) so + # that we can kill them later with Socket.kill() + socket.jobs.extend(jobs) + + try: + # We'll run the WSGI app if it wasn't already done. + if socket.wsgi_app_greenlet is None: + # TODO: why don't we spawn a call to handle_one_response here ? + # why call directly the WSGI machinery ? + start_response = lambda status, headers, exc=None: None + socket.wsgi_app_greenlet = gevent.spawn(self.application, + self.environ, + start_response) + except: + self.handle_error(*sys.exc_info()) + + # TODO DOUBLE-CHECK: do we need to joinall here ? + gevent.joinall(jobs) + + def handle_bad_request(self): + self.close_connection = True + self.start_reponse("400 Bad Request", [ + ('Content-Type', 'text/plain'), + ('Connection', 'close'), + ('Content-Length', 0) + ]) diff --git a/awx/lib/site-packages/socketio/mixins.py b/awx/lib/site-packages/socketio/mixins.py new file mode 100644 index 0000000000..b101f2433e --- /dev/null +++ b/awx/lib/site-packages/socketio/mixins.py @@ -0,0 +1,73 @@ +""" +These are general-purpose Mixins -- for use with Namespaces -- that are +generally useful for most simple projects, e.g. Rooms, Broadcast. + +You'll likely want to create your own Mixins. +""" + + +class RoomsMixin(object): + def __init__(self, *args, **kwargs): + super(RoomsMixin, self).__init__(*args, **kwargs) + if 'rooms' not in self.session: + self.session['rooms'] = set() # a set of simple strings + + def join(self, room): + """Lets a user join a room on a specific Namespace.""" + self.session['rooms'].add(self._get_room_name(room)) + + def leave(self, room): + """Lets a user leave a room on a specific Namespace.""" + self.session['rooms'].remove(self._get_room_name(room)) + + def _get_room_name(self, room): + return self.ns_name + '_' + room + + def emit_to_room(self, room, event, *args): + """This is sent to all in the room (in this particular Namespace)""" + pkt = dict(type="event", + name=event, + args=args, + endpoint=self.ns_name) + room_name = self._get_room_name(room) + for sessid, socket in self.socket.server.sockets.iteritems(): + if 'rooms' not in socket.session: + continue + if room_name in socket.session['rooms'] and self.socket != socket: + socket.send_packet(pkt) + + +class BroadcastMixin(object): + """Mix in this class with your Namespace to have a broadcast event method. + + Use it like this: + class MyNamespace(BaseNamespace, BroadcastMixin): + def on_chatmsg(self, event): + self.broadcast_event('chatmsg', event) + """ + def broadcast_event(self, event, *args): + """ + This is sent to all in the sockets in this particular Namespace, + including itself. + """ + pkt = dict(type="event", + name=event, + args=args, + endpoint=self.ns_name) + + for sessid, socket in self.socket.server.sockets.iteritems(): + socket.send_packet(pkt) + + def broadcast_event_not_me(self, event, *args): + """ + This is sent to all in the sockets in this particular Namespace, + except itself. + """ + pkt = dict(type="event", + name=event, + args=args, + endpoint=self.ns_name) + + for sessid, socket in self.socket.server.sockets.iteritems(): + if socket is not self.socket: + socket.send_packet(pkt) diff --git a/awx/lib/site-packages/socketio/namespace.py b/awx/lib/site-packages/socketio/namespace.py new file mode 100644 index 0000000000..6fc42af80c --- /dev/null +++ b/awx/lib/site-packages/socketio/namespace.py @@ -0,0 +1,468 @@ +import gevent +import re +import logging +import inspect + +log = logging.getLogger(__name__) + +# regex to check the event name contains only alpha numerical characters +allowed_event_name_regex = re.compile(r'^[A-Za-z][A-Za-z0-9_ ]*$') + + +class BaseNamespace(object): + """The **Namespace** is the primary interface a developer will use + to create a gevent-socketio-based application. + + You should create your own subclass of this class, optionally using one + of the :mod:`socketio.mixins` provided (or your own), and define methods + such as: + + .. code-block:: python + :linenos: + + def on_my_event(self, my_first_arg, my_second_arg): + print "This is the my_first_arg object", my_first_arg + print "This is the my_second_arg object", my_second_arg + + def on_my_second_event(self, whatever): + print "This holds the first arg that was passed", whatever + + We can also access the full packet directly by making an event handler + that accepts a single argument named 'packet': + + .. code-block:: python + :linenos: + + def on_third_event(self, packet): + print "The full packet", packet + print "See the BaseNamespace::call_method() method for details" + """ + def __init__(self, environ, ns_name, request=None): + self.environ = environ + self.socket = environ['socketio'] + self.session = self.socket.session # easily accessible session + self.request = request + self.ns_name = ns_name + self.allowed_methods = None # be careful, None means all methods + # are allowed while an empty list + # means totally closed. + self.jobs = [] + + self.reset_acl() + + # Init the mixins if specified after. + super(BaseNamespace, self).__init__() + + def is_method_allowed(self, method_name): + """ACL system: this checks if you have access to that method_name, + according to the set ACLs""" + if self.allowed_methods is None: + return True + else: + return method_name in self.allowed_methods + + def add_acl_method(self, method_name): + """ACL system: make the method_name accessible to the current socket""" + + if isinstance(self.allowed_methods, set): + self.allowed_methods.add(method_name) + else: + self.allowed_methods = set([method_name]) + + def del_acl_method(self, method_name): + """ACL system: ensure the user will not have access to that method.""" + if self.allowed_methods is None: + raise ValueError( + "Trying to delete an ACL method, but none were" + + " defined yet! Or: No ACL restrictions yet, why would you" + + " delete one?" + ) + + self.allowed_methods.remove(method_name) + + def lift_acl_restrictions(self): + """ACL system: This removes restrictions on the Namespace's methods, so + that all the ``on_*()`` and ``recv_*()`` can be accessed. + """ + self.allowed_methods = None + + def get_initial_acl(self): + """ACL system: If you define this function, you must return + all the 'event' names that you want your User (the established + virtual Socket) to have access to. + + If you do not define this function, the user will have free + access to all of the ``on_*()`` and ``recv_*()`` functions, + etc.. methods. + + Return something like: ``['on_connect', 'on_public_method']`` + + You can later modify this list dynamically (inside + ``on_connect()`` for example) using: + + .. code-block:: python + + self.add_acl_method('on_secure_method') + + ``self.request`` is available in here, if you're already ready to + do some auth. check. + + The ACLs are checked by the :meth:`process_packet` and/or + :meth:`process_event` default implementations, before calling + the class's methods. + + **Beware**, returning ``None`` leaves the namespace completely + accessible. + """ + return None + + def reset_acl(self): + """Resets ACL to its initial value (calling + :meth:`get_initial_acl`` and applying that again). + """ + self.allowed_methods = self.get_initial_acl() + + def process_packet(self, packet): + """If you override this, NONE of the functions in this class + will be called. It is responsible for dispatching to + :meth:`process_event` (which in turn calls ``on_*()`` and + ``recv_*()`` methods). + + If the packet arrived here, it is because it belongs to this endpoint. + + For each packet arriving, the only possible path of execution, that is, + the only methods that *can* be called are the following: + + * recv_connect() + * recv_message() + * recv_json() + * recv_error() + * recv_disconnect() + * on_*() + """ + packet_type = packet['type'] + + if packet_type == 'event': + return self.process_event(packet) + elif packet_type == 'message': + return self.call_method_with_acl('recv_message', packet, + packet['data']) + elif packet_type == 'json': + return self.call_method_with_acl('recv_json', packet, + packet['data']) + elif packet_type == 'connect': + self.socket.send_packet(packet) + return self.call_method_with_acl('recv_connect', packet) + elif packet_type == 'error': + return self.call_method_with_acl('recv_error', packet) + elif packet_type == 'ack': + callback = self.socket._pop_ack_callback(packet['ackId']) + if not callback: + print "ERROR: No such callback for ackId %s" % packet['ackId'] + return + try: + return callback(*(packet['args'])) + except TypeError, e: + print "ERROR: Call to callback function failed", packet + elif packet_type == 'disconnect': + # Force a disconnect on the namespace. + return self.call_method_with_acl('recv_disconnect', packet) + else: + print "Unprocessed packet", packet + # TODO: manage the other packet types: disconnect + + def process_event(self, packet): + """This function dispatches ``event`` messages to the correct + functions. You should override this method only if you are not + satisfied with the automatic dispatching to + ``on_``-prefixed methods. You could then implement your own dispatch. + See the source code for inspiration. + + To process events that have callbacks on the client side, you + must define your event with a single parameter: ``packet``. + In this case, it will be the full ``packet`` object and you + can inspect its ``ack`` and ``id`` keys to define if and how + you reply. A correct reply to an event with a callback would + look like this: + + .. code-block:: python + + def on_my_callback(self, packet): + if 'ack' in packet': + self.emit('go_back', 'param1', id=packet['id']) + """ + args = packet['args'] + name = packet['name'] + if not allowed_event_name_regex.match(name): + self.error("unallowed_event_name", + "name must only contains alpha numerical characters") + return + + method_name = 'on_' + name.replace(' ', '_') + # This means the args, passed as a list, will be expanded to + # the method arg and if you passed a dict, it will be a dict + # as the first parameter. + + return self.call_method_with_acl(method_name, packet, *args) + + def call_method_with_acl(self, method_name, packet, *args): + """You should always use this function to call the methods, + as it checks if the user is allowed according to the ACLs. + + If you override :meth:`process_packet` or + :meth:`process_event`, you should definitely want to use this + instead of ``getattr(self, 'my_method')()`` + """ + if not self.is_method_allowed(method_name): + self.error('method_access_denied', + 'You do not have access to method "%s"' % method_name) + return + + return self.call_method(method_name, packet, *args) + + def call_method(self, method_name, packet, *args): + """This function is used to implement the two behaviors on dispatched + ``on_*()`` and ``recv_*()`` method calls. + + Those are the two behaviors: + + * If there is only one parameter on the dispatched method and + it is equal to ``packet``, then pass in the packet as the + sole parameter. + + * Otherwise, pass in the arguments as specified by the + different ``recv_*()`` methods args specs, or the + :meth:`process_event` documentation. + """ + method = getattr(self, method_name, None) + if method is None: + self.error('no_such_method', + 'The method "%s" was not found' % method_name) + return + + specs = inspect.getargspec(method) + func_args = specs.args + if not len(func_args) or func_args[0] != 'self': + self.error("invalid_method_args", + "The server-side method is invalid, as it doesn't " + "have 'self' as its first argument") + return + if len(func_args) == 2 and func_args[1] == 'packet': + return method(packet) + else: + return method(*args) + + def initialize(self): + """This is called right after ``__init__``, on the initial + creation of a namespace so you may handle any setup job you + need. + + Namespaces are created only when some packets arrive that ask + for the namespace. They are not created altogether when a new + :class:`~socketio.virtsocket.Socket` connection is established, + so you can have many many namespaces assigned (when calling + :func:`~socketio.socketio_manage`) without clogging the + memory. + + If you override this method, you probably want to only + initialize the variables you're going to use in the events of + this namespace, say, with some default values, but not perform + any operation that assumes authentication/authorization. + """ + pass + + def recv_message(self, data): + """This is more of a backwards compatibility hack. This will be + called for messages sent with the original send() call on the client + side. This is NOT the 'message' event, which you will catch with + 'on_message()'. The data arriving here is a simple string, with no + other info. + + If you want to handle those messages, you should override this method. + """ + return data + + def recv_json(self, data): + """This is more of a backwards compatibility hack. This will be + called for JSON packets sent with the original json() call on the + JavaScript side. This is NOT the 'json' event, which you will catch + with 'on_json()'. The data arriving here is a python dict, with no + event name. + + If you want to handle those messages, you should override this method. + """ + return data + + def recv_disconnect(self): + """Override this function if you want to do something when you get a + *force disconnect* packet. + + By default, this function calls the :meth:`disconnect` clean-up + function. You probably want to call it yourself also, and put + your clean-up routines in :meth:`disconnect` rather than here, + because that :meth:`disconnect` function gets called + automatically upon disconnection. This function is a + pre-handle for when you get the `disconnect packet`. + """ + self.disconnect(silent=True) + + def recv_connect(self): + """Called the first time a client connection is open on a + Namespace. This *does not* fire on the global namespace. + + This allows you to do boilerplate stuff for + the namespace like connecting to rooms, broadcasting events + to others, doing authorization work, and tweaking the ACLs to open + up the rest of the namespace (if it was closed at the + beginning by having :meth:`get_initial_acl` return only + ['recv_connect']) + + Also see the different :ref:`mixins ` (like + `RoomsMixin`, `BroadcastMixin`). + """ + pass + + def recv_error(self, packet): + """Override this function to handle the errors we get from the client. + + :param packet: the full packet. + """ + pass + + def error(self, error_name, error_message, msg_id=None, quiet=False): + """Use this to use the configured ``error_handler`` yield an + error message to your application. + + :param error_name: is a short string, to associate messages to recovery + methods + :param error_message: is some human-readable text, describing the error + :param msg_id: is used to associate with a request + :param quiet: specific to error_handlers. The default doesn't send a + message to the user, but shows a debug message on the + developer console. + """ + self.socket.error(error_name, error_message, endpoint=self.ns_name, + msg_id=msg_id, quiet=quiet) + + def send(self, message, json=False, callback=None): + """Use send to send a simple string message. + + If ``json`` is True, the message will be encoded as a JSON object + on the wire, and decoded on the other side. + + This is mostly for backwards compatibility. ``emit()`` is more fun. + + :param callback: This is a callback function that will be + called automatically by the client upon + reception. It does not verify that the + listener over there was completed with + success. It just tells you that the browser + got a hold of the packet. + :type callback: callable + """ + pkt = dict(type="message", data=message, endpoint=self.ns_name) + if json: + pkt['type'] = "json" + + if callback: + # By passing ack=True, we use the old behavior of being returned + # an 'ack' packet, automatically triggered by the client-side + # with no user-code being run. The emit() version of the + # callback is more useful I think :) So migrate your code. + pkt['ack'] = True + pkt['id'] = msgid = self.socket._get_next_msgid() + self.socket._save_ack_callback(msgid, callback) + + self.socket.send_packet(pkt) + + def emit(self, event, *args, **kwargs): + """Use this to send a structured event, with a name and arguments, to + the client. + + By default, it uses this namespace's endpoint. You can send messages on + other endpoints with something like: + + ``self.socket['/other_endpoint'].emit()``. + + However, it is possible that the ``'/other_endpoint'`` was not + initialized yet, and that would yield a ``KeyError``. + + The only supported ``kwargs`` is ``callback``. All other parameters + must be passed positionally. + + :param event: The name of the event to trigger on the other end. + :param callback: Pass in the callback keyword argument to define a + call-back that will be called when the client acks. + + This callback is slightly different from the one from + ``send()``, as this callback will receive parameters + from the explicit call of the ``ack()`` function + passed to the listener on the client side. + + The remote listener will need to explicitly ack (by + calling its last argument, a function which is + usually called 'ack') with some parameters indicating + success or error. The 'ack' packet coming back here + will then trigger the callback function with the + returned values. + :type callback: callable + """ + callback = kwargs.pop('callback', None) + + if kwargs: + raise ValueError( + "emit() only supports positional argument, to stay " + "compatible with the Socket.IO protocol. You can " + "however pass in a dictionary as the first argument") + pkt = dict(type="event", name=event, args=args, + endpoint=self.ns_name) + + if callback: + # By passing 'data', we indicate that we *want* an explicit ack + # by the client code, not an automatic as with send(). + pkt['ack'] = 'data' + pkt['id'] = msgid = self.socket._get_next_msgid() + self.socket._save_ack_callback(msgid, callback) + + self.socket.send_packet(pkt) + + def spawn(self, fn, *args, **kwargs): + """Spawn a new process, attached to this Namespace. + + It will be monitored by the "watcher" process in the Socket. If the + socket disconnects, all these greenlets are going to be killed, after + calling BaseNamespace.disconnect() + """ + # self.log.debug("Spawning sub-Namespace Greenlet: %s" % fn.__name__) + new = gevent.spawn(fn, *args, **kwargs) + self.jobs.append(new) + return new + + def disconnect(self, silent=False): + """Send a 'disconnect' packet, so that the user knows it has been + disconnected (booted actually). This will trigger an onDisconnect() + call on the client side. + + Over here, we will kill all ``spawn``ed processes and remove the + namespace from the Socket object. + + :param silent: do not actually send the packet (if they asked for a + disconnect for example), but just kill all jobs spawned + by this Namespace, and remove it from the Socket. + """ + if not silent: + packet = {"type": "disconnect", + "endpoint": self.ns_name} + self.socket.send_packet(packet) + self.socket.remove_namespace(self.ns_name) + self.kill_local_jobs() + + def kill_local_jobs(self): + """Kills all the jobs spawned with BaseNamespace.spawn() on a namespace + object. + + This will be called automatically if the ``watcher`` process detects + that the Socket was closed. + """ + gevent.killall(self.jobs) + self.jobs = [] diff --git a/awx/lib/site-packages/socketio/packet.py b/awx/lib/site-packages/socketio/packet.py new file mode 100644 index 0000000000..35cd5c3eae --- /dev/null +++ b/awx/lib/site-packages/socketio/packet.py @@ -0,0 +1,204 @@ +try: + import simplejson as json + json_decimal_args = {"use_decimal": True} # pragma: no cover +except ImportError: + import json + import decimal + + class DecimalEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, decimal.Decimal): + return float(o) + return super(DecimalEncoder, self).default(o) + json_decimal_args = {"cls": DecimalEncoder} + + +MSG_TYPES = { + 'disconnect': 0, + 'connect': 1, + 'heartbeat': 2, + 'message': 3, + 'json': 4, + 'event': 5, + 'ack': 6, + 'error': 7, + 'noop': 8, + } + +MSG_VALUES = dict((v, k) for k, v in MSG_TYPES.iteritems()) + +ERROR_REASONS = { + 'transport not supported': 0, + 'client not handshaken': 1, + 'unauthorized': 2 + } + +REASONS_VALUES = dict((v, k) for k, v in ERROR_REASONS.iteritems()) + +ERROR_ADVICES = { + 'reconnect': 0, + } + +ADVICES_VALUES = dict((v, k) for k, v in ERROR_ADVICES.iteritems()) + +socketio_packet_attributes = ['type', 'name', 'data', 'endpoint', 'args', + 'ackId', 'reason', 'advice', 'qs', 'id'] + + +def encode(data): + """ + Encode an attribute dict into a byte string. + """ + payload = '' + msg = str(MSG_TYPES[data['type']]) + + if msg in ['0', '1']: + # '1::' [path] [query] + msg += '::' + data['endpoint'] + if 'qs' in data and data['qs'] != '': + msg += ':' + data['qs'] + + elif msg == '2': + # heartbeat + msg += '::' + + elif msg in ['3', '4', '5']: + # '3:' [id ('+')] ':' [endpoint] ':' [data] + # '4:' [id ('+')] ':' [endpoint] ':' [json] + # '5:' [id ('+')] ':' [endpoint] ':' [json encoded event] + # The message id is an incremental integer, required for ACKs. + # If the message id is followed by a +, the ACK is not handled by + # socket.io, but by the user instead. + if msg == '3': + payload = data['data'] + if msg == '4': + payload = json.dumps(data['data'], separators=(',', ':'), + **json_decimal_args) + if msg == '5': + d = {} + d['name'] = data['name'] + if 'args' in data and data['args'] != []: + d['args'] = data['args'] + payload = json.dumps(d, separators=(',', ':'), **json_decimal_args) + if 'id' in data: + msg += ':' + str(data['id']) + if data['ack'] == 'data': + msg += '+' + msg += ':' + else: + msg += '::' + if 'endpoint' not in data: + data['endpoint'] = '' + if payload != '': + msg += data['endpoint'] + ':' + payload + else: + msg += data['endpoint'] + + elif msg == '6': + # '6:::' [id] '+' [data] + msg += '::' + data.get('endpoint', '') + ':' + str(data['ackId']) + if 'args' in data and data['args'] != []: + msg += '+' + json.dumps(data['args'], separators=(',', ':'), + **json_decimal_args) + + elif msg == '7': + # '7::' [endpoint] ':' [reason] '+' [advice] + msg += ':::' + if 'reason' in data and data['reason'] != '': + msg += str(ERROR_REASONS[data['reason']]) + if 'advice' in data and data['advice'] != '': + msg += '+' + str(ERROR_ADVICES[data['advice']]) + msg += data['endpoint'] + + # NoOp, used to close a poll after the polling duration time + elif msg == '8': + msg += '::' + + return msg + + +def decode(rawstr): + """ + Decode a rawstr packet arriving from the socket into a dict. + """ + decoded_msg = {} + split_data = rawstr.split(":", 3) + msg_type = split_data[0] + msg_id = split_data[1] + endpoint = split_data[2] + + data = '' + + if msg_id != '': + if "+" in msg_id: + msg_id = msg_id.split('+')[0] + decoded_msg['id'] = int(msg_id) + decoded_msg['ack'] = 'data' + else: + decoded_msg['id'] = int(msg_id) + decoded_msg['ack'] = True + + # common to every message + msg_type_id = int(msg_type) + if msg_type_id in MSG_VALUES: + decoded_msg['type'] = MSG_VALUES[int(msg_type)] + else: + raise Exception("Unknown message type: %s" % msg_type) + + decoded_msg['endpoint'] = endpoint + + if len(split_data) > 3: + data = split_data[3] + + if msg_type == "0": # disconnect + pass + + elif msg_type == "1": # connect + decoded_msg['qs'] = data + + elif msg_type == "2": # heartbeat + pass + + elif msg_type == "3": # message + decoded_msg['data'] = data + + elif msg_type == "4": # json msg + decoded_msg['data'] = json.loads(data) + + elif msg_type == "5": # event + try: + data = json.loads(data) + except ValueError, e: + print("Invalid JSON event message", data) + decoded_msg['args'] = [] + else: + decoded_msg['name'] = data.pop('name') + if 'args' in data: + decoded_msg['args'] = data['args'] + else: + decoded_msg['args'] = [] + + elif msg_type == "6": # ack + if '+' in data: + ackId, data = data.split('+') + decoded_msg['ackId'] = int(ackId) + decoded_msg['args'] = json.loads(data) + else: + decoded_msg['ackId'] = int(data) + decoded_msg['args'] = [] + + elif msg_type == "7": # error + if '+' in data: + reason, advice = data.split('+') + decoded_msg['reason'] = REASONS_VALUES[int(reason)] + decoded_msg['advice'] = ADVICES_VALUES[int(advice)] + else: + decoded_msg['advice'] = '' + if data != '': + decoded_msg['reason'] = REASONS_VALUES[int(data)] + else: + decoded_msg['reason'] = '' + elif msg_type == "8": # noop + pass + + return decoded_msg diff --git a/awx/lib/site-packages/socketio/policyserver.py b/awx/lib/site-packages/socketio/policyserver.py new file mode 100644 index 0000000000..900db4b457 --- /dev/null +++ b/awx/lib/site-packages/socketio/policyserver.py @@ -0,0 +1,27 @@ +from gevent.server import StreamServer +import socket + +__all__ = ['FlashPolicyServer'] + + +class FlashPolicyServer(StreamServer): + policyrequest = "" + policy = """ +""" + + def __init__(self, listener=None, backlog=None): + if listener is None: + listener = ('0.0.0.0', 10843) + StreamServer.__init__(self, listener=listener, backlog=backlog) + + def handle(self, sock, address): + # send and read functions should not wait longer than three seconds + sock.settimeout(3) + try: + # try to receive at most 128 bytes (`POLICYREQUEST` is shorter) + input = sock.recv(128) + if input.startswith(FlashPolicyServer.policyrequest): + sock.sendall(FlashPolicyServer.policy) + except socket.timeout: + pass + sock.close() diff --git a/awx/lib/site-packages/socketio/server.py b/awx/lib/site-packages/socketio/server.py new file mode 100644 index 0000000000..100ec577f0 --- /dev/null +++ b/awx/lib/site-packages/socketio/server.py @@ -0,0 +1,83 @@ +import sys +import traceback + +from socket import error + +from gevent.pywsgi import WSGIServer + +from socketio.handler import SocketIOHandler +from socketio.policyserver import FlashPolicyServer +from socketio.virtsocket import Socket + +__all__ = ['SocketIOServer'] + + +class SocketIOServer(WSGIServer): + """A WSGI Server with a resource that acts like an SocketIO.""" + + def __init__(self, *args, **kwargs): + """ + This is just like the standard WSGIServer __init__, except with a + few additional ``kwargs``: + + :param resource: The URL which has to be identified as a socket.io request. Defaults to the /socket.io/ URL. + :param transports: Optional list of transports to allow. List of + strings, each string should be one of + handler.SocketIOHandler.handler_types. + :param policy_server: Boolean describing whether or not to use the + Flash policy server. Default True. + :param policy_listener : A tuple containing (host, port) for the + policy server. This is optional and used only if policy server + is set to true. The default value is 0.0.0.0:843 + """ + self.sockets = {} + if 'namespace' in kwargs: + print("DEPRECATION WARNING: use resource instead of namespace") + self.resource = kwargs.pop('namespace', 'socket.io') + else: + self.resource = kwargs.pop('resource', 'socket.io') + + self.transports = kwargs.pop('transports', None) + + if kwargs.pop('policy_server', True): + policylistener = kwargs.pop('policy_listener', (args[0][0], 10843)) + self.policy_server = FlashPolicyServer(policylistener) + else: + self.policy_server = None + + kwargs['handler_class'] = SocketIOHandler + super(SocketIOServer, self).__init__(*args, **kwargs) + + def start_accepting(self): + if self.policy_server is not None: + try: + self.policy_server.start() + except error, ex: + sys.stderr.write( + 'FAILED to start flash policy server: %s\n' % (ex, )) + except Exception: + traceback.print_exc() + sys.stderr.write('FAILED to start flash policy server.\n\n') + super(SocketIOServer, self).start_accepting() + + def kill(self): + if self.policy_server is not None: + self.policy_server.kill() + super(SocketIOServer, self).kill() + + def handle(self, socket, address): + handler = self.handler_class(socket, address, self) + handler.handle() + + def get_socket(self, sessid=''): + """Return an existing or new client Socket.""" + + socket = self.sockets.get(sessid) + + if socket is None: + socket = Socket(self) + self.sockets[socket.sessid] = socket + else: + socket.incr_hits() + + return socket diff --git a/awx/lib/site-packages/socketio/sgunicorn.py b/awx/lib/site-packages/socketio/sgunicorn.py new file mode 100644 index 0000000000..aaa01131d9 --- /dev/null +++ b/awx/lib/site-packages/socketio/sgunicorn.py @@ -0,0 +1,57 @@ +import os + +import gevent +from gevent.pool import Pool + +from gunicorn.workers.ggevent import GeventPyWSGIWorker +from socketio.server import SocketIOServer +from socketio.handler import SocketIOHandler + + +class GeventSocketIOBaseWorker(GeventPyWSGIWorker): + """ The base gunicorn worker class """ + def run(self): + self.socket.setblocking(1) + pool = Pool(self.worker_connections) + self.server_class.base_env['wsgi.multiprocess'] = \ + self.cfg.workers > 1 + server = self.server_class( + self.socket, application=self.wsgi, + spawn=pool, handler_class=self.wsgi_handler, + namespace=self.namespace, policy_server=self.policy_server) + server.start() + try: + while self.alive: + self.notify() + + if self.ppid != os.getppid(): + self.log.info("Parent changed, shutting down: %s", self) + break + + gevent.sleep(1.0) + + except KeyboardInterrupt: + pass + + # try to stop the connections + try: + self.notify() + server.stop(timeout=self.timeout) + except: + pass + + +class GeventSocketIOWorker(GeventSocketIOBaseWorker): + """ + Default gunicorn worker utilizing gevent + + Uses the namespace 'socket.io' and defaults to the flash policy server + being disabled. + """ + server_class = SocketIOServer + wsgi_handler = SocketIOHandler + # We need to define a namespace for the server, it would be nice if this + # was a configuration option, will probably end up how this implemented, + # for now this is just a proof of concept to make sure this will work + namespace = 'socket.io' + policy_server = False # Don't run the flash policy server diff --git a/awx/lib/site-packages/socketio/transports.py b/awx/lib/site-packages/socketio/transports.py new file mode 100644 index 0000000000..6564b5abd8 --- /dev/null +++ b/awx/lib/site-packages/socketio/transports.py @@ -0,0 +1,270 @@ +import gevent +import urllib + +from gevent.queue import Empty + + +class BaseTransport(object): + """Base class for all transports. Mostly wraps handler class functions.""" + + def __init__(self, handler): + self.content_type = ("Content-Type", "text/plain; charset=UTF-8") + self.headers = [ + ("Access-Control-Allow-Origin", "*"), + ("Access-Control-Allow-Credentials", "true"), + ("Access-Control-Allow-Methods", "POST, GET, OPTIONS"), + ("Access-Control-Max-Age", 3600), + ] + self.headers_list = [] + self.handler = handler + + def write(self, data=""): + if 'Content-Length' not in self.handler.response_headers_list: + self.handler.response_headers.append(('Content-Length', len(data))) + self.handler.response_headers_list.append('Content-Length') + + self.handler.write(data) + + def start_response(self, status, headers, **kwargs): + if "Content-Type" not in [x[0] for x in headers]: + headers.append(self.content_type) + + headers.extend(self.headers) + #print headers + self.handler.start_response(status, headers, **kwargs) + + +class XHRPollingTransport(BaseTransport): + def __init__(self, *args, **kwargs): + super(XHRPollingTransport, self).__init__(*args, **kwargs) + + def options(self): + self.start_response("200 OK", ()) + self.write() + return [] + + def get(self, socket): + socket.heartbeat() + + payload = self.get_messages_payload(socket, timeout=5.0) + if not payload: + payload = "8::" # NOOP + + self.start_response("200 OK", []) + self.write(payload) + + return [] + + def _request_body(self): + return self.handler.wsgi_input.readline() + + def post(self, socket): + for message in self.decode_payload(self._request_body()): + socket.put_server_msg(message) + + self.start_response("200 OK", [ + ("Connection", "close"), + ("Content-Type", "text/plain") + ]) + self.write("1") + + return [] + + def get_messages_payload(self, socket, timeout=None): + """This will fetch the messages from the Socket's queue, and if + there are many messes, pack multiple messages in one payload and return + """ + try: + msgs = socket.get_multiple_client_msgs(timeout=timeout) + data = self.encode_payload(msgs) + except Empty: + data = "" + return data + + def encode_payload(self, messages): + """Encode list of messages. Expects messages to be unicode. + + ``messages`` - List of raw messages to encode, if necessary + + """ + if not messages: + return '' + + if len(messages) == 1: + return messages[0].encode('utf-8') + + payload = u''.join(u'\ufffd%d\ufffd%s' % (len(p), p) + for p in messages) + + return payload.encode('utf-8') + + def decode_payload(self, payload): + """This function can extract multiple messages from one HTTP payload. + Some times, the XHR/JSONP/.. transports can pack more than one message + on a single packet. They are encoding following the WebSocket + semantics, which need to be reproduced here to unwrap the messages. + + The semantics are: + + \ufffd + [length as a string] + \ufffd + [payload as a unicode string] + + This function returns a list of messages, even though there is only + one. + + Inspired by socket.io/lib/transports/http.js + """ + payload = payload.decode('utf-8') + if payload[0] == u"\ufffd": + #print "MULTIMSG FULL", payload + ret = [] + while len(payload) != 0: + len_end = payload.find(u"\ufffd", 1) + length = int(payload[1:len_end]) + msg_start = len_end + 1 + msg_end = length + msg_start + message = payload[msg_start:msg_end] + #print "MULTIMSG", length, message + ret.append(message) + payload = payload[msg_end:] + return ret + return [payload] + + def connect(self, socket, request_method): + if not socket.connection_confirmed: + socket.connection_confirmed = True + self.start_response("200 OK", [ + ("Connection", "close"), + ]) + self.write("1::") # 'connect' packet + + return [] + elif request_method in ("GET", "POST", "OPTIONS"): + return getattr(self, request_method.lower())(socket) + else: + raise Exception("No support for the method: " + request_method) + + +class JSONPolling(XHRPollingTransport): + def __init__(self, handler): + super(JSONPolling, self).__init__(handler) + self.content_type = ("Content-Type", "text/javascript; charset=UTF-8") + + def _request_body(self): + data = super(JSONPolling, self)._request_body() + # resolve %20%3F's, take out wrapping d="...", etc.. + return urllib.unquote_plus(data)[3:-1] \ + .replace(r'\"', '"') \ + .replace(r"\\", "\\") + + def write(self, data): + """Just quote out stuff before sending it out""" + # TODO: don't we need to quote this data in here ? + super(JSONPolling, self).write("io.j[0]('%s');" % data) + + +class XHRMultipartTransport(XHRPollingTransport): + def __init__(self, handler): + super(JSONPolling, self).__init__(handler) + self.content_type = ( + "Content-Type", + "multipart/x-mixed-replace;boundary=\"socketio\"" + ) + + def connect(self, socket, request_method): + if request_method == "GET": + # TODO: double verify this, because we're not sure. look at git revs. + heartbeat = socket._spawn_heartbeat() + return [heartbeat] + self.get(socket) + elif request_method == "POST": + return self.post(socket) + else: + raise Exception("No support for such method: " + request_method) + + def get(self, socket): + header = "Content-Type: text/plain; charset=UTF-8\r\n\r\n" + + self.start_response("200 OK", [("Connection", "keep-alive")]) + self.write_multipart("--socketio\r\n") + self.write_multipart(header) + self.write_multipart(str(socket.sessid) + "\r\n") + self.write_multipart("--socketio\r\n") + + def chunk(): + while True: + payload = self.get_messages_payload(socket) + + if not payload: + # That would mean the call to Queue.get() returned Empty, + # so it was in fact killed, since we pass no timeout=.. + socket.kill() + break + else: + try: + self.write_multipart(header) + self.write_multipart(payload) + self.write_multipart("--socketio\r\n") + except socket.error: + socket.kill() + break + + return [gevent.spawn(chunk)] + + +class WebsocketTransport(BaseTransport): + def connect(self, socket, request_method): + websocket = self.handler.environ['wsgi.websocket'] + websocket.send("1::") # 'connect' packet + + def send_into_ws(): + while True: + message = socket.get_client_msg() + + if message is None: + socket.kill() + break + + websocket.send(message) + + def read_from_ws(): + while True: + message = websocket.receive() + + if not message: + socket.kill() + break + else: + if message is not None: + socket.put_server_msg(message) + + gr1 = gevent.spawn(send_into_ws) + gr2 = gevent.spawn(read_from_ws) + heartbeat1, heartbeat2 = socket._spawn_heartbeat() + + return [gr1, gr2, heartbeat1, heartbeat2] + + +class FlashSocketTransport(WebsocketTransport): + pass + + +class HTMLFileTransport(XHRPollingTransport): + """Not tested at all!""" + + def __init__(self, handler): + super(HTMLFileTransport, self).__init__(handler) + self.content_type = ("Content-Type", "text/html") + + def write_packed(self, data): + self.write("" % data) + + def handle_get_response(self, socket): + self.start_response("200 OK", [ + ("Connection", "keep-alive"), + ("Content-Type", "text/html"), + ("Transfer-Encoding", "chunked"), + ]) + self.write("" + " " * 244) + + payload = self.get_messages_payload(socket, timeout=5.0) + + self.write_packed(payload) diff --git a/awx/lib/site-packages/socketio/virtsocket.py b/awx/lib/site-packages/socketio/virtsocket.py new file mode 100644 index 0000000000..8218313f25 --- /dev/null +++ b/awx/lib/site-packages/socketio/virtsocket.py @@ -0,0 +1,428 @@ +"""Virtual Socket implementation, unifies all the Transports into one +single interface, and abstracts the work of the long-polling methods. + +This module also has the ``default_error_handler`` implementation. +You can define your own so that the error messages are logged or sent +in a different way + +:copyright: 2012, Alexandre Bourget +:moduleauthor: Alexandre Bourget + +""" +import random +import weakref + +import gevent +from gevent.queue import Queue +from gevent.event import Event + +from socketio import packet + + +def default_error_handler(socket, error_name, error_message, endpoint, + msg_id, quiet): + """This is the default error handler, you can override this when + calling :func:`socketio.socketio_manage`. + + It basically sends an event through the socket with the 'error' name. + + See documentation for :meth:`Socket.error`. + + :param quiet: if quiet, this handler will not send a packet to the + user, but only log for the server developer. + """ + pkt = dict(type='event', name='error', + args=[error_name, error_message], + endpoint=endpoint) + if msg_id: + pkt['id'] = msg_id + + # Send an error event through the Socket + if not quiet: + socket.send_packet(pkt) + + # Log that error somewhere for debugging... + print "default_error_handler: %s, %s (endpoint=%s, msg_id=%s)" % ( + error_name, error_message, endpoint, msg_id) + + +class Socket(object): + """ + Virtual Socket implementation, checks heartbeats, writes to local queues + for message passing, holds the Namespace objects, dispatches de packets + to the underlying namespaces. + + This is the abstraction on top of the different transports. It's like + if you used a WebSocket only... + """ + + STATE_CONNECTING = "CONNECTING" + STATE_CONNECTED = "CONNECTED" + STATE_DISCONNECTING = "DISCONNECTING" + STATE_DISCONNECTED = "DISCONNECTED" + + GLOBAL_NS = '' + """Use this to be explicit when specifying a Global Namespace (an endpoint + with no name, not '/chat' or anything.""" + + def __init__(self, server, error_handler=None): + self.server = weakref.proxy(server) + self.sessid = str(random.random())[2:] + self.session = {} # the session dict, for general developer usage + self.client_queue = Queue() # queue for messages to client + self.server_queue = Queue() # queue for messages to server + self.hits = 0 + self.heartbeats = 0 + self.timeout = Event() + self.wsgi_app_greenlet = None + self.state = "NEW" + self.connection_confirmed = False + self.ack_callbacks = {} + self.ack_counter = 0 + self.request = None + self.environ = None + self.namespaces = {} + self.active_ns = {} # Namespace sessions that were instantiated + self.jobs = [] + self.error_handler = default_error_handler + if error_handler is not None: + self.error_handler = error_handler + + def _set_namespaces(self, namespaces): + """This is a mapping (dict) of the different '/namespaces' to their + BaseNamespace object derivative. + + This is called by socketio_manage().""" + self.namespaces = namespaces + + def _set_request(self, request): + """Saves the request object for future use by the different Namespaces. + + This is called by socketio_manage(). + """ + self.request = request + + def _set_environ(self, environ): + """Save the WSGI environ, for future use. + + This is called by socketio_manage(). + """ + self.environ = environ + + def _set_error_handler(self, error_handler): + """Changes the default error_handler function to the one specified + + This is called by socketio_manage(). + """ + self.error_handler = error_handler + + def _get_next_msgid(self): + """This retrieves the next value for the 'id' field when sending + an 'event' or 'message' or 'json' that asks the remote client + to 'ack' back, so that we trigger the local callback. + """ + self.ack_counter += 1 + return self.ack_counter + + def _save_ack_callback(self, msgid, callback): + """Keep a reference of the callback on this socket.""" + if msgid in self.ack_callbacks: + return False + self.ack_callbacks[msgid] = callback + + def _pop_ack_callback(self, msgid): + """Fetch the callback for a given msgid, if it exists, otherwise, + return None""" + if msgid not in self.ack_callbacks: + return None + return self.ack_callbacks.pop(msgid) + + def __str__(self): + result = ['sessid=%r' % self.sessid] + if self.state == self.STATE_CONNECTED: + result.append('connected') + if self.client_queue.qsize(): + result.append('client_queue[%s]' % self.client_queue.qsize()) + if self.server_queue.qsize(): + result.append('server_queue[%s]' % self.server_queue.qsize()) + if self.hits: + result.append('hits=%s' % self.hits) + if self.heartbeats: + result.append('heartbeats=%s' % self.heartbeats) + + return ' '.join(result) + + def __getitem__(self, key): + """This will get the nested Namespace using its '/chat' reference. + + Using this, you can go from one Namespace to the other (to emit, add + ACLs, etc..) with: + + adminnamespace.socket['/chat'].add_acl_method('kick-ban') + + """ + return self.active_ns[key] + + def __hasitem__(self, key): + """Verifies if the namespace is active (was initialized)""" + return key in self.active_ns + + @property + def connected(self): + """Returns whether the state is CONNECTED or not.""" + return self.state == self.STATE_CONNECTED + + def incr_hits(self): + self.hits += 1 + + if self.hits == 1: + self.state = self.STATE_CONNECTED + + def heartbeat(self): + """This makes the heart beat for another X seconds. Call this when + you get a heartbeat packet in. + + This clear the heartbeat disconnect timeout (resets for X seconds). + """ + self.timeout.set() + + def kill(self): + """This function must/will be called when a socket is to be completely + shut down, closed by connection timeout, connection error or explicit + disconnection from the client. + + It will call all of the Namespace's + :meth:`~socketio.namespace.BaseNamespace.disconnect` methods + so that you can shut-down things properly. + + """ + # Clear out the callbacks + self.ack_callbacks = {} + if self.connected: + self.state = self.STATE_DISCONNECTING + self.server_queue.put_nowait(None) + self.client_queue.put_nowait(None) + self.disconnect() + + if self.sessid in self.server.sockets: + self.server.sockets.pop(self.sessid) + + # gevent.kill(self.wsgi_app_greenlet) + else: + pass # Fail silently + + def put_server_msg(self, msg): + """Writes to the server's pipe, to end up in in the Namespaces""" + self.heartbeat() + self.server_queue.put_nowait(msg) + + def put_client_msg(self, msg): + """Writes to the client's pipe, to end up in the browser""" + self.heartbeat() + self.client_queue.put_nowait(msg) + + def get_client_msg(self, **kwargs): + """Grab a message to send it to the browser""" + return self.client_queue.get(**kwargs) + + def get_server_msg(self, **kwargs): + """Grab a message, to process it by the server and dispatch calls + """ + return self.server_queue.get(**kwargs) + + def get_multiple_client_msgs(self, **kwargs): + """Get multiple messages, in case we're going through the various + XHR-polling methods, on which we can pack more than one message if the + rate is high, and encode the payload for the HTTP channel.""" + client_queue = self.client_queue + msgs = [client_queue.get(**kwargs)] + while client_queue.qsize(): + msgs.append(client_queue.get()) + return msgs + + def error(self, error_name, error_message, endpoint=None, msg_id=None, + quiet=False): + """Send an error to the user, using the custom or default + ErrorHandler configured on the [TODO: Revise this] Socket/Handler + object. + + :param error_name: is a simple string, for easy association on + the client side + + :param error_message: is a human readable message, the user + will eventually see + + :param endpoint: set this if you have a message specific to an + end point + + :param msg_id: set this if your error is relative to a + specific message + + :param quiet: way to make the error handler quiet. Specific to + the handler. The default handler will only log, + with quiet. + """ + handler = self.error_handler + return handler( + self, error_name, error_message, endpoint, msg_id, quiet) + + # User facing low-level function + def disconnect(self, silent=False): + """Calling this method will call the + :meth:`~socketio.namespace.BaseNamespace.disconnect` method on + all the active Namespaces that were open, killing all their + jobs and sending 'disconnect' packets for each of them. + + Normally, the Global namespace (endpoint = '') has special meaning, + as it represents the whole connection, + + :param silent: when True, pass on the ``silent`` flag to the Namespace + :meth:`~socketio.namespace.BaseNamespace.disconnect` + calls. + """ + for ns_name, ns in list(self.active_ns.iteritems()): + ns.recv_disconnect() + + def remove_namespace(self, namespace): + """This removes a Namespace object from the socket. + + This is usually called by + :meth:`~socketio.namespace.BaseNamespace.disconnect`. + + """ + if namespace in self.active_ns: + del self.active_ns[namespace] + + def send_packet(self, pkt): + """Low-level interface to queue a packet on the wire (encoded as wire + protocol""" + self.put_client_msg(packet.encode(pkt)) + + def spawn(self, fn, *args, **kwargs): + """Spawn a new Greenlet, attached to this Socket instance. + + It will be monitored by the "watcher" method + """ + + self.debug("Spawning sub-Socket Greenlet: %s" % fn.__name__) + job = gevent.spawn(fn, *args, **kwargs) + self.jobs.append(job) + return job + + def _receiver_loop(self): + """This is the loop that takes messages from the queue for the server + to consume, decodes them and dispatches them. + """ + + while True: + rawdata = self.get_server_msg() + + if not rawdata: + continue # or close the connection ? + try: + pkt = packet.decode(rawdata) + except (ValueError, KeyError, Exception), e: + self.error('invalid_packet', + "There was a decoding error when dealing with packet " + "with event: %s... (%s)" % (rawdata[:20], e)) + continue + + if pkt['type'] == 'heartbeat': + # This is already dealth with in put_server_msg() when + # any incoming raw data arrives. + continue + + if pkt['type'] == 'disconnect' and pkt['endpoint'] == '': + # On global namespace, we kill everything. + self.kill() + continue + + endpoint = pkt['endpoint'] + + if endpoint not in self.namespaces: + self.error("no_such_namespace", + "The endpoint you tried to connect to " + "doesn't exist: %s" % endpoint, endpoint=endpoint) + continue + elif endpoint in self.active_ns: + pkt_ns = self.active_ns[endpoint] + else: + new_ns_class = self.namespaces[endpoint] + pkt_ns = new_ns_class(self.environ, endpoint, + request=self.request) + pkt_ns.initialize() # use this instead of __init__, + # for less confusion + + self.active_ns[endpoint] = pkt_ns + + retval = pkt_ns.process_packet(pkt) + + # Has the client requested an 'ack' with the reply parameters ? + if pkt.get('ack') == "data" and pkt.get('id'): + returning_ack = dict(type='ack', ackId=pkt['id'], + args=retval, + endpoint=pkt.get('endpoint', '')) + self.send_packet(returning_ack) + + # Now, are we still connected ? + if not self.connected: + self.kill() # ?? what,s the best clean-up when its not a + # user-initiated disconnect + return + + def _spawn_receiver_loop(self): + """Spawns the reader loop. This is called internall by + socketio_manage(). + """ + job = gevent.spawn(self._receiver_loop) + self.jobs.append(job) + return job + + def _watcher(self): + """Watch if any of the greenlets for a request have died. If so, kill + the request and the socket. + """ + # TODO: add that if any of the request.jobs die, kill them all and exit + gevent.sleep(5.0) + + while True: + gevent.sleep(1.0) + + if not self.connected: + # Killing Socket-level jobs + gevent.killall(self.jobs) + for ns_name, ns in list(self.active_ns.iteritems()): + ns.recv_disconnect() + break + + def _spawn_watcher(self): + job = gevent.spawn(self._watcher) + return job + + def _heartbeat(self): + """Start the heartbeat Greenlet to check connection health.""" + self.state = self.STATE_CONNECTED + + while self.connected: + gevent.sleep(5.0) # FIXME: make this a setting + # TODO: this process could use a timeout object like the disconnect + # timeout thing, and ONLY send packets when none are sent! + # We would do that by calling timeout.set() for a "sending" + # timeout. If we're sending 100 messages a second, there is + # no need to push some heartbeats in there also. + self.put_client_msg("2::") # TODO: make it a heartbeat packet + + def _disconnect_timeout(self): + self.timeout.clear() + + if self.timeout.wait(10.0): + gevent.spawn(self._disconnect_timeout) + else: + self.kill() + + def _spawn_heartbeat(self): + """This functions returns a list of jobs""" + job_sender = gevent.spawn(self._heartbeat) + job_waiter = gevent.spawn(self._disconnect_timeout) + self.jobs.extend((job_sender, job_waiter)) + return job_sender, job_waiter