From 35fe4c8448e629350f40d92fe3253c06cbbaaa6f Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Fri, 9 Dec 2016 14:34:32 -0500 Subject: [PATCH] remove pyzmq --- .../commands/run_socketio_service.py | 293 ------------------ awx/main/socket_queue.py | 169 ---------- requirements/requirements.in | 1 - requirements/requirements.txt | 1 - 4 files changed, 464 deletions(-) delete mode 100644 awx/main/management/commands/run_socketio_service.py delete mode 100644 awx/main/socket_queue.py diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py deleted file mode 100644 index 9b7e5a61d2..0000000000 --- a/awx/main/management/commands/run_socketio_service.py +++ /dev/null @@ -1,293 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved. - -# Python -import os -import logging -import urllib -import weakref -from optparse import make_option -from threading import Thread - -# Django -from django.conf import settings -from django.core.management.base import NoArgsCommand - -# AWX -import awx -from awx.main.models import * # noqa -from awx.main.socket_queue import Socket - -# socketio -from socketio import socketio_manage -from socketio.server import SocketIOServer -from socketio.namespace import BaseNamespace - -logger = logging.getLogger('awx.main.commands.run_socketio_service') - - -class SocketSession(object): - def __init__(self, session_id, token_key, socket): - self.socket = weakref.ref(socket) - self.session_id = session_id - self.token_key = token_key - self._valid = True - - def is_valid(self): - return bool(self._valid) - - def invalidate(self): - self._valid = False - - def is_db_token_valid(self): - auth_token = AuthToken.objects.filter(key=self.token_key, reason='') - if not auth_token.exists(): - return False - auth_token = auth_token[0] - return bool(not auth_token.is_expired()) - - -class SocketSessionManager(object): - def __init__(self): - self.SESSIONS_MAX = 1000 - self.socket_sessions = [] - self.socket_session_token_key_map = {} - - def _prune(self): - if len(self.socket_sessions) > self.SESSIONS_MAX: - session = self.socket_sessions[0] - entries = self.socket_session_token_key_map[session.token_key] - del entries[session.session_id] - if len(entries) == 0: - del self.socket_session_token_key_map[session.token_key] - self.socket_sessions.pop(0) - - ''' - Returns an dict of sessions - ''' - def lookup(self, token_key=None): - if not token_key: - raise ValueError("token_key required") - return self.socket_session_token_key_map.get(token_key, None) - - def add_session(self, session): - self.socket_sessions.append(session) - entries = self.socket_session_token_key_map.get(session.token_key, None) - if not entries: - entries = {} - self.socket_session_token_key_map[session.token_key] = entries - entries[session.session_id] = session - self._prune() - return session - - -class SocketController(object): - def __init__(self, SocketSessionManager): - self.server = None - self.SocketSessionManager = SocketSessionManager - - def add_session(self, session): - return self.SocketSessionManager.add_session(session) - - def broadcast_packet(self, packet): - # Broadcast message to everyone at endpoint - # Loop over the 'raw' list of sockets (don't trust our list) - for session_id, socket in list(self.server.sockets.iteritems()): - socket_session = socket.session.get('socket_session', None) - if socket_session and socket_session.is_valid(): - try: - socket.send_packet(packet) - except Exception as e: - logger.error("Error sending client packet to %s: %s" % (str(session_id), str(packet))) - logger.error("Error was: " + str(e)) - - def send_packet(self, packet, token_key): - if not token_key: - raise ValueError("token_key is required") - socket_sessions = self.SocketSessionManager.lookup(token_key=token_key) - # We may not find the socket_session if the user disconnected - # (it's actually more compliciated than that because of our prune logic) - if not socket_sessions: - return None - for session_id, socket_session in socket_sessions.iteritems(): - logger.warn("Maybe sending packet to %s" % session_id) - if socket_session and socket_session.is_valid(): - logger.warn("Sending packet to %s" % session_id) - socket = socket_session.socket() - if socket: - try: - socket.send_packet(packet) - except Exception as e: - logger.error("Error sending client packet to %s: %s" % (str(socket_session.session_id), str(packet))) - logger.error("Error was: " + str(e)) - - def set_server(self, server): - self.server = server - return server - - -socketController = SocketController(SocketSessionManager()) - - -# -# Socket session is attached to self.session['socket_session'] -# self.session and self.socket.session point to the same dict -# -class TowerBaseNamespace(BaseNamespace): - def get_allowed_methods(self): - return ['recv_disconnect'] - - def get_initial_acl(self): - request_token = self._get_request_token() - if request_token: - # (1) This is the first time the socket has been seen (first - # namespace joined). - # (2) This socket has already been seen (already joined and maybe - # left a namespace) - # - # Note: Assume that the user token is valid if the session is found - socket_session = self.session.get('socket_session', None) - if not socket_session: - socket_session = SocketSession(self.socket.sessid, request_token, self.socket) - if socket_session.is_db_token_valid(): - self.session['socket_session'] = socket_session - socketController.add_session(socket_session) - else: - socket_session.invalidate() - - return set(['recv_connect'] + self.get_allowed_methods()) - else: - logger.warn("Authentication Failure validating user") - self.emit("connect_failed", "Authentication failed") - return set(['recv_connect']) - - def _get_request_token(self): - if 'QUERY_STRING' not in self.environ: - return False - - try: - k, v = self.environ['QUERY_STRING'].split("=") - if k == "Token": - token_actual = urllib.unquote_plus(v).decode().replace("\"","") - return token_actual - except Exception as e: - logger.error("Exception validating user: " + str(e)) - return False - return False - - def recv_connect(self): - socket_session = self.session.get('socket_session', None) - if socket_session and not socket_session.is_valid(): - self.disconnect(silent=False) - - -class TestNamespace(TowerBaseNamespace): - def recv_connect(self): - logger.info("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR'])) - self.emit('test', "If you see this then you attempted to connect to the test socket endpoint") - super(TestNamespace, self).recv_connect() - - -class JobNamespace(TowerBaseNamespace): - def recv_connect(self): - logger.info("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR'])) - super(JobNamespace, self).recv_connect() - - -class JobEventNamespace(TowerBaseNamespace): - def recv_connect(self): - logger.info("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR'])) - super(JobEventNamespace, self).recv_connect() - - -class AdHocCommandEventNamespace(TowerBaseNamespace): - def recv_connect(self): - logger.info("Received client connect for ad hoc command event namespace from %s" % str(self.environ['REMOTE_ADDR'])) - super(AdHocCommandEventNamespace, self).recv_connect() - - -class ScheduleNamespace(TowerBaseNamespace): - def get_allowed_methods(self): - parent_allowed = super(ScheduleNamespace, self).get_allowed_methods() - return parent_allowed + ["schedule_changed"] - - def recv_connect(self): - logger.info("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR'])) - super(ScheduleNamespace, self).recv_connect() - - -# Catch-all namespace. -# Deliver 'global' events over this namespace -class ControlNamespace(TowerBaseNamespace): - def recv_connect(self): - logger.warn("Received client connect for control namespace from %s" % str(self.environ['REMOTE_ADDR'])) - super(ControlNamespace, self).recv_connect() - - -class TowerSocket(object): - def __call__(self, environ, start_response): - path = environ['PATH_INFO'].strip('/') or 'index.html' - if path.startswith('socket.io'): - socketio_manage(environ, {'/socket.io/test': TestNamespace, - '/socket.io/jobs': JobNamespace, - '/socket.io/job_events': JobEventNamespace, - '/socket.io/ad_hoc_command_events': AdHocCommandEventNamespace, - '/socket.io/schedules': ScheduleNamespace, - '/socket.io/control': ControlNamespace}) - else: - logger.warn("Invalid connect path received: " + path) - start_response('404 Not Found', []) - return ['Tower version %s' % awx.__version__] - - -def notification_handler(server): - with Socket('websocket', 'r') as websocket: - for message in websocket.listen(): - packet = { - 'args': message, - 'endpoint': message['endpoint'], - 'name': message['event'], - 'type': 'event', - } - - if 'token_key' in message: - # Best practice not to send the token over the socket - socketController.send_packet(packet, message.pop('token_key')) - else: - socketController.broadcast_packet(packet) - - -class Command(NoArgsCommand): - ''' - SocketIO event emitter Tower service - Receives notifications from other services destined for UI notification - ''' - - help = 'Launch the SocketIO event emitter service' - - option_list = NoArgsCommand.option_list + ( - make_option('--receive_port', dest='receive_port', type='int', default=5559, - help='Port to listen for new events that will be destined for a client'), - make_option('--socketio_port', dest='socketio_port', type='int', default=8080, - help='Port to accept socketio requests from clients'),) - - def handle_noargs(self, **options): - socketio_listen_port = settings.SOCKETIO_LISTEN_PORT - - try: - if os.path.exists('/etc/tower/tower.cert') and os.path.exists('/etc/tower/tower.key'): - logger.info('Listening on port https://0.0.0.0:' + str(socketio_listen_port)) - server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io', - keyfile='/etc/tower/tower.key', certfile='/etc/tower/tower.cert') - else: - logger.info('Listening on port http://0.0.0.0:' + str(socketio_listen_port)) - server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io') - - socketController.set_server(server) - handler_thread = Thread(target=notification_handler, args=(server,)) - handler_thread.daemon = True - handler_thread.start() - - server.serve_forever() - except KeyboardInterrupt: - pass diff --git a/awx/main/socket_queue.py b/awx/main/socket_queue.py deleted file mode 100644 index 40dba76366..0000000000 --- a/awx/main/socket_queue.py +++ /dev/null @@ -1,169 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved. - -import os - -import zmq - -from django.conf import settings - - -class Socket(object): - """An abstraction class implemented for a dumb OS socket. - - Intended to allow alteration of backend details in a single, consistent - way throughout the Tower application. - """ - def __init__(self, bucket, rw, debug=0, logger=None, nowait=False): - """Instantiate a Socket object, which uses ZeroMQ to actually perform - passing a message back and forth. - - Designed to be used as a context manager: - - with Socket('callbacks', 'w') as socket: - socket.publish({'message': 'foo bar baz'}) - - If listening for messages through a socket, the `listen` method - is a simple generator: - - with Socket('callbacks', 'r') as socket: - for message in socket.listen(): - [...] - """ - self._bucket = bucket - self._rw = { - 'r': zmq.REP, - 'w': zmq.REQ, - }[rw.lower()] - - self._connection_pid = None - self._context = None - self._socket = None - - self._debug = debug - self._logger = logger - self._nowait = nowait - - def __enter__(self): - self.connect() - return self - - def __exit__(self, *args, **kwargs): - self.close() - - @property - def is_connected(self): - if self._socket: - return True - return False - - @property - def port(self): - return { - 'callbacks': os.environ.get('CALLBACK_CONSUMER_PORT', - getattr(settings, 'CALLBACK_CONSUMER_PORT', 'tcp://127.0.0.1:5557')), - 'task_commands': settings.TASK_COMMAND_PORT, - 'websocket': settings.SOCKETIO_NOTIFICATION_PORT, - 'fact_cache': settings.FACT_CACHE_PORT, - }[self._bucket] - - def connect(self): - """Connect to ZeroMQ.""" - - # Make sure that we are clearing everything out if there is - # a problem; PID crossover can cause bad news. - active_pid = os.getpid() - if self._connection_pid is None: - self._connection_pid = active_pid - if self._connection_pid != active_pid: - self._context = None - self._socket = None - self._connection_pid = active_pid - - # If the port is an integer, convert it into tcp:// - port = self.port - if isinstance(port, int): - port = 'tcp://127.0.0.1:%d' % port - - # If the port is None, then this is an intentional dummy; - # honor this. (For testing.) - if not port: - return - - # Okay, create the connection. - if self._context is None: - self._context = zmq.Context() - self._socket = self._context.socket(self._rw) - if self._nowait: - self._socket.setsockopt(zmq.RCVTIMEO, 2000) - self._socket.setsockopt(zmq.LINGER, 1000) - if self._rw == zmq.REQ: - self._socket.connect(port) - else: - self._socket.bind(port) - - def close(self): - """Disconnect and tear down.""" - if self._socket: - self._socket.close() - self._socket = None - self._context = None - - def publish(self, message): - """Publish a message over the socket.""" - - # If the port is None, no-op. - if self.port is None: - return - - # If we are not connected, whine. - if not self.is_connected: - raise RuntimeError('Cannot publish a message when not connected ' - 'to the socket.') - - # If we are in the wrong mode, whine. - if self._rw != zmq.REQ: - raise RuntimeError('This socket is not opened for writing.') - - # If we are in debug mode; provide the PID. - if self._debug: - message.update({'pid': os.getpid(), - 'connection_pid': self._connection_pid}) - - # Send the message. - for retry in xrange(4): - try: - self._socket.send_json(message) - self._socket.recv() - break - except Exception as ex: - if self._logger: - self._logger.error('Publish Exception: %r; retry=%d', - ex, retry, exc_info=True) - if retry >= 3: - raise - - def listen(self): - """Retrieve a single message from the subcription channel - and return it. - """ - # If the port is None, no-op. - if self.port is None: - raise StopIteration - - # If we are not connected, whine. - if not self.is_connected: - raise RuntimeError('Cannot publish a message when not connected ' - 'to the socket.') - - # If we are in the wrong mode, whine. - if self._rw != zmq.REP: - raise RuntimeError('This socket is not opened for reading.') - - # Actually listen to the socket. - while True: - try: - message = self._socket.recv_json() - yield message - finally: - self._socket.send('1') diff --git a/requirements/requirements.in b/requirements/requirements.in index 886476fecb..3ae18db466 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -38,7 +38,6 @@ python-memcached==1.58 python-radius==1.0 python-saml==2.2.0 python-social-auth==0.2.21 -pyzmq==14.5.0 redbaron==0.6.2 requests-futures==0.9.7 shade==1.13.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index e39a8aed88..03a6d9f614 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -164,7 +164,6 @@ python-swiftclient==3.2.0 # via python-heatclient, python-troveclient, shade python-troveclient==2.6.0 # via shade pytz==2016.7 # via babel, celery, irc, oslo.serialization, oslo.utils, tempora, twilio PyYAML==3.12 # via cliff, djangorestframework-yaml, os-client-config, psphere, python-heatclient, python-ironicclient, python-mistralclient -pyzmq==14.5.0 rackspace-auth-openstack==1.3 # via rackspace-novaclient rackspace-novaclient==2.1 rax-default-network-flags-python-novaclient-ext==0.4.0 # via rackspace-novaclient