Vendor gevent's socketio and websocket modules

This commit is contained in:
Matthew Jones 2014-04-15 11:06:39 -04:00
parent 2546cbdbb6
commit 507d2e158d
24 changed files with 3278 additions and 0 deletions

View File

@ -0,0 +1,21 @@
VERSION = (0, 9, 3, 'final', 0)
__all__ = [
'WebSocketApplication',
'Resource',
'WebSocketServer',
'WebSocketError',
'get_version'
]
def get_version(*args, **kwargs):
from .utils import get_version
return get_version(*args, **kwargs)
try:
from .resource import WebSocketApplication, Resource
from .server import WebSocketServer
from .exceptions import WebSocketError
except ImportError:
pass

View File

@ -0,0 +1,19 @@
from socket import error as socket_error
class WebSocketError(socket_error):
"""
Base class for all websocket errors.
"""
class ProtocolError(WebSocketError):
"""
Raised if an error occurs when de/encoding the websocket protocol.
"""
class FrameTooLargeException(ProtocolError):
"""
Raised if a frame is received that is too large.
"""

View File

@ -0,0 +1,6 @@
from geventwebsocket.handler import WebSocketHandler
from gunicorn.workers.ggevent import GeventPyWSGIWorker
class GeventWebSocketWorker(GeventPyWSGIWorker):
wsgi_handler = WebSocketHandler

View File

@ -0,0 +1,278 @@
import base64
import hashlib
import warnings
from gevent.pywsgi import WSGIHandler
from .websocket import WebSocket, Stream
from .logging import create_logger
class Client(object):
def __init__(self, address, ws):
self.address = address
self.ws = ws
class WebSocketHandler(WSGIHandler):
"""
Automatically upgrades the connection to a websocket.
To prevent the WebSocketHandler to call the underlying WSGI application,
but only setup the WebSocket negotiations, do:
mywebsockethandler.prevent_wsgi_call = True
before calling run_application(). This is useful if you want to do more
things before calling the app, and want to off-load the WebSocket
negotiations to this library. Socket.IO needs this for example, to send
the 'ack' before yielding the control to your WSGI app.
"""
SUPPORTED_VERSIONS = ('13', '8', '7')
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
def run_websocket(self):
"""
Called when a websocket has been created successfully.
"""
if getattr(self, 'prevent_wsgi_call', False):
return
# In case WebSocketServer is not used
if not hasattr(self.server, 'clients'):
self.server.clients = {}
# Since we're now a websocket connection, we don't care what the
# application actually responds with for the http response
try:
self.server.clients[self.client_address] = Client(
self.client_address, self.websocket)
self.application(self.environ, lambda s, h: [])
finally:
del self.server.clients[self.client_address]
if not self.websocket.closed:
self.websocket.close()
self.environ.update({
'wsgi.websocket': None
})
self.websocket = None
def run_application(self):
if (hasattr(self.server, 'pre_start_hook')
and self.server.pre_start_hook):
self.logger.debug("Calling pre-start hook")
if self.server.pre_start_hook(self):
return super(WebSocketHandler, self).run_application()
self.logger.debug("Initializing WebSocket")
self.result = self.upgrade_websocket()
if hasattr(self, 'websocket'):
if self.status and not self.headers_sent:
self.write('')
self.run_websocket()
else:
if self.status:
# A status was set, likely an error so just send the response
if not self.result:
self.result = []
self.process_result()
return
# This handler did not handle the request, so defer it to the
# underlying application object
return super(WebSocketHandler, self).run_application()
def upgrade_websocket(self):
"""
Attempt to upgrade the current environ into a websocket enabled
connection. If successful, the environ dict with be updated with two
new entries, `wsgi.websocket` and `wsgi.websocket_version`.
:returns: Whether the upgrade was successful.
"""
# Some basic sanity checks first
self.logger.debug("Validating WebSocket request")
if self.environ.get('REQUEST_METHOD', '') != 'GET':
# This is not a websocket request, so we must not handle it
self.logger.debug('Can only upgrade connection if using GET method.')
return
upgrade = self.environ.get('HTTP_UPGRADE', '').lower()
if upgrade == 'websocket':
connection = self.environ.get('HTTP_CONNECTION', '').lower()
if 'upgrade' not in connection:
# This is not a websocket request, so we must not handle it
self.logger.warning("Client didn't ask for a connection "
"upgrade")
return
else:
# This is not a websocket request, so we must not handle it
return
if self.request_version != 'HTTP/1.1':
self.start_response('402 Bad Request', [])
self.logger.warning("Bad server protocol in headers")
return ['Bad protocol version']
if self.environ.get('HTTP_SEC_WEBSOCKET_VERSION'):
return self.upgrade_connection()
else:
self.logger.warning("No protocol defined")
self.start_response('426 Upgrade Required', [
('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS))])
return ['No Websocket protocol version defined']
def upgrade_connection(self):
"""
Validate and 'upgrade' the HTTP request to a WebSocket request.
If an upgrade succeeded then then handler will have `start_response`
with a status of `101`, the environ will also be updated with
`wsgi.websocket` and `wsgi.websocket_version` keys.
:param environ: The WSGI environ dict.
:param start_response: The callable used to start the response.
:param stream: File like object that will be read from/written to by
the underlying WebSocket object, if created.
:return: The WSGI response iterator is something went awry.
"""
self.logger.debug("Attempting to upgrade connection")
version = self.environ.get("HTTP_SEC_WEBSOCKET_VERSION")
if version not in self.SUPPORTED_VERSIONS:
msg = "Unsupported WebSocket Version: {0}".format(version)
self.logger.warning(msg)
self.start_response('400 Bad Request', [
('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS))
])
return [msg]
key = self.environ.get("HTTP_SEC_WEBSOCKET_KEY", '').strip()
if not key:
# 5.2.1 (3)
msg = "Sec-WebSocket-Key header is missing/empty"
self.logger.warning(msg)
self.start_response('400 Bad Request', [])
return [msg]
try:
key_len = len(base64.b64decode(key))
except TypeError:
msg = "Invalid key: {0}".format(key)
self.logger.warning(msg)
self.start_response('400 Bad Request', [])
return [msg]
if key_len != 16:
# 5.2.1 (3)
msg = "Invalid key: {0}".format(key)
self.logger.warning(msg)
self.start_response('400 Bad Request', [])
return [msg]
# Check for WebSocket Protocols
requested_protocols = self.environ.get(
'HTTP_SEC_WEBSOCKET_PROTOCOL', '')
protocol = None
if hasattr(self.application, 'app_protocol'):
allowed_protocol = self.application.app_protocol(
self.environ['PATH_INFO'])
if allowed_protocol and allowed_protocol in requested_protocols:
protocol = allowed_protocol
self.logger.debug("Protocol allowed: {0}".format(protocol))
self.websocket = WebSocket(self.environ, Stream(self), self)
self.environ.update({
'wsgi.websocket_version': version,
'wsgi.websocket': self.websocket
})
headers = [
("Upgrade", "websocket"),
("Connection", "Upgrade"),
("Sec-WebSocket-Accept", base64.b64encode(
hashlib.sha1(key + self.GUID).digest())),
]
if protocol:
headers.append(("Sec-WebSocket-Protocol", protocol))
self.logger.debug("WebSocket request accepted, switching protocols")
self.start_response("101 Switching Protocols", headers)
@property
def logger(self):
if not hasattr(self.server, 'logger'):
self.server.logger = create_logger(__name__)
return self.server.logger
def log_request(self):
if '101' not in self.status:
self.logger.info(self.format_request())
@property
def active_client(self):
return self.server.clients[self.client_address]
def start_response(self, status, headers, exc_info=None):
"""
Called when the handler is ready to send a response back to the remote
endpoint. A websocket connection may have not been created.
"""
writer = super(WebSocketHandler, self).start_response(
status, headers, exc_info=exc_info)
self._prepare_response()
return writer
def _prepare_response(self):
"""
Sets up the ``pywsgi.Handler`` to work with a websocket response.
This is used by other projects that need to support WebSocket
connections as part of a larger effort.
"""
assert not self.headers_sent
if not self.environ.get('wsgi.websocket'):
# a WebSocket connection is not established, do nothing
return
# So that `finalize_headers` doesn't write a Content-Length header
self.provided_content_length = False
# The websocket is now controlling the response
self.response_use_chunked = False
# Once the request is over, the connection must be closed
self.close_connection = True
# Prevents the Date header from being written
self.provided_date = True

View File

@ -0,0 +1,31 @@
from __future__ import absolute_import
from logging import getLogger, StreamHandler, getLoggerClass, Formatter, DEBUG
def create_logger(name, debug=False, format=None):
Logger = getLoggerClass()
class DebugLogger(Logger):
def getEffectiveLevel(x):
if x.level == 0 and debug:
return DEBUG
else:
return Logger.getEffectiveLevel(x)
class DebugHandler(StreamHandler):
def emit(x, record):
StreamHandler.emit(x, record) if debug else None
handler = DebugHandler()
handler.setLevel(DEBUG)
if format:
handler.setFormatter(Formatter(format))
logger = getLogger(name)
del logger.handlers[:]
logger.__class__ = DebugLogger
logger.addHandler(handler)
return logger

View File

@ -0,0 +1,35 @@
class BaseProtocol(object):
PROTOCOL_NAME = ''
def __init__(self, app):
self._app = app
def on_open(self):
self.app.on_open()
def on_message(self, message):
self.app.on_message(message)
def on_close(self, reason=None):
self.app.on_close(reason)
@property
def app(self):
if self._app:
return self._app
else:
raise Exception("No application coupled")
@property
def server(self):
if not hasattr(self.app, 'ws'):
return None
return self.app.ws.handler.server
@property
def handler(self):
if not hasattr(self.app, 'ws'):
return None
return self.app.ws.handler

View File

@ -0,0 +1,229 @@
import inspect
import random
import string
import types
try:
import ujson as json
except ImportError:
try:
import simplejson as json
except ImportError:
import json
from ..exceptions import WebSocketError
from .base import BaseProtocol
def export_rpc(arg=None):
if isinstance(arg, types.FunctionType):
arg._rpc = arg.__name__
return arg
def serialize(data):
return json.dumps(data)
class Prefixes(object):
def __init__(self):
self.prefixes = {}
def add(self, prefix, uri):
self.prefixes[prefix] = uri
def resolve(self, curie_or_uri):
if "http://" in curie_or_uri:
return curie_or_uri
elif ':' in curie_or_uri:
prefix, proc = curie_or_uri.split(':', 1)
return self.prefixes[prefix] + proc
else:
raise Exception(curie_or_uri)
class RemoteProcedures(object):
def __init__(self):
self.calls = {}
def register_procedure(self, uri, proc):
self.calls[uri] = proc
def register_object(self, uri, obj):
for k in inspect.getmembers(obj, inspect.ismethod):
if '_rpc' in k[1].__dict__:
proc_uri = uri + k[1]._rpc
self.calls[proc_uri] = (obj, k[1])
def call(self, uri, args):
if uri in self.calls:
proc = self.calls[uri]
# Do the correct call whether it's a function or instance method.
if isinstance(proc, tuple):
if proc[1].__self__ is None:
# Create instance of object and call method
return proc[1](proc[0](), *args)
else:
# Call bound method on instance
return proc[1](*args)
else:
return self.calls[uri](*args)
else:
raise Exception("no such uri '{}'".format(uri))
class Channels(object):
def __init__(self):
self.channels = {}
def create(self, uri, prefix_matching=False):
if uri not in self.channels:
self.channels[uri] = []
# TODO: implement prefix matching
def subscribe(self, uri, client):
if uri in self.channels:
self.channels[uri].append(client)
def unsubscribe(self, uri, client):
if uri not in self.channels:
return
client_index = self.channels[uri].index(client)
self.channels[uri].pop(client_index)
if len(self.channels[uri]) == 0:
del self.channels[uri]
def publish(self, uri, event, exclude=None, eligible=None):
if uri not in self.channels:
return
# TODO: exclude & eligible
msg = [WampProtocol.MSG_EVENT, uri, event]
for client in self.channels[uri]:
try:
client.ws.send(serialize(msg))
except WebSocketError:
# Seems someone didn't unsubscribe before disconnecting
self.channels[uri].remove(client)
class WampProtocol(BaseProtocol):
MSG_WELCOME = 0
MSG_PREFIX = 1
MSG_CALL = 2
MSG_CALL_RESULT = 3
MSG_CALL_ERROR = 4
MSG_SUBSCRIBE = 5
MSG_UNSUBSCRIBE = 6
MSG_PUBLISH = 7
MSG_EVENT = 8
PROTOCOL_NAME = "wamp"
def __init__(self, *args, **kwargs):
self.procedures = RemoteProcedures()
self.prefixes = Prefixes()
self.session_id = ''.join(
[random.choice(string.digits + string.letters)
for i in xrange(16)])
super(WampProtocol, self).__init__(*args, **kwargs)
def register_procedure(self, *args, **kwargs):
self.procedures.register_procedure(*args, **kwargs)
def register_object(self, *args, **kwargs):
self.procedures.register_object(*args, **kwargs)
def register_pubsub(self, *args, **kwargs):
if not hasattr(self.server, 'channels'):
self.server.channels = Channels()
self.server.channels.create(*args, **kwargs)
def do_handshake(self):
from geventwebsocket import get_version
welcome = [
self.MSG_WELCOME,
self.session_id,
1,
'gevent-websocket/' + get_version()
]
self.app.ws.send(serialize(welcome))
def rpc_call(self, data):
call_id, curie_or_uri = data[1:3]
args = data[3:]
if not isinstance(call_id, (str, unicode)):
raise Exception()
if not isinstance(curie_or_uri, (str, unicode)):
raise Exception()
uri = self.prefixes.resolve(curie_or_uri)
try:
result = self.procedures.call(uri, args)
result_msg = [self.MSG_CALL_RESULT, call_id, result]
except Exception, e:
result_msg = [self.MSG_CALL_ERROR,
call_id, 'http://TODO#generic',
str(type(e)), str(e)]
self.app.on_message(serialize(result_msg))
def pubsub_action(self, data):
action = data[0]
curie_or_uri = data[1]
if not isinstance(action, int):
raise Exception()
if not isinstance(curie_or_uri, (str, unicode)):
raise Exception()
uri = self.prefixes.resolve(curie_or_uri)
if action == self.MSG_SUBSCRIBE and len(data) == 2:
self.server.channels.subscribe(data[1], self.handler.active_client)
elif action == self.MSG_UNSUBSCRIBE and len(data) == 2:
self.server.channels.unsubscribe(
data[1], self.handler.active_client)
elif action == self.MSG_PUBLISH and len(data) >= 3:
payload = data[2] if len(data) >= 3 else None
exclude = data[3] if len(data) >= 4 else None
eligible = data[4] if len(data) >= 5 else None
self.server.channels.publish(uri, payload, exclude, eligible)
def on_open(self):
self.app.on_open()
self.do_handshake()
def on_message(self, message):
data = json.loads(message)
if not isinstance(data, list):
raise Exception('incoming data is no list')
if data[0] == self.MSG_PREFIX and len(data) == 3:
prefix, uri = data[1:3]
self.prefixes.add(prefix, uri)
elif data[0] == self.MSG_CALL and len(data) >= 3:
return self.rpc_call(data)
elif data[0] in (self.MSG_SUBSCRIBE, self.MSG_UNSUBSCRIBE,
self.MSG_PUBLISH):
return self.pubsub_action(data)
else:
raise Exception("Unknown call")

View File

@ -0,0 +1,74 @@
import re
from .protocols.base import BaseProtocol
from .exceptions import WebSocketError
class WebSocketApplication(object):
protocol_class = BaseProtocol
def __init__(self, ws):
self.protocol = self.protocol_class(self)
self.ws = ws
def handle(self):
self.protocol.on_open()
while True:
try:
message = self.ws.receive()
except WebSocketError:
self.protocol.on_close()
break
self.protocol.on_message(message)
def on_open(self, *args, **kwargs):
pass
def on_close(self, *args, **kwargs):
pass
def on_message(self, message, *args, **kwargs):
self.ws.send(message, **kwargs)
@classmethod
def protocol_name(cls):
return cls.protocol_class.PROTOCOL_NAME
class Resource(object):
def __init__(self, apps=None):
self.apps = apps if apps else []
def _app_by_path(self, environ_path):
# Which app matched the current path?
for path, app in self.apps.iteritems():
if re.match(path, environ_path):
return app
def app_protocol(self, path):
app = self._app_by_path(path)
if hasattr(app, 'protocol_name'):
return app.protocol_name()
else:
return ''
def __call__(self, environ, start_response):
environ = environ
current_app = self._app_by_path(environ['PATH_INFO'])
if current_app is None:
raise Exception("No apps defined")
if 'wsgi.websocket' in environ:
ws = environ['wsgi.websocket']
current_app = current_app(ws)
current_app.ws = ws # TODO: needed?
current_app.handle()
return None
else:
return current_app(environ, start_response)

View File

@ -0,0 +1,34 @@
from gevent.pywsgi import WSGIServer
from .handler import WebSocketHandler
from .logging import create_logger
class WebSocketServer(WSGIServer):
debug_log_format = (
'-' * 80 + '\n' +
'%(levelname)s in %(module)s [%(pathname)s:%(lineno)d]:\n' +
'%(message)s\n' +
'-' * 80
)
def __init__(self, *args, **kwargs):
self.debug = kwargs.pop('debug', False)
self.pre_start_hook = kwargs.pop('pre_start_hook', None)
self._logger = None
self.clients = {}
kwargs['handler_class'] = WebSocketHandler
super(WebSocketServer, self).__init__(*args, **kwargs)
def handle(self, socket, address):
handler = self.handler_class(socket, address, self)
handler.handle()
@property
def logger(self):
if not self._logger:
self._logger = create_logger(
__name__, self.debug, self.debug_log_format)
return self._logger

View File

@ -0,0 +1,128 @@
###############################################################################
##
## Copyright 2011-2013 Tavendo GmbH
##
## Note:
##
## This code is a Python implementation of the algorithm
##
## "Flexible and Economical UTF-8 Decoder"
##
## by Bjoern Hoehrmann
##
## bjoern@hoehrmann.de
## http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################
## use Cython implementation of UTF8 validator if available
##
try:
from wsaccel.utf8validator import Utf8Validator
except:
## fallback to pure Python implementation
class Utf8Validator:
"""
Incremental UTF-8 validator with constant memory consumption (minimal
state).
Implements the algorithm "Flexible and Economical UTF-8 Decoder" by
Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/).
"""
## DFA transitions
UTF8VALIDATOR_DFA = [
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 00..1f
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 20..3f
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 40..5f
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 60..7f
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, # 80..9f
7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, # a0..bf
8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, # c0..df
0xa,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x4,0x3,0x3, # e0..ef
0xb,0x6,0x6,0x6,0x5,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8, # f0..ff
0x0,0x1,0x2,0x3,0x5,0x8,0x7,0x1,0x1,0x1,0x4,0x6,0x1,0x1,0x1,0x1, # s0..s0
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,1, # s1..s2
1,2,1,1,1,1,1,2,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1, # s3..s4
1,2,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,1,1, # s5..s6
1,3,1,1,1,1,1,3,1,3,1,1,1,1,1,1,1,3,1,1,1,1,1,1,1,1,1,1,1,1,1,1, # s7..s8
]
UTF8_ACCEPT = 0
UTF8_REJECT = 1
def __init__(self):
self.reset()
def decode(self, b):
"""
Eat one UTF-8 octet, and validate on the fly.
Returns UTF8_ACCEPT when enough octets have been consumed, in which case
self.codepoint contains the decoded Unicode code point.
Returns UTF8_REJECT when invalid UTF-8 was encountered.
Returns some other positive integer when more octets need to be eaten.
"""
type = Utf8Validator.UTF8VALIDATOR_DFA[b]
if self.state != Utf8Validator.UTF8_ACCEPT:
self.codepoint = (b & 0x3f) | (self.codepoint << 6)
else:
self.codepoint = (0xff >> type) & b
self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + self.state * 16 + type]
return self.state
def reset(self):
"""
Reset validator to start new incremental UTF-8 decode/validation.
"""
self.state = Utf8Validator.UTF8_ACCEPT
self.codepoint = 0
self.i = 0
def validate(self, ba):
"""
Incrementally validate a chunk of bytes provided as string.
Will return a quad (valid?, endsOnCodePoint?, currentIndex, totalIndex).
As soon as an octet is encountered which renders the octet sequence
invalid, a quad with valid? == False is returned. currentIndex returns
the index within the currently consumed chunk, and totalIndex the
index within the total consumed sequence that was the point of bail out.
When valid? == True, currentIndex will be len(ba) and totalIndex the
total amount of consumed bytes.
"""
l = len(ba)
for i in xrange(l):
## optimized version of decode(), since we are not interested in actual code points
self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + (self.state << 4) + Utf8Validator.UTF8VALIDATOR_DFA[ord(ba[i])]]
if self.state == Utf8Validator.UTF8_REJECT:
self.i += i
return False, False, i, self.i
self.i += l
return True, self.state == Utf8Validator.UTF8_ACCEPT, l, self.i

View File

@ -0,0 +1,45 @@
import subprocess
def get_version(version=None):
"Returns a PEP 386-compliant version number from VERSION."
if version is None:
from geventwebsocket import VERSION as version
else:
assert len(version) == 5
assert version[3] in ('alpha', 'beta', 'rc', 'final')
# Now build the two parts of the version number:
# main = X.Y[.Z]
# sub = .devN - for pre-alpha releases
# | {a|b|c}N - for alpha, beta and rc releases
parts = 2 if version[2] == 0 else 3
main = '.'.join(str(x) for x in version[:parts])
sub = ''
if version[3] == 'alpha' and version[4] == 0:
hg_changeset = get_hg_changeset()
if hg_changeset:
sub = '.dev{0}'.format(hg_changeset)
elif version[3] != 'final':
mapping = {'alpha': 'a', 'beta': 'b', 'rc': 'c'}
sub = mapping[version[3]] + str(version[4])
return str(main + sub)
def get_hg_changeset():
rev, err = subprocess.Popen(
'hg id -i',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
).communicate()
if err:
return None
else:
return rev.strip().replace('+', '')

View File

@ -0,0 +1,543 @@
import struct
from socket import error
from .exceptions import ProtocolError
from .exceptions import WebSocketError
from .exceptions import FrameTooLargeException
from .utf8validator import Utf8Validator
MSG_SOCKET_DEAD = "Socket is dead"
MSG_ALREADY_CLOSED = "Connection is already closed"
MSG_CLOSED = "Connection closed"
class WebSocket(object):
"""
Base class for supporting websocket operations.
:ivar environ: The http environment referenced by this connection.
:ivar closed: Whether this connection is closed/closing.
:ivar stream: The underlying file like object that will be read from /
written to by this WebSocket object.
"""
__slots__ = ('utf8validator', 'utf8validate_last', 'environ', 'closed',
'stream', 'raw_write', 'raw_read', 'handler')
OPCODE_CONTINUATION = 0x00
OPCODE_TEXT = 0x01
OPCODE_BINARY = 0x02
OPCODE_CLOSE = 0x08
OPCODE_PING = 0x09
OPCODE_PONG = 0x0a
def __init__(self, environ, stream, handler):
self.environ = environ
self.closed = False
self.stream = stream
self.raw_write = stream.write
self.raw_read = stream.read
self.utf8validator = Utf8Validator()
self.handler = handler
def __del__(self):
try:
self.close()
except:
# close() may fail if __init__ didn't complete
pass
def _decode_bytes(self, bytestring):
"""
Internal method used to convert the utf-8 encoded bytestring into
unicode.
If the conversion fails, the socket will be closed.
"""
if not bytestring:
return u''
try:
return bytestring.decode('utf-8')
except UnicodeDecodeError:
self.close(1007)
raise
def _encode_bytes(self, text):
"""
:returns: The utf-8 byte string equivalent of `text`.
"""
if isinstance(text, str):
return text
if not isinstance(text, unicode):
text = unicode(text or '')
return text.encode('utf-8')
def _is_valid_close_code(self, code):
"""
:returns: Whether the returned close code is a valid hybi return code.
"""
if code < 1000:
return False
if 1004 <= code <= 1006:
return False
if 1012 <= code <= 1016:
return False
if code == 1100:
# not sure about this one but the autobahn fuzzer requires it.
return False
if 2000 <= code <= 2999:
return False
return True
@property
def current_app(self):
if hasattr(self.handler.server.application, 'current_app'):
return self.handler.server.application.current_app
else:
# For backwards compatibility reasons
class MockApp():
def on_close(self, *args):
pass
return MockApp()
@property
def origin(self):
if not self.environ:
return
return self.environ.get('HTTP_ORIGIN')
@property
def protocol(self):
if not self.environ:
return
return self.environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL')
@property
def version(self):
if not self.environ:
return
return self.environ.get('HTTP_SEC_WEBSOCKET_VERSION')
@property
def path(self):
if not self.environ:
return
return self.environ.get('PATH_INFO')
@property
def logger(self):
return self.handler.logger
def handle_close(self, header, payload):
"""
Called when a close frame has been decoded from the stream.
:param header: The decoded `Header`.
:param payload: The bytestring payload associated with the close frame.
"""
if not payload:
self.close(1000, None)
return
if len(payload) < 2:
raise ProtocolError('Invalid close frame: {0} {1}'.format(
header, payload))
code = struct.unpack('!H', str(payload[:2]))[0]
payload = payload[2:]
if payload:
validator = Utf8Validator()
val = validator.validate(payload)
if not val[0]:
raise UnicodeError
if not self._is_valid_close_code(code):
raise ProtocolError('Invalid close code {0}'.format(code))
self.close(code, payload)
def handle_ping(self, header, payload):
self.send_frame(payload, self.OPCODE_PONG)
def handle_pong(self, header, payload):
pass
def read_frame(self):
"""
Block until a full frame has been read from the socket.
This is an internal method as calling this will not cleanup correctly
if an exception is called. Use `receive` instead.
:return: The header and payload as a tuple.
"""
header = Header.decode_header(self.stream)
if header.flags:
raise ProtocolError
if not header.length:
return header, ''
try:
payload = self.raw_read(header.length)
except error:
payload = ''
except Exception:
# TODO log out this exception
payload = ''
if len(payload) != header.length:
raise WebSocketError('Unexpected EOF reading frame payload')
if header.mask:
payload = header.unmask_payload(payload)
return header, payload
def validate_utf8(self, payload):
# Make sure the frames are decodable independently
self.utf8validate_last = self.utf8validator.validate(payload)
if not self.utf8validate_last[0]:
raise UnicodeError("Encountered invalid UTF-8 while processing "
"text message at payload octet index "
"{0:d}".format(self.utf8validate_last[3]))
def read_message(self):
"""
Return the next text or binary message from the socket.
This is an internal method as calling this will not cleanup correctly
if an exception is called. Use `receive` instead.
"""
opcode = None
message = ""
while True:
header, payload = self.read_frame()
f_opcode = header.opcode
if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY):
# a new frame
if opcode:
raise ProtocolError("The opcode in non-fin frame is "
"expected to be zero, got "
"{0!r}".format(f_opcode))
# Start reading a new message, reset the validator
self.utf8validator.reset()
self.utf8validate_last = (True, True, 0, 0)
opcode = f_opcode
elif f_opcode == self.OPCODE_CONTINUATION:
if not opcode:
raise ProtocolError("Unexpected frame with opcode=0")
elif f_opcode == self.OPCODE_PING:
self.handle_ping(header, payload)
continue
elif f_opcode == self.OPCODE_PONG:
self.handle_pong(header, payload)
continue
elif f_opcode == self.OPCODE_CLOSE:
self.handle_close(header, payload)
return
else:
raise ProtocolError("Unexpected opcode={0!r}".format(f_opcode))
if opcode == self.OPCODE_TEXT:
self.validate_utf8(payload)
message += payload
if header.fin:
break
if opcode == self.OPCODE_TEXT:
self.validate_utf8(message)
return message
else:
return bytearray(message)
def receive(self):
"""
Read and return a message from the stream. If `None` is returned, then
the socket is considered closed/errored.
"""
if self.closed:
self.current_app.on_close(MSG_ALREADY_CLOSED)
raise WebSocketError(MSG_ALREADY_CLOSED)
try:
return self.read_message()
except UnicodeError:
self.close(1007)
except ProtocolError:
self.close(1002)
except error:
self.close()
self.current_app.on_close(MSG_CLOSED)
return None
def send_frame(self, message, opcode):
"""
Send a frame over the websocket with message as its payload
"""
if self.closed:
self.current_app.on_close(MSG_ALREADY_CLOSED)
raise WebSocketError(MSG_ALREADY_CLOSED)
if opcode == self.OPCODE_TEXT:
message = self._encode_bytes(message)
elif opcode == self.OPCODE_BINARY:
message = str(message)
header = Header.encode_header(True, opcode, '', len(message), 0)
try:
self.raw_write(header + message)
except error:
raise WebSocketError(MSG_SOCKET_DEAD)
def send(self, message, binary=None):
"""
Send a frame over the websocket with message as its payload
"""
if binary is None:
binary = not isinstance(message, (str, unicode))
opcode = self.OPCODE_BINARY if binary else self.OPCODE_TEXT
try:
self.send_frame(message, opcode)
except WebSocketError:
self.current_app.on_close(MSG_SOCKET_DEAD)
raise WebSocketError(MSG_SOCKET_DEAD)
def close(self, code=1000, message=''):
"""
Close the websocket and connection, sending the specified code and
message. The underlying socket object is _not_ closed, that is the
responsibility of the initiator.
"""
if self.closed:
self.current_app.on_close(MSG_ALREADY_CLOSED)
try:
message = self._encode_bytes(message)
self.send_frame(
struct.pack('!H%ds' % len(message), code, message),
opcode=self.OPCODE_CLOSE)
except WebSocketError:
# Failed to write the closing frame but it's ok because we're
# closing the socket anyway.
self.logger.debug("Failed to write closing frame -> closing socket")
finally:
self.logger.debug("Closed WebSocket")
self.closed = True
self.stream = None
self.raw_write = None
self.raw_read = None
self.environ = None
#self.current_app.on_close(MSG_ALREADY_CLOSED)
class Stream(object):
"""
Wraps the handler's socket/rfile attributes and makes it in to a file like
object that can be read from/written to by the lower level websocket api.
"""
__slots__ = ('handler', 'read', 'write')
def __init__(self, handler):
self.handler = handler
self.read = handler.rfile.read
self.write = handler.socket.sendall
class Header(object):
__slots__ = ('fin', 'mask', 'opcode', 'flags', 'length')
FIN_MASK = 0x80
OPCODE_MASK = 0x0f
MASK_MASK = 0x80
LENGTH_MASK = 0x7f
RSV0_MASK = 0x40
RSV1_MASK = 0x20
RSV2_MASK = 0x10
# bitwise mask that will determine the reserved bits for a frame header
HEADER_FLAG_MASK = RSV0_MASK | RSV1_MASK | RSV2_MASK
def __init__(self, fin=0, opcode=0, flags=0, length=0):
self.mask = ''
self.fin = fin
self.opcode = opcode
self.flags = flags
self.length = length
def mask_payload(self, payload):
payload = bytearray(payload)
mask = bytearray(self.mask)
for i in xrange(self.length):
payload[i] ^= mask[i % 4]
return str(payload)
# it's the same operation
unmask_payload = mask_payload
def __repr__(self):
return ("<Header fin={0} opcode={1} length={2} flags={3} at "
"0x{4:x}>").format(self.fin, self.opcode, self.length,
self.flags, id(self))
@classmethod
def decode_header(cls, stream):
"""
Decode a WebSocket header.
:param stream: A file like object that can be 'read' from.
:returns: A `Header` instance.
"""
read = stream.read
data = read(2)
if len(data) != 2:
raise WebSocketError("Unexpected EOF while decoding header")
first_byte, second_byte = struct.unpack('!BB', data)
header = cls(
fin=first_byte & cls.FIN_MASK == cls.FIN_MASK,
opcode=first_byte & cls.OPCODE_MASK,
flags=first_byte & cls.HEADER_FLAG_MASK,
length=second_byte & cls.LENGTH_MASK)
has_mask = second_byte & cls.MASK_MASK == cls.MASK_MASK
if header.opcode > 0x07:
if not header.fin:
raise ProtocolError(
"Received fragmented control frame: {0!r}".format(data))
# Control frames MUST have a payload length of 125 bytes or less
if header.length > 125:
raise FrameTooLargeException(
"Control frame cannot be larger than 125 bytes: "
"{0!r}".format(data))
if header.length == 126:
# 16 bit length
data = read(2)
if len(data) != 2:
raise WebSocketError('Unexpected EOF while decoding header')
header.length = struct.unpack('!H', data)[0]
elif header.length == 127:
# 64 bit length
data = read(8)
if len(data) != 8:
raise WebSocketError('Unexpected EOF while decoding header')
header.length = struct.unpack('!Q', data)[0]
if has_mask:
mask = read(4)
if len(mask) != 4:
raise WebSocketError('Unexpected EOF while decoding header')
header.mask = mask
return header
@classmethod
def encode_header(cls, fin, opcode, mask, length, flags):
"""
Encodes a WebSocket header.
:param fin: Whether this is the final frame for this opcode.
:param opcode: The opcode of the payload, see `OPCODE_*`
:param mask: Whether the payload is masked.
:param length: The length of the frame.
:param flags: The RSV* flags.
:return: A bytestring encoded header.
"""
first_byte = opcode
second_byte = 0
extra = ''
if fin:
first_byte |= cls.FIN_MASK
if flags & cls.RSV0_MASK:
first_byte |= cls.RSV0_MASK
if flags & cls.RSV1_MASK:
first_byte |= cls.RSV1_MASK
if flags & cls.RSV2_MASK:
first_byte |= cls.RSV2_MASK
# now deal with length complexities
if length < 126:
second_byte += length
elif length <= 0xffff:
second_byte += 126
extra = struct.pack('!H', length)
elif length <= 0xffffffffffffffff:
second_byte += 127
extra = struct.pack('!Q', length)
else:
raise FrameTooLargeException
if mask:
second_byte |= cls.MASK_MASK
extra += mask
return chr(first_byte) + chr(second_byte) + extra

View File

@ -0,0 +1,76 @@
__version__ = (0, 3, 5)
import logging
import gevent
log = logging.getLogger(__name__)
def socketio_manage(environ, namespaces, request=None, error_handler=None):
"""Main SocketIO management function, call from within your Framework of
choice's view.
The ``environ`` variable is the WSGI ``environ``. It is used to extract
Socket object from the underlying server (as the 'socketio' key), and will
be attached to both the ``Socket`` and ``Namespace`` objects.
The ``namespaces`` parameter is a dictionary of the namespace string
representation as key, and the BaseNamespace namespace class descendant as
a value. The empty string ('') namespace is the global namespace. You can
use Socket.GLOBAL_NS to be more explicit. So it would look like:
.. code-block:: python
namespaces={'': GlobalNamespace,
'/chat': ChatNamespace}
The ``request`` object is not required, but will probably be useful to pass
framework-specific things into your Socket and Namespace functions. It will
simply be attached to the Socket and Namespace object (accessible through
``self.request`` in both cases), and it is not accessed in any case by the
``gevent-socketio`` library.
Pass in an ``error_handler`` if you want to override the default
error_handler (which is :func:`socketio.virtsocket.default_error_handler`.
The callable you pass in should have the same signature as the default
error handler.
This function will block the current "view" or "controller" in your
framework to do the recv/send on the socket, and dispatch incoming messages
to your namespaces.
This is a simple example using Pyramid:
.. code-block:: python
def my_view(request):
socketio_manage(request.environ, {'': GlobalNamespace}, request)
NOTE: You must understand that this function is going to be called
*only once* per socket opening, *even though* you are using a long
polling mechanism. The subsequent calls (for long polling) will
be hooked directly at the server-level, to interact with the
active ``Socket`` instance. This means you will *not* get access
to the future ``request`` or ``environ`` objects. This is of
particular importance regarding sessions (like Beaker). The
session will be opened once at the opening of the Socket, and not
closed until the socket is closed. You are responsible for
opening and closing the cookie-based session yourself if you want
to keep its data in sync with the rest of your GET/POST calls.
"""
socket = environ['socketio']
socket._set_environ(environ)
socket._set_namespaces(namespaces)
if request:
socket._set_request(request)
if error_handler:
socket._set_error_handler(error_handler)
receiver_loop = socket._spawn_receiver_loop()
watcher = socket._spawn_watcher()
gevent.joinall([receiver_loop, watcher])
# TODO: double check, what happens to the WSGI request here ? it vanishes ?
return

View File

@ -0,0 +1,149 @@
import sys
import re
import gevent
import urlparse
from gevent.pywsgi import WSGIHandler
from socketio import transports
from geventwebsocket.handler import WebSocketHandler
class SocketIOHandler(WSGIHandler):
RE_REQUEST_URL = re.compile(r"""
^/(?P<resource>[^/]+)
/(?P<protocol_version>[^/]+)
/(?P<transport_id>[^/]+)
/(?P<sessid>[^/]+)/?$
""", re.X)
RE_HANDSHAKE_URL = re.compile(r"^/(?P<resource>[^/]+)/1/$", re.X)
handler_types = {
'websocket': transports.WebsocketTransport,
'flashsocket': transports.FlashSocketTransport,
'htmlfile': transports.HTMLFileTransport,
'xhr-multipart': transports.XHRMultipartTransport,
'xhr-polling': transports.XHRPollingTransport,
'jsonp-polling': transports.JSONPolling,
}
def __init__(self, *args, **kwargs):
self.socketio_connection = False
self.allowed_paths = None
super(SocketIOHandler, self).__init__(*args, **kwargs)
self.transports = self.handler_types.keys()
if self.server.transports:
self.transports = self.server.transports
if not set(self.transports).issubset(set(self.handler_types)):
raise Exception("transports should be elements of: %s" %
(self.handler_types.keys()))
def _do_handshake(self, tokens):
if tokens["resource"] != self.server.resource:
self.log_error("socket.io URL mismatch")
else:
socket = self.server.get_socket()
data = "%s:15:10:%s" % (socket.sessid, ",".join(self.transports))
self.write_smart(data)
def write_jsonp_result(self, data, wrapper="0"):
self.start_response("200 OK", [
("Content-Type", "application/javascript"),
])
self.result = ['io.j[%s]("%s");' % (wrapper, data)]
def write_plain_result(self, data):
self.start_response("200 OK", [
("Access-Control-Allow-Origin", self.environ.get('HTTP_ORIGIN', '*')),
("Access-Control-Allow-Credentials", "true"),
("Access-Control-Allow-Methods", "POST, GET, OPTIONS"),
("Access-Control-Max-Age", 3600),
("Content-Type", "text/plain"),
])
self.result = [data]
def write_smart(self, data):
args = urlparse.parse_qs(self.environ.get("QUERY_STRING"))
if "jsonp" in args:
self.write_jsonp_result(data, args["jsonp"][0])
else:
self.write_plain_result(data)
self.process_result()
def handle_one_response(self):
path = self.environ.get('PATH_INFO')
# Kick non-socket.io requests to our superclass
if not path.lstrip('/').startswith(self.server.resource):
return super(SocketIOHandler, self).handle_one_response()
self.status = None
self.headers_sent = False
self.result = None
self.response_length = 0
self.response_use_chunked = False
request_method = self.environ.get("REQUEST_METHOD")
request_tokens = self.RE_REQUEST_URL.match(path)
# Parse request URL and QUERY_STRING and do handshake
if request_tokens:
request_tokens = request_tokens.groupdict()
else:
handshake_tokens = self.RE_HANDSHAKE_URL.match(path)
if handshake_tokens:
return self._do_handshake(handshake_tokens.groupdict())
else:
# This is no socket.io request. Let the WSGI app handle it.
return super(SocketIOHandler, self).handle_one_response()
# Setup the transport and socket
transport = self.handler_types.get(request_tokens["transport_id"])
sessid = request_tokens["sessid"]
socket = self.server.get_socket(sessid)
# In case this is WebSocket request, switch to the WebSocketHandler
# FIXME: fix this ugly class change
if issubclass(transport, (transports.WebsocketTransport,
transports.FlashSocketTransport)):
self.__class__ = WebSocketHandler
self.prevent_wsgi_call = True # thank you
# TODO: any errors, treat them ??
self.handle_one_response()
# Make the socket object available for WSGI apps
self.environ['socketio'] = socket
# Create a transport and handle the request likewise
self.transport = transport(self)
jobs = self.transport.connect(socket, request_method)
# Keep track of those jobs (reading, writing and heartbeat jobs) so
# that we can kill them later with Socket.kill()
socket.jobs.extend(jobs)
try:
# We'll run the WSGI app if it wasn't already done.
if socket.wsgi_app_greenlet is None:
# TODO: why don't we spawn a call to handle_one_response here ?
# why call directly the WSGI machinery ?
start_response = lambda status, headers, exc=None: None
socket.wsgi_app_greenlet = gevent.spawn(self.application,
self.environ,
start_response)
except:
self.handle_error(*sys.exc_info())
# TODO DOUBLE-CHECK: do we need to joinall here ?
gevent.joinall(jobs)
def handle_bad_request(self):
self.close_connection = True
self.start_reponse("400 Bad Request", [
('Content-Type', 'text/plain'),
('Connection', 'close'),
('Content-Length', 0)
])

View File

@ -0,0 +1,73 @@
"""
These are general-purpose Mixins -- for use with Namespaces -- that are
generally useful for most simple projects, e.g. Rooms, Broadcast.
You'll likely want to create your own Mixins.
"""
class RoomsMixin(object):
def __init__(self, *args, **kwargs):
super(RoomsMixin, self).__init__(*args, **kwargs)
if 'rooms' not in self.session:
self.session['rooms'] = set() # a set of simple strings
def join(self, room):
"""Lets a user join a room on a specific Namespace."""
self.session['rooms'].add(self._get_room_name(room))
def leave(self, room):
"""Lets a user leave a room on a specific Namespace."""
self.session['rooms'].remove(self._get_room_name(room))
def _get_room_name(self, room):
return self.ns_name + '_' + room
def emit_to_room(self, room, event, *args):
"""This is sent to all in the room (in this particular Namespace)"""
pkt = dict(type="event",
name=event,
args=args,
endpoint=self.ns_name)
room_name = self._get_room_name(room)
for sessid, socket in self.socket.server.sockets.iteritems():
if 'rooms' not in socket.session:
continue
if room_name in socket.session['rooms'] and self.socket != socket:
socket.send_packet(pkt)
class BroadcastMixin(object):
"""Mix in this class with your Namespace to have a broadcast event method.
Use it like this:
class MyNamespace(BaseNamespace, BroadcastMixin):
def on_chatmsg(self, event):
self.broadcast_event('chatmsg', event)
"""
def broadcast_event(self, event, *args):
"""
This is sent to all in the sockets in this particular Namespace,
including itself.
"""
pkt = dict(type="event",
name=event,
args=args,
endpoint=self.ns_name)
for sessid, socket in self.socket.server.sockets.iteritems():
socket.send_packet(pkt)
def broadcast_event_not_me(self, event, *args):
"""
This is sent to all in the sockets in this particular Namespace,
except itself.
"""
pkt = dict(type="event",
name=event,
args=args,
endpoint=self.ns_name)
for sessid, socket in self.socket.server.sockets.iteritems():
if socket is not self.socket:
socket.send_packet(pkt)

View File

@ -0,0 +1,468 @@
import gevent
import re
import logging
import inspect
log = logging.getLogger(__name__)
# regex to check the event name contains only alpha numerical characters
allowed_event_name_regex = re.compile(r'^[A-Za-z][A-Za-z0-9_ ]*$')
class BaseNamespace(object):
"""The **Namespace** is the primary interface a developer will use
to create a gevent-socketio-based application.
You should create your own subclass of this class, optionally using one
of the :mod:`socketio.mixins` provided (or your own), and define methods
such as:
.. code-block:: python
:linenos:
def on_my_event(self, my_first_arg, my_second_arg):
print "This is the my_first_arg object", my_first_arg
print "This is the my_second_arg object", my_second_arg
def on_my_second_event(self, whatever):
print "This holds the first arg that was passed", whatever
We can also access the full packet directly by making an event handler
that accepts a single argument named 'packet':
.. code-block:: python
:linenos:
def on_third_event(self, packet):
print "The full packet", packet
print "See the BaseNamespace::call_method() method for details"
"""
def __init__(self, environ, ns_name, request=None):
self.environ = environ
self.socket = environ['socketio']
self.session = self.socket.session # easily accessible session
self.request = request
self.ns_name = ns_name
self.allowed_methods = None # be careful, None means all methods
# are allowed while an empty list
# means totally closed.
self.jobs = []
self.reset_acl()
# Init the mixins if specified after.
super(BaseNamespace, self).__init__()
def is_method_allowed(self, method_name):
"""ACL system: this checks if you have access to that method_name,
according to the set ACLs"""
if self.allowed_methods is None:
return True
else:
return method_name in self.allowed_methods
def add_acl_method(self, method_name):
"""ACL system: make the method_name accessible to the current socket"""
if isinstance(self.allowed_methods, set):
self.allowed_methods.add(method_name)
else:
self.allowed_methods = set([method_name])
def del_acl_method(self, method_name):
"""ACL system: ensure the user will not have access to that method."""
if self.allowed_methods is None:
raise ValueError(
"Trying to delete an ACL method, but none were"
+ " defined yet! Or: No ACL restrictions yet, why would you"
+ " delete one?"
)
self.allowed_methods.remove(method_name)
def lift_acl_restrictions(self):
"""ACL system: This removes restrictions on the Namespace's methods, so
that all the ``on_*()`` and ``recv_*()`` can be accessed.
"""
self.allowed_methods = None
def get_initial_acl(self):
"""ACL system: If you define this function, you must return
all the 'event' names that you want your User (the established
virtual Socket) to have access to.
If you do not define this function, the user will have free
access to all of the ``on_*()`` and ``recv_*()`` functions,
etc.. methods.
Return something like: ``['on_connect', 'on_public_method']``
You can later modify this list dynamically (inside
``on_connect()`` for example) using:
.. code-block:: python
self.add_acl_method('on_secure_method')
``self.request`` is available in here, if you're already ready to
do some auth. check.
The ACLs are checked by the :meth:`process_packet` and/or
:meth:`process_event` default implementations, before calling
the class's methods.
**Beware**, returning ``None`` leaves the namespace completely
accessible.
"""
return None
def reset_acl(self):
"""Resets ACL to its initial value (calling
:meth:`get_initial_acl`` and applying that again).
"""
self.allowed_methods = self.get_initial_acl()
def process_packet(self, packet):
"""If you override this, NONE of the functions in this class
will be called. It is responsible for dispatching to
:meth:`process_event` (which in turn calls ``on_*()`` and
``recv_*()`` methods).
If the packet arrived here, it is because it belongs to this endpoint.
For each packet arriving, the only possible path of execution, that is,
the only methods that *can* be called are the following:
* recv_connect()
* recv_message()
* recv_json()
* recv_error()
* recv_disconnect()
* on_*()
"""
packet_type = packet['type']
if packet_type == 'event':
return self.process_event(packet)
elif packet_type == 'message':
return self.call_method_with_acl('recv_message', packet,
packet['data'])
elif packet_type == 'json':
return self.call_method_with_acl('recv_json', packet,
packet['data'])
elif packet_type == 'connect':
self.socket.send_packet(packet)
return self.call_method_with_acl('recv_connect', packet)
elif packet_type == 'error':
return self.call_method_with_acl('recv_error', packet)
elif packet_type == 'ack':
callback = self.socket._pop_ack_callback(packet['ackId'])
if not callback:
print "ERROR: No such callback for ackId %s" % packet['ackId']
return
try:
return callback(*(packet['args']))
except TypeError, e:
print "ERROR: Call to callback function failed", packet
elif packet_type == 'disconnect':
# Force a disconnect on the namespace.
return self.call_method_with_acl('recv_disconnect', packet)
else:
print "Unprocessed packet", packet
# TODO: manage the other packet types: disconnect
def process_event(self, packet):
"""This function dispatches ``event`` messages to the correct
functions. You should override this method only if you are not
satisfied with the automatic dispatching to
``on_``-prefixed methods. You could then implement your own dispatch.
See the source code for inspiration.
To process events that have callbacks on the client side, you
must define your event with a single parameter: ``packet``.
In this case, it will be the full ``packet`` object and you
can inspect its ``ack`` and ``id`` keys to define if and how
you reply. A correct reply to an event with a callback would
look like this:
.. code-block:: python
def on_my_callback(self, packet):
if 'ack' in packet':
self.emit('go_back', 'param1', id=packet['id'])
"""
args = packet['args']
name = packet['name']
if not allowed_event_name_regex.match(name):
self.error("unallowed_event_name",
"name must only contains alpha numerical characters")
return
method_name = 'on_' + name.replace(' ', '_')
# This means the args, passed as a list, will be expanded to
# the method arg and if you passed a dict, it will be a dict
# as the first parameter.
return self.call_method_with_acl(method_name, packet, *args)
def call_method_with_acl(self, method_name, packet, *args):
"""You should always use this function to call the methods,
as it checks if the user is allowed according to the ACLs.
If you override :meth:`process_packet` or
:meth:`process_event`, you should definitely want to use this
instead of ``getattr(self, 'my_method')()``
"""
if not self.is_method_allowed(method_name):
self.error('method_access_denied',
'You do not have access to method "%s"' % method_name)
return
return self.call_method(method_name, packet, *args)
def call_method(self, method_name, packet, *args):
"""This function is used to implement the two behaviors on dispatched
``on_*()`` and ``recv_*()`` method calls.
Those are the two behaviors:
* If there is only one parameter on the dispatched method and
it is equal to ``packet``, then pass in the packet as the
sole parameter.
* Otherwise, pass in the arguments as specified by the
different ``recv_*()`` methods args specs, or the
:meth:`process_event` documentation.
"""
method = getattr(self, method_name, None)
if method is None:
self.error('no_such_method',
'The method "%s" was not found' % method_name)
return
specs = inspect.getargspec(method)
func_args = specs.args
if not len(func_args) or func_args[0] != 'self':
self.error("invalid_method_args",
"The server-side method is invalid, as it doesn't "
"have 'self' as its first argument")
return
if len(func_args) == 2 and func_args[1] == 'packet':
return method(packet)
else:
return method(*args)
def initialize(self):
"""This is called right after ``__init__``, on the initial
creation of a namespace so you may handle any setup job you
need.
Namespaces are created only when some packets arrive that ask
for the namespace. They are not created altogether when a new
:class:`~socketio.virtsocket.Socket` connection is established,
so you can have many many namespaces assigned (when calling
:func:`~socketio.socketio_manage`) without clogging the
memory.
If you override this method, you probably want to only
initialize the variables you're going to use in the events of
this namespace, say, with some default values, but not perform
any operation that assumes authentication/authorization.
"""
pass
def recv_message(self, data):
"""This is more of a backwards compatibility hack. This will be
called for messages sent with the original send() call on the client
side. This is NOT the 'message' event, which you will catch with
'on_message()'. The data arriving here is a simple string, with no
other info.
If you want to handle those messages, you should override this method.
"""
return data
def recv_json(self, data):
"""This is more of a backwards compatibility hack. This will be
called for JSON packets sent with the original json() call on the
JavaScript side. This is NOT the 'json' event, which you will catch
with 'on_json()'. The data arriving here is a python dict, with no
event name.
If you want to handle those messages, you should override this method.
"""
return data
def recv_disconnect(self):
"""Override this function if you want to do something when you get a
*force disconnect* packet.
By default, this function calls the :meth:`disconnect` clean-up
function. You probably want to call it yourself also, and put
your clean-up routines in :meth:`disconnect` rather than here,
because that :meth:`disconnect` function gets called
automatically upon disconnection. This function is a
pre-handle for when you get the `disconnect packet`.
"""
self.disconnect(silent=True)
def recv_connect(self):
"""Called the first time a client connection is open on a
Namespace. This *does not* fire on the global namespace.
This allows you to do boilerplate stuff for
the namespace like connecting to rooms, broadcasting events
to others, doing authorization work, and tweaking the ACLs to open
up the rest of the namespace (if it was closed at the
beginning by having :meth:`get_initial_acl` return only
['recv_connect'])
Also see the different :ref:`mixins <mixins_module>` (like
`RoomsMixin`, `BroadcastMixin`).
"""
pass
def recv_error(self, packet):
"""Override this function to handle the errors we get from the client.
:param packet: the full packet.
"""
pass
def error(self, error_name, error_message, msg_id=None, quiet=False):
"""Use this to use the configured ``error_handler`` yield an
error message to your application.
:param error_name: is a short string, to associate messages to recovery
methods
:param error_message: is some human-readable text, describing the error
:param msg_id: is used to associate with a request
:param quiet: specific to error_handlers. The default doesn't send a
message to the user, but shows a debug message on the
developer console.
"""
self.socket.error(error_name, error_message, endpoint=self.ns_name,
msg_id=msg_id, quiet=quiet)
def send(self, message, json=False, callback=None):
"""Use send to send a simple string message.
If ``json`` is True, the message will be encoded as a JSON object
on the wire, and decoded on the other side.
This is mostly for backwards compatibility. ``emit()`` is more fun.
:param callback: This is a callback function that will be
called automatically by the client upon
reception. It does not verify that the
listener over there was completed with
success. It just tells you that the browser
got a hold of the packet.
:type callback: callable
"""
pkt = dict(type="message", data=message, endpoint=self.ns_name)
if json:
pkt['type'] = "json"
if callback:
# By passing ack=True, we use the old behavior of being returned
# an 'ack' packet, automatically triggered by the client-side
# with no user-code being run. The emit() version of the
# callback is more useful I think :) So migrate your code.
pkt['ack'] = True
pkt['id'] = msgid = self.socket._get_next_msgid()
self.socket._save_ack_callback(msgid, callback)
self.socket.send_packet(pkt)
def emit(self, event, *args, **kwargs):
"""Use this to send a structured event, with a name and arguments, to
the client.
By default, it uses this namespace's endpoint. You can send messages on
other endpoints with something like:
``self.socket['/other_endpoint'].emit()``.
However, it is possible that the ``'/other_endpoint'`` was not
initialized yet, and that would yield a ``KeyError``.
The only supported ``kwargs`` is ``callback``. All other parameters
must be passed positionally.
:param event: The name of the event to trigger on the other end.
:param callback: Pass in the callback keyword argument to define a
call-back that will be called when the client acks.
This callback is slightly different from the one from
``send()``, as this callback will receive parameters
from the explicit call of the ``ack()`` function
passed to the listener on the client side.
The remote listener will need to explicitly ack (by
calling its last argument, a function which is
usually called 'ack') with some parameters indicating
success or error. The 'ack' packet coming back here
will then trigger the callback function with the
returned values.
:type callback: callable
"""
callback = kwargs.pop('callback', None)
if kwargs:
raise ValueError(
"emit() only supports positional argument, to stay "
"compatible with the Socket.IO protocol. You can "
"however pass in a dictionary as the first argument")
pkt = dict(type="event", name=event, args=args,
endpoint=self.ns_name)
if callback:
# By passing 'data', we indicate that we *want* an explicit ack
# by the client code, not an automatic as with send().
pkt['ack'] = 'data'
pkt['id'] = msgid = self.socket._get_next_msgid()
self.socket._save_ack_callback(msgid, callback)
self.socket.send_packet(pkt)
def spawn(self, fn, *args, **kwargs):
"""Spawn a new process, attached to this Namespace.
It will be monitored by the "watcher" process in the Socket. If the
socket disconnects, all these greenlets are going to be killed, after
calling BaseNamespace.disconnect()
"""
# self.log.debug("Spawning sub-Namespace Greenlet: %s" % fn.__name__)
new = gevent.spawn(fn, *args, **kwargs)
self.jobs.append(new)
return new
def disconnect(self, silent=False):
"""Send a 'disconnect' packet, so that the user knows it has been
disconnected (booted actually). This will trigger an onDisconnect()
call on the client side.
Over here, we will kill all ``spawn``ed processes and remove the
namespace from the Socket object.
:param silent: do not actually send the packet (if they asked for a
disconnect for example), but just kill all jobs spawned
by this Namespace, and remove it from the Socket.
"""
if not silent:
packet = {"type": "disconnect",
"endpoint": self.ns_name}
self.socket.send_packet(packet)
self.socket.remove_namespace(self.ns_name)
self.kill_local_jobs()
def kill_local_jobs(self):
"""Kills all the jobs spawned with BaseNamespace.spawn() on a namespace
object.
This will be called automatically if the ``watcher`` process detects
that the Socket was closed.
"""
gevent.killall(self.jobs)
self.jobs = []

View File

@ -0,0 +1,204 @@
try:
import simplejson as json
json_decimal_args = {"use_decimal": True} # pragma: no cover
except ImportError:
import json
import decimal
class DecimalEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, decimal.Decimal):
return float(o)
return super(DecimalEncoder, self).default(o)
json_decimal_args = {"cls": DecimalEncoder}
MSG_TYPES = {
'disconnect': 0,
'connect': 1,
'heartbeat': 2,
'message': 3,
'json': 4,
'event': 5,
'ack': 6,
'error': 7,
'noop': 8,
}
MSG_VALUES = dict((v, k) for k, v in MSG_TYPES.iteritems())
ERROR_REASONS = {
'transport not supported': 0,
'client not handshaken': 1,
'unauthorized': 2
}
REASONS_VALUES = dict((v, k) for k, v in ERROR_REASONS.iteritems())
ERROR_ADVICES = {
'reconnect': 0,
}
ADVICES_VALUES = dict((v, k) for k, v in ERROR_ADVICES.iteritems())
socketio_packet_attributes = ['type', 'name', 'data', 'endpoint', 'args',
'ackId', 'reason', 'advice', 'qs', 'id']
def encode(data):
"""
Encode an attribute dict into a byte string.
"""
payload = ''
msg = str(MSG_TYPES[data['type']])
if msg in ['0', '1']:
# '1::' [path] [query]
msg += '::' + data['endpoint']
if 'qs' in data and data['qs'] != '':
msg += ':' + data['qs']
elif msg == '2':
# heartbeat
msg += '::'
elif msg in ['3', '4', '5']:
# '3:' [id ('+')] ':' [endpoint] ':' [data]
# '4:' [id ('+')] ':' [endpoint] ':' [json]
# '5:' [id ('+')] ':' [endpoint] ':' [json encoded event]
# The message id is an incremental integer, required for ACKs.
# If the message id is followed by a +, the ACK is not handled by
# socket.io, but by the user instead.
if msg == '3':
payload = data['data']
if msg == '4':
payload = json.dumps(data['data'], separators=(',', ':'),
**json_decimal_args)
if msg == '5':
d = {}
d['name'] = data['name']
if 'args' in data and data['args'] != []:
d['args'] = data['args']
payload = json.dumps(d, separators=(',', ':'), **json_decimal_args)
if 'id' in data:
msg += ':' + str(data['id'])
if data['ack'] == 'data':
msg += '+'
msg += ':'
else:
msg += '::'
if 'endpoint' not in data:
data['endpoint'] = ''
if payload != '':
msg += data['endpoint'] + ':' + payload
else:
msg += data['endpoint']
elif msg == '6':
# '6:::' [id] '+' [data]
msg += '::' + data.get('endpoint', '') + ':' + str(data['ackId'])
if 'args' in data and data['args'] != []:
msg += '+' + json.dumps(data['args'], separators=(',', ':'),
**json_decimal_args)
elif msg == '7':
# '7::' [endpoint] ':' [reason] '+' [advice]
msg += ':::'
if 'reason' in data and data['reason'] != '':
msg += str(ERROR_REASONS[data['reason']])
if 'advice' in data and data['advice'] != '':
msg += '+' + str(ERROR_ADVICES[data['advice']])
msg += data['endpoint']
# NoOp, used to close a poll after the polling duration time
elif msg == '8':
msg += '::'
return msg
def decode(rawstr):
"""
Decode a rawstr packet arriving from the socket into a dict.
"""
decoded_msg = {}
split_data = rawstr.split(":", 3)
msg_type = split_data[0]
msg_id = split_data[1]
endpoint = split_data[2]
data = ''
if msg_id != '':
if "+" in msg_id:
msg_id = msg_id.split('+')[0]
decoded_msg['id'] = int(msg_id)
decoded_msg['ack'] = 'data'
else:
decoded_msg['id'] = int(msg_id)
decoded_msg['ack'] = True
# common to every message
msg_type_id = int(msg_type)
if msg_type_id in MSG_VALUES:
decoded_msg['type'] = MSG_VALUES[int(msg_type)]
else:
raise Exception("Unknown message type: %s" % msg_type)
decoded_msg['endpoint'] = endpoint
if len(split_data) > 3:
data = split_data[3]
if msg_type == "0": # disconnect
pass
elif msg_type == "1": # connect
decoded_msg['qs'] = data
elif msg_type == "2": # heartbeat
pass
elif msg_type == "3": # message
decoded_msg['data'] = data
elif msg_type == "4": # json msg
decoded_msg['data'] = json.loads(data)
elif msg_type == "5": # event
try:
data = json.loads(data)
except ValueError, e:
print("Invalid JSON event message", data)
decoded_msg['args'] = []
else:
decoded_msg['name'] = data.pop('name')
if 'args' in data:
decoded_msg['args'] = data['args']
else:
decoded_msg['args'] = []
elif msg_type == "6": # ack
if '+' in data:
ackId, data = data.split('+')
decoded_msg['ackId'] = int(ackId)
decoded_msg['args'] = json.loads(data)
else:
decoded_msg['ackId'] = int(data)
decoded_msg['args'] = []
elif msg_type == "7": # error
if '+' in data:
reason, advice = data.split('+')
decoded_msg['reason'] = REASONS_VALUES[int(reason)]
decoded_msg['advice'] = ADVICES_VALUES[int(advice)]
else:
decoded_msg['advice'] = ''
if data != '':
decoded_msg['reason'] = REASONS_VALUES[int(data)]
else:
decoded_msg['reason'] = ''
elif msg_type == "8": # noop
pass
return decoded_msg

View File

@ -0,0 +1,27 @@
from gevent.server import StreamServer
import socket
__all__ = ['FlashPolicyServer']
class FlashPolicyServer(StreamServer):
policyrequest = "<policy-file-request/>"
policy = """<?xml version="1.0"?><!DOCTYPE cross-domain-policy SYSTEM "http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd">
<cross-domain-policy><allow-access-from domain="*" to-ports="*"/></cross-domain-policy>"""
def __init__(self, listener=None, backlog=None):
if listener is None:
listener = ('0.0.0.0', 10843)
StreamServer.__init__(self, listener=listener, backlog=backlog)
def handle(self, sock, address):
# send and read functions should not wait longer than three seconds
sock.settimeout(3)
try:
# try to receive at most 128 bytes (`POLICYREQUEST` is shorter)
input = sock.recv(128)
if input.startswith(FlashPolicyServer.policyrequest):
sock.sendall(FlashPolicyServer.policy)
except socket.timeout:
pass
sock.close()

View File

@ -0,0 +1,83 @@
import sys
import traceback
from socket import error
from gevent.pywsgi import WSGIServer
from socketio.handler import SocketIOHandler
from socketio.policyserver import FlashPolicyServer
from socketio.virtsocket import Socket
__all__ = ['SocketIOServer']
class SocketIOServer(WSGIServer):
"""A WSGI Server with a resource that acts like an SocketIO."""
def __init__(self, *args, **kwargs):
"""
This is just like the standard WSGIServer __init__, except with a
few additional ``kwargs``:
:param resource: The URL which has to be identified as a socket.io request. Defaults to the /socket.io/ URL.
:param transports: Optional list of transports to allow. List of
strings, each string should be one of
handler.SocketIOHandler.handler_types.
:param policy_server: Boolean describing whether or not to use the
Flash policy server. Default True.
:param policy_listener : A tuple containing (host, port) for the
policy server. This is optional and used only if policy server
is set to true. The default value is 0.0.0.0:843
"""
self.sockets = {}
if 'namespace' in kwargs:
print("DEPRECATION WARNING: use resource instead of namespace")
self.resource = kwargs.pop('namespace', 'socket.io')
else:
self.resource = kwargs.pop('resource', 'socket.io')
self.transports = kwargs.pop('transports', None)
if kwargs.pop('policy_server', True):
policylistener = kwargs.pop('policy_listener', (args[0][0], 10843))
self.policy_server = FlashPolicyServer(policylistener)
else:
self.policy_server = None
kwargs['handler_class'] = SocketIOHandler
super(SocketIOServer, self).__init__(*args, **kwargs)
def start_accepting(self):
if self.policy_server is not None:
try:
self.policy_server.start()
except error, ex:
sys.stderr.write(
'FAILED to start flash policy server: %s\n' % (ex, ))
except Exception:
traceback.print_exc()
sys.stderr.write('FAILED to start flash policy server.\n\n')
super(SocketIOServer, self).start_accepting()
def kill(self):
if self.policy_server is not None:
self.policy_server.kill()
super(SocketIOServer, self).kill()
def handle(self, socket, address):
handler = self.handler_class(socket, address, self)
handler.handle()
def get_socket(self, sessid=''):
"""Return an existing or new client Socket."""
socket = self.sockets.get(sessid)
if socket is None:
socket = Socket(self)
self.sockets[socket.sessid] = socket
else:
socket.incr_hits()
return socket

View File

@ -0,0 +1,57 @@
import os
import gevent
from gevent.pool import Pool
from gunicorn.workers.ggevent import GeventPyWSGIWorker
from socketio.server import SocketIOServer
from socketio.handler import SocketIOHandler
class GeventSocketIOBaseWorker(GeventPyWSGIWorker):
""" The base gunicorn worker class """
def run(self):
self.socket.setblocking(1)
pool = Pool(self.worker_connections)
self.server_class.base_env['wsgi.multiprocess'] = \
self.cfg.workers > 1
server = self.server_class(
self.socket, application=self.wsgi,
spawn=pool, handler_class=self.wsgi_handler,
namespace=self.namespace, policy_server=self.policy_server)
server.start()
try:
while self.alive:
self.notify()
if self.ppid != os.getppid():
self.log.info("Parent changed, shutting down: %s", self)
break
gevent.sleep(1.0)
except KeyboardInterrupt:
pass
# try to stop the connections
try:
self.notify()
server.stop(timeout=self.timeout)
except:
pass
class GeventSocketIOWorker(GeventSocketIOBaseWorker):
"""
Default gunicorn worker utilizing gevent
Uses the namespace 'socket.io' and defaults to the flash policy server
being disabled.
"""
server_class = SocketIOServer
wsgi_handler = SocketIOHandler
# We need to define a namespace for the server, it would be nice if this
# was a configuration option, will probably end up how this implemented,
# for now this is just a proof of concept to make sure this will work
namespace = 'socket.io'
policy_server = False # Don't run the flash policy server

View File

@ -0,0 +1,270 @@
import gevent
import urllib
from gevent.queue import Empty
class BaseTransport(object):
"""Base class for all transports. Mostly wraps handler class functions."""
def __init__(self, handler):
self.content_type = ("Content-Type", "text/plain; charset=UTF-8")
self.headers = [
("Access-Control-Allow-Origin", "*"),
("Access-Control-Allow-Credentials", "true"),
("Access-Control-Allow-Methods", "POST, GET, OPTIONS"),
("Access-Control-Max-Age", 3600),
]
self.headers_list = []
self.handler = handler
def write(self, data=""):
if 'Content-Length' not in self.handler.response_headers_list:
self.handler.response_headers.append(('Content-Length', len(data)))
self.handler.response_headers_list.append('Content-Length')
self.handler.write(data)
def start_response(self, status, headers, **kwargs):
if "Content-Type" not in [x[0] for x in headers]:
headers.append(self.content_type)
headers.extend(self.headers)
#print headers
self.handler.start_response(status, headers, **kwargs)
class XHRPollingTransport(BaseTransport):
def __init__(self, *args, **kwargs):
super(XHRPollingTransport, self).__init__(*args, **kwargs)
def options(self):
self.start_response("200 OK", ())
self.write()
return []
def get(self, socket):
socket.heartbeat()
payload = self.get_messages_payload(socket, timeout=5.0)
if not payload:
payload = "8::" # NOOP
self.start_response("200 OK", [])
self.write(payload)
return []
def _request_body(self):
return self.handler.wsgi_input.readline()
def post(self, socket):
for message in self.decode_payload(self._request_body()):
socket.put_server_msg(message)
self.start_response("200 OK", [
("Connection", "close"),
("Content-Type", "text/plain")
])
self.write("1")
return []
def get_messages_payload(self, socket, timeout=None):
"""This will fetch the messages from the Socket's queue, and if
there are many messes, pack multiple messages in one payload and return
"""
try:
msgs = socket.get_multiple_client_msgs(timeout=timeout)
data = self.encode_payload(msgs)
except Empty:
data = ""
return data
def encode_payload(self, messages):
"""Encode list of messages. Expects messages to be unicode.
``messages`` - List of raw messages to encode, if necessary
"""
if not messages:
return ''
if len(messages) == 1:
return messages[0].encode('utf-8')
payload = u''.join(u'\ufffd%d\ufffd%s' % (len(p), p)
for p in messages)
return payload.encode('utf-8')
def decode_payload(self, payload):
"""This function can extract multiple messages from one HTTP payload.
Some times, the XHR/JSONP/.. transports can pack more than one message
on a single packet. They are encoding following the WebSocket
semantics, which need to be reproduced here to unwrap the messages.
The semantics are:
\ufffd + [length as a string] + \ufffd + [payload as a unicode string]
This function returns a list of messages, even though there is only
one.
Inspired by socket.io/lib/transports/http.js
"""
payload = payload.decode('utf-8')
if payload[0] == u"\ufffd":
#print "MULTIMSG FULL", payload
ret = []
while len(payload) != 0:
len_end = payload.find(u"\ufffd", 1)
length = int(payload[1:len_end])
msg_start = len_end + 1
msg_end = length + msg_start
message = payload[msg_start:msg_end]
#print "MULTIMSG", length, message
ret.append(message)
payload = payload[msg_end:]
return ret
return [payload]
def connect(self, socket, request_method):
if not socket.connection_confirmed:
socket.connection_confirmed = True
self.start_response("200 OK", [
("Connection", "close"),
])
self.write("1::") # 'connect' packet
return []
elif request_method in ("GET", "POST", "OPTIONS"):
return getattr(self, request_method.lower())(socket)
else:
raise Exception("No support for the method: " + request_method)
class JSONPolling(XHRPollingTransport):
def __init__(self, handler):
super(JSONPolling, self).__init__(handler)
self.content_type = ("Content-Type", "text/javascript; charset=UTF-8")
def _request_body(self):
data = super(JSONPolling, self)._request_body()
# resolve %20%3F's, take out wrapping d="...", etc..
return urllib.unquote_plus(data)[3:-1] \
.replace(r'\"', '"') \
.replace(r"\\", "\\")
def write(self, data):
"""Just quote out stuff before sending it out"""
# TODO: don't we need to quote this data in here ?
super(JSONPolling, self).write("io.j[0]('%s');" % data)
class XHRMultipartTransport(XHRPollingTransport):
def __init__(self, handler):
super(JSONPolling, self).__init__(handler)
self.content_type = (
"Content-Type",
"multipart/x-mixed-replace;boundary=\"socketio\""
)
def connect(self, socket, request_method):
if request_method == "GET":
# TODO: double verify this, because we're not sure. look at git revs.
heartbeat = socket._spawn_heartbeat()
return [heartbeat] + self.get(socket)
elif request_method == "POST":
return self.post(socket)
else:
raise Exception("No support for such method: " + request_method)
def get(self, socket):
header = "Content-Type: text/plain; charset=UTF-8\r\n\r\n"
self.start_response("200 OK", [("Connection", "keep-alive")])
self.write_multipart("--socketio\r\n")
self.write_multipart(header)
self.write_multipart(str(socket.sessid) + "\r\n")
self.write_multipart("--socketio\r\n")
def chunk():
while True:
payload = self.get_messages_payload(socket)
if not payload:
# That would mean the call to Queue.get() returned Empty,
# so it was in fact killed, since we pass no timeout=..
socket.kill()
break
else:
try:
self.write_multipart(header)
self.write_multipart(payload)
self.write_multipart("--socketio\r\n")
except socket.error:
socket.kill()
break
return [gevent.spawn(chunk)]
class WebsocketTransport(BaseTransport):
def connect(self, socket, request_method):
websocket = self.handler.environ['wsgi.websocket']
websocket.send("1::") # 'connect' packet
def send_into_ws():
while True:
message = socket.get_client_msg()
if message is None:
socket.kill()
break
websocket.send(message)
def read_from_ws():
while True:
message = websocket.receive()
if not message:
socket.kill()
break
else:
if message is not None:
socket.put_server_msg(message)
gr1 = gevent.spawn(send_into_ws)
gr2 = gevent.spawn(read_from_ws)
heartbeat1, heartbeat2 = socket._spawn_heartbeat()
return [gr1, gr2, heartbeat1, heartbeat2]
class FlashSocketTransport(WebsocketTransport):
pass
class HTMLFileTransport(XHRPollingTransport):
"""Not tested at all!"""
def __init__(self, handler):
super(HTMLFileTransport, self).__init__(handler)
self.content_type = ("Content-Type", "text/html")
def write_packed(self, data):
self.write("<script>parent.s._('%s', document);</script>" % data)
def handle_get_response(self, socket):
self.start_response("200 OK", [
("Connection", "keep-alive"),
("Content-Type", "text/html"),
("Transfer-Encoding", "chunked"),
])
self.write("<html><body>" + " " * 244)
payload = self.get_messages_payload(socket, timeout=5.0)
self.write_packed(payload)

View File

@ -0,0 +1,428 @@
"""Virtual Socket implementation, unifies all the Transports into one
single interface, and abstracts the work of the long-polling methods.
This module also has the ``default_error_handler`` implementation.
You can define your own so that the error messages are logged or sent
in a different way
:copyright: 2012, Alexandre Bourget <alexandre.bourget@savoirfairelinux.com>
:moduleauthor: Alexandre Bourget <alexandre.bourget@savoirfairelinux.com>
"""
import random
import weakref
import gevent
from gevent.queue import Queue
from gevent.event import Event
from socketio import packet
def default_error_handler(socket, error_name, error_message, endpoint,
msg_id, quiet):
"""This is the default error handler, you can override this when
calling :func:`socketio.socketio_manage`.
It basically sends an event through the socket with the 'error' name.
See documentation for :meth:`Socket.error`.
:param quiet: if quiet, this handler will not send a packet to the
user, but only log for the server developer.
"""
pkt = dict(type='event', name='error',
args=[error_name, error_message],
endpoint=endpoint)
if msg_id:
pkt['id'] = msg_id
# Send an error event through the Socket
if not quiet:
socket.send_packet(pkt)
# Log that error somewhere for debugging...
print "default_error_handler: %s, %s (endpoint=%s, msg_id=%s)" % (
error_name, error_message, endpoint, msg_id)
class Socket(object):
"""
Virtual Socket implementation, checks heartbeats, writes to local queues
for message passing, holds the Namespace objects, dispatches de packets
to the underlying namespaces.
This is the abstraction on top of the different transports. It's like
if you used a WebSocket only...
"""
STATE_CONNECTING = "CONNECTING"
STATE_CONNECTED = "CONNECTED"
STATE_DISCONNECTING = "DISCONNECTING"
STATE_DISCONNECTED = "DISCONNECTED"
GLOBAL_NS = ''
"""Use this to be explicit when specifying a Global Namespace (an endpoint
with no name, not '/chat' or anything."""
def __init__(self, server, error_handler=None):
self.server = weakref.proxy(server)
self.sessid = str(random.random())[2:]
self.session = {} # the session dict, for general developer usage
self.client_queue = Queue() # queue for messages to client
self.server_queue = Queue() # queue for messages to server
self.hits = 0
self.heartbeats = 0
self.timeout = Event()
self.wsgi_app_greenlet = None
self.state = "NEW"
self.connection_confirmed = False
self.ack_callbacks = {}
self.ack_counter = 0
self.request = None
self.environ = None
self.namespaces = {}
self.active_ns = {} # Namespace sessions that were instantiated
self.jobs = []
self.error_handler = default_error_handler
if error_handler is not None:
self.error_handler = error_handler
def _set_namespaces(self, namespaces):
"""This is a mapping (dict) of the different '/namespaces' to their
BaseNamespace object derivative.
This is called by socketio_manage()."""
self.namespaces = namespaces
def _set_request(self, request):
"""Saves the request object for future use by the different Namespaces.
This is called by socketio_manage().
"""
self.request = request
def _set_environ(self, environ):
"""Save the WSGI environ, for future use.
This is called by socketio_manage().
"""
self.environ = environ
def _set_error_handler(self, error_handler):
"""Changes the default error_handler function to the one specified
This is called by socketio_manage().
"""
self.error_handler = error_handler
def _get_next_msgid(self):
"""This retrieves the next value for the 'id' field when sending
an 'event' or 'message' or 'json' that asks the remote client
to 'ack' back, so that we trigger the local callback.
"""
self.ack_counter += 1
return self.ack_counter
def _save_ack_callback(self, msgid, callback):
"""Keep a reference of the callback on this socket."""
if msgid in self.ack_callbacks:
return False
self.ack_callbacks[msgid] = callback
def _pop_ack_callback(self, msgid):
"""Fetch the callback for a given msgid, if it exists, otherwise,
return None"""
if msgid not in self.ack_callbacks:
return None
return self.ack_callbacks.pop(msgid)
def __str__(self):
result = ['sessid=%r' % self.sessid]
if self.state == self.STATE_CONNECTED:
result.append('connected')
if self.client_queue.qsize():
result.append('client_queue[%s]' % self.client_queue.qsize())
if self.server_queue.qsize():
result.append('server_queue[%s]' % self.server_queue.qsize())
if self.hits:
result.append('hits=%s' % self.hits)
if self.heartbeats:
result.append('heartbeats=%s' % self.heartbeats)
return ' '.join(result)
def __getitem__(self, key):
"""This will get the nested Namespace using its '/chat' reference.
Using this, you can go from one Namespace to the other (to emit, add
ACLs, etc..) with:
adminnamespace.socket['/chat'].add_acl_method('kick-ban')
"""
return self.active_ns[key]
def __hasitem__(self, key):
"""Verifies if the namespace is active (was initialized)"""
return key in self.active_ns
@property
def connected(self):
"""Returns whether the state is CONNECTED or not."""
return self.state == self.STATE_CONNECTED
def incr_hits(self):
self.hits += 1
if self.hits == 1:
self.state = self.STATE_CONNECTED
def heartbeat(self):
"""This makes the heart beat for another X seconds. Call this when
you get a heartbeat packet in.
This clear the heartbeat disconnect timeout (resets for X seconds).
"""
self.timeout.set()
def kill(self):
"""This function must/will be called when a socket is to be completely
shut down, closed by connection timeout, connection error or explicit
disconnection from the client.
It will call all of the Namespace's
:meth:`~socketio.namespace.BaseNamespace.disconnect` methods
so that you can shut-down things properly.
"""
# Clear out the callbacks
self.ack_callbacks = {}
if self.connected:
self.state = self.STATE_DISCONNECTING
self.server_queue.put_nowait(None)
self.client_queue.put_nowait(None)
self.disconnect()
if self.sessid in self.server.sockets:
self.server.sockets.pop(self.sessid)
# gevent.kill(self.wsgi_app_greenlet)
else:
pass # Fail silently
def put_server_msg(self, msg):
"""Writes to the server's pipe, to end up in in the Namespaces"""
self.heartbeat()
self.server_queue.put_nowait(msg)
def put_client_msg(self, msg):
"""Writes to the client's pipe, to end up in the browser"""
self.heartbeat()
self.client_queue.put_nowait(msg)
def get_client_msg(self, **kwargs):
"""Grab a message to send it to the browser"""
return self.client_queue.get(**kwargs)
def get_server_msg(self, **kwargs):
"""Grab a message, to process it by the server and dispatch calls
"""
return self.server_queue.get(**kwargs)
def get_multiple_client_msgs(self, **kwargs):
"""Get multiple messages, in case we're going through the various
XHR-polling methods, on which we can pack more than one message if the
rate is high, and encode the payload for the HTTP channel."""
client_queue = self.client_queue
msgs = [client_queue.get(**kwargs)]
while client_queue.qsize():
msgs.append(client_queue.get())
return msgs
def error(self, error_name, error_message, endpoint=None, msg_id=None,
quiet=False):
"""Send an error to the user, using the custom or default
ErrorHandler configured on the [TODO: Revise this] Socket/Handler
object.
:param error_name: is a simple string, for easy association on
the client side
:param error_message: is a human readable message, the user
will eventually see
:param endpoint: set this if you have a message specific to an
end point
:param msg_id: set this if your error is relative to a
specific message
:param quiet: way to make the error handler quiet. Specific to
the handler. The default handler will only log,
with quiet.
"""
handler = self.error_handler
return handler(
self, error_name, error_message, endpoint, msg_id, quiet)
# User facing low-level function
def disconnect(self, silent=False):
"""Calling this method will call the
:meth:`~socketio.namespace.BaseNamespace.disconnect` method on
all the active Namespaces that were open, killing all their
jobs and sending 'disconnect' packets for each of them.
Normally, the Global namespace (endpoint = '') has special meaning,
as it represents the whole connection,
:param silent: when True, pass on the ``silent`` flag to the Namespace
:meth:`~socketio.namespace.BaseNamespace.disconnect`
calls.
"""
for ns_name, ns in list(self.active_ns.iteritems()):
ns.recv_disconnect()
def remove_namespace(self, namespace):
"""This removes a Namespace object from the socket.
This is usually called by
:meth:`~socketio.namespace.BaseNamespace.disconnect`.
"""
if namespace in self.active_ns:
del self.active_ns[namespace]
def send_packet(self, pkt):
"""Low-level interface to queue a packet on the wire (encoded as wire
protocol"""
self.put_client_msg(packet.encode(pkt))
def spawn(self, fn, *args, **kwargs):
"""Spawn a new Greenlet, attached to this Socket instance.
It will be monitored by the "watcher" method
"""
self.debug("Spawning sub-Socket Greenlet: %s" % fn.__name__)
job = gevent.spawn(fn, *args, **kwargs)
self.jobs.append(job)
return job
def _receiver_loop(self):
"""This is the loop that takes messages from the queue for the server
to consume, decodes them and dispatches them.
"""
while True:
rawdata = self.get_server_msg()
if not rawdata:
continue # or close the connection ?
try:
pkt = packet.decode(rawdata)
except (ValueError, KeyError, Exception), e:
self.error('invalid_packet',
"There was a decoding error when dealing with packet "
"with event: %s... (%s)" % (rawdata[:20], e))
continue
if pkt['type'] == 'heartbeat':
# This is already dealth with in put_server_msg() when
# any incoming raw data arrives.
continue
if pkt['type'] == 'disconnect' and pkt['endpoint'] == '':
# On global namespace, we kill everything.
self.kill()
continue
endpoint = pkt['endpoint']
if endpoint not in self.namespaces:
self.error("no_such_namespace",
"The endpoint you tried to connect to "
"doesn't exist: %s" % endpoint, endpoint=endpoint)
continue
elif endpoint in self.active_ns:
pkt_ns = self.active_ns[endpoint]
else:
new_ns_class = self.namespaces[endpoint]
pkt_ns = new_ns_class(self.environ, endpoint,
request=self.request)
pkt_ns.initialize() # use this instead of __init__,
# for less confusion
self.active_ns[endpoint] = pkt_ns
retval = pkt_ns.process_packet(pkt)
# Has the client requested an 'ack' with the reply parameters ?
if pkt.get('ack') == "data" and pkt.get('id'):
returning_ack = dict(type='ack', ackId=pkt['id'],
args=retval,
endpoint=pkt.get('endpoint', ''))
self.send_packet(returning_ack)
# Now, are we still connected ?
if not self.connected:
self.kill() # ?? what,s the best clean-up when its not a
# user-initiated disconnect
return
def _spawn_receiver_loop(self):
"""Spawns the reader loop. This is called internall by
socketio_manage().
"""
job = gevent.spawn(self._receiver_loop)
self.jobs.append(job)
return job
def _watcher(self):
"""Watch if any of the greenlets for a request have died. If so, kill
the request and the socket.
"""
# TODO: add that if any of the request.jobs die, kill them all and exit
gevent.sleep(5.0)
while True:
gevent.sleep(1.0)
if not self.connected:
# Killing Socket-level jobs
gevent.killall(self.jobs)
for ns_name, ns in list(self.active_ns.iteritems()):
ns.recv_disconnect()
break
def _spawn_watcher(self):
job = gevent.spawn(self._watcher)
return job
def _heartbeat(self):
"""Start the heartbeat Greenlet to check connection health."""
self.state = self.STATE_CONNECTED
while self.connected:
gevent.sleep(5.0) # FIXME: make this a setting
# TODO: this process could use a timeout object like the disconnect
# timeout thing, and ONLY send packets when none are sent!
# We would do that by calling timeout.set() for a "sending"
# timeout. If we're sending 100 messages a second, there is
# no need to push some heartbeats in there also.
self.put_client_msg("2::") # TODO: make it a heartbeat packet
def _disconnect_timeout(self):
self.timeout.clear()
if self.timeout.wait(10.0):
gevent.spawn(self._disconnect_timeout)
else:
self.kill()
def _spawn_heartbeat(self):
"""This functions returns a list of jobs"""
job_sender = gevent.spawn(self._heartbeat)
job_waiter = gevent.spawn(self._disconnect_timeout)
self.jobs.extend((job_sender, job_waiter))
return job_sender, job_waiter