mirror of
https://github.com/ansible/awx.git
synced 2026-03-13 15:09:32 -02:30
remove pyzmq
This commit is contained in:
@@ -1,293 +0,0 @@
|
|||||||
# Copyright (c) 2015 Ansible, Inc.
|
|
||||||
# All Rights Reserved.
|
|
||||||
|
|
||||||
# Python
|
|
||||||
import os
|
|
||||||
import logging
|
|
||||||
import urllib
|
|
||||||
import weakref
|
|
||||||
from optparse import make_option
|
|
||||||
from threading import Thread
|
|
||||||
|
|
||||||
# Django
|
|
||||||
from django.conf import settings
|
|
||||||
from django.core.management.base import NoArgsCommand
|
|
||||||
|
|
||||||
# AWX
|
|
||||||
import awx
|
|
||||||
from awx.main.models import * # noqa
|
|
||||||
from awx.main.socket_queue import Socket
|
|
||||||
|
|
||||||
# socketio
|
|
||||||
from socketio import socketio_manage
|
|
||||||
from socketio.server import SocketIOServer
|
|
||||||
from socketio.namespace import BaseNamespace
|
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.commands.run_socketio_service')
|
|
||||||
|
|
||||||
|
|
||||||
class SocketSession(object):
|
|
||||||
def __init__(self, session_id, token_key, socket):
|
|
||||||
self.socket = weakref.ref(socket)
|
|
||||||
self.session_id = session_id
|
|
||||||
self.token_key = token_key
|
|
||||||
self._valid = True
|
|
||||||
|
|
||||||
def is_valid(self):
|
|
||||||
return bool(self._valid)
|
|
||||||
|
|
||||||
def invalidate(self):
|
|
||||||
self._valid = False
|
|
||||||
|
|
||||||
def is_db_token_valid(self):
|
|
||||||
auth_token = AuthToken.objects.filter(key=self.token_key, reason='')
|
|
||||||
if not auth_token.exists():
|
|
||||||
return False
|
|
||||||
auth_token = auth_token[0]
|
|
||||||
return bool(not auth_token.is_expired())
|
|
||||||
|
|
||||||
|
|
||||||
class SocketSessionManager(object):
|
|
||||||
def __init__(self):
|
|
||||||
self.SESSIONS_MAX = 1000
|
|
||||||
self.socket_sessions = []
|
|
||||||
self.socket_session_token_key_map = {}
|
|
||||||
|
|
||||||
def _prune(self):
|
|
||||||
if len(self.socket_sessions) > self.SESSIONS_MAX:
|
|
||||||
session = self.socket_sessions[0]
|
|
||||||
entries = self.socket_session_token_key_map[session.token_key]
|
|
||||||
del entries[session.session_id]
|
|
||||||
if len(entries) == 0:
|
|
||||||
del self.socket_session_token_key_map[session.token_key]
|
|
||||||
self.socket_sessions.pop(0)
|
|
||||||
|
|
||||||
'''
|
|
||||||
Returns an dict of sessions <session_id, session>
|
|
||||||
'''
|
|
||||||
def lookup(self, token_key=None):
|
|
||||||
if not token_key:
|
|
||||||
raise ValueError("token_key required")
|
|
||||||
return self.socket_session_token_key_map.get(token_key, None)
|
|
||||||
|
|
||||||
def add_session(self, session):
|
|
||||||
self.socket_sessions.append(session)
|
|
||||||
entries = self.socket_session_token_key_map.get(session.token_key, None)
|
|
||||||
if not entries:
|
|
||||||
entries = {}
|
|
||||||
self.socket_session_token_key_map[session.token_key] = entries
|
|
||||||
entries[session.session_id] = session
|
|
||||||
self._prune()
|
|
||||||
return session
|
|
||||||
|
|
||||||
|
|
||||||
class SocketController(object):
|
|
||||||
def __init__(self, SocketSessionManager):
|
|
||||||
self.server = None
|
|
||||||
self.SocketSessionManager = SocketSessionManager
|
|
||||||
|
|
||||||
def add_session(self, session):
|
|
||||||
return self.SocketSessionManager.add_session(session)
|
|
||||||
|
|
||||||
def broadcast_packet(self, packet):
|
|
||||||
# Broadcast message to everyone at endpoint
|
|
||||||
# Loop over the 'raw' list of sockets (don't trust our list)
|
|
||||||
for session_id, socket in list(self.server.sockets.iteritems()):
|
|
||||||
socket_session = socket.session.get('socket_session', None)
|
|
||||||
if socket_session and socket_session.is_valid():
|
|
||||||
try:
|
|
||||||
socket.send_packet(packet)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Error sending client packet to %s: %s" % (str(session_id), str(packet)))
|
|
||||||
logger.error("Error was: " + str(e))
|
|
||||||
|
|
||||||
def send_packet(self, packet, token_key):
|
|
||||||
if not token_key:
|
|
||||||
raise ValueError("token_key is required")
|
|
||||||
socket_sessions = self.SocketSessionManager.lookup(token_key=token_key)
|
|
||||||
# We may not find the socket_session if the user disconnected
|
|
||||||
# (it's actually more compliciated than that because of our prune logic)
|
|
||||||
if not socket_sessions:
|
|
||||||
return None
|
|
||||||
for session_id, socket_session in socket_sessions.iteritems():
|
|
||||||
logger.warn("Maybe sending packet to %s" % session_id)
|
|
||||||
if socket_session and socket_session.is_valid():
|
|
||||||
logger.warn("Sending packet to %s" % session_id)
|
|
||||||
socket = socket_session.socket()
|
|
||||||
if socket:
|
|
||||||
try:
|
|
||||||
socket.send_packet(packet)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Error sending client packet to %s: %s" % (str(socket_session.session_id), str(packet)))
|
|
||||||
logger.error("Error was: " + str(e))
|
|
||||||
|
|
||||||
def set_server(self, server):
|
|
||||||
self.server = server
|
|
||||||
return server
|
|
||||||
|
|
||||||
|
|
||||||
socketController = SocketController(SocketSessionManager())
|
|
||||||
|
|
||||||
|
|
||||||
#
|
|
||||||
# Socket session is attached to self.session['socket_session']
|
|
||||||
# self.session and self.socket.session point to the same dict
|
|
||||||
#
|
|
||||||
class TowerBaseNamespace(BaseNamespace):
|
|
||||||
def get_allowed_methods(self):
|
|
||||||
return ['recv_disconnect']
|
|
||||||
|
|
||||||
def get_initial_acl(self):
|
|
||||||
request_token = self._get_request_token()
|
|
||||||
if request_token:
|
|
||||||
# (1) This is the first time the socket has been seen (first
|
|
||||||
# namespace joined).
|
|
||||||
# (2) This socket has already been seen (already joined and maybe
|
|
||||||
# left a namespace)
|
|
||||||
#
|
|
||||||
# Note: Assume that the user token is valid if the session is found
|
|
||||||
socket_session = self.session.get('socket_session', None)
|
|
||||||
if not socket_session:
|
|
||||||
socket_session = SocketSession(self.socket.sessid, request_token, self.socket)
|
|
||||||
if socket_session.is_db_token_valid():
|
|
||||||
self.session['socket_session'] = socket_session
|
|
||||||
socketController.add_session(socket_session)
|
|
||||||
else:
|
|
||||||
socket_session.invalidate()
|
|
||||||
|
|
||||||
return set(['recv_connect'] + self.get_allowed_methods())
|
|
||||||
else:
|
|
||||||
logger.warn("Authentication Failure validating user")
|
|
||||||
self.emit("connect_failed", "Authentication failed")
|
|
||||||
return set(['recv_connect'])
|
|
||||||
|
|
||||||
def _get_request_token(self):
|
|
||||||
if 'QUERY_STRING' not in self.environ:
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
k, v = self.environ['QUERY_STRING'].split("=")
|
|
||||||
if k == "Token":
|
|
||||||
token_actual = urllib.unquote_plus(v).decode().replace("\"","")
|
|
||||||
return token_actual
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Exception validating user: " + str(e))
|
|
||||||
return False
|
|
||||||
return False
|
|
||||||
|
|
||||||
def recv_connect(self):
|
|
||||||
socket_session = self.session.get('socket_session', None)
|
|
||||||
if socket_session and not socket_session.is_valid():
|
|
||||||
self.disconnect(silent=False)
|
|
||||||
|
|
||||||
|
|
||||||
class TestNamespace(TowerBaseNamespace):
|
|
||||||
def recv_connect(self):
|
|
||||||
logger.info("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
|
||||||
self.emit('test', "If you see this then you attempted to connect to the test socket endpoint")
|
|
||||||
super(TestNamespace, self).recv_connect()
|
|
||||||
|
|
||||||
|
|
||||||
class JobNamespace(TowerBaseNamespace):
|
|
||||||
def recv_connect(self):
|
|
||||||
logger.info("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
|
||||||
super(JobNamespace, self).recv_connect()
|
|
||||||
|
|
||||||
|
|
||||||
class JobEventNamespace(TowerBaseNamespace):
|
|
||||||
def recv_connect(self):
|
|
||||||
logger.info("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
|
||||||
super(JobEventNamespace, self).recv_connect()
|
|
||||||
|
|
||||||
|
|
||||||
class AdHocCommandEventNamespace(TowerBaseNamespace):
|
|
||||||
def recv_connect(self):
|
|
||||||
logger.info("Received client connect for ad hoc command event namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
|
||||||
super(AdHocCommandEventNamespace, self).recv_connect()
|
|
||||||
|
|
||||||
|
|
||||||
class ScheduleNamespace(TowerBaseNamespace):
|
|
||||||
def get_allowed_methods(self):
|
|
||||||
parent_allowed = super(ScheduleNamespace, self).get_allowed_methods()
|
|
||||||
return parent_allowed + ["schedule_changed"]
|
|
||||||
|
|
||||||
def recv_connect(self):
|
|
||||||
logger.info("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
|
||||||
super(ScheduleNamespace, self).recv_connect()
|
|
||||||
|
|
||||||
|
|
||||||
# Catch-all namespace.
|
|
||||||
# Deliver 'global' events over this namespace
|
|
||||||
class ControlNamespace(TowerBaseNamespace):
|
|
||||||
def recv_connect(self):
|
|
||||||
logger.warn("Received client connect for control namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
|
||||||
super(ControlNamespace, self).recv_connect()
|
|
||||||
|
|
||||||
|
|
||||||
class TowerSocket(object):
|
|
||||||
def __call__(self, environ, start_response):
|
|
||||||
path = environ['PATH_INFO'].strip('/') or 'index.html'
|
|
||||||
if path.startswith('socket.io'):
|
|
||||||
socketio_manage(environ, {'/socket.io/test': TestNamespace,
|
|
||||||
'/socket.io/jobs': JobNamespace,
|
|
||||||
'/socket.io/job_events': JobEventNamespace,
|
|
||||||
'/socket.io/ad_hoc_command_events': AdHocCommandEventNamespace,
|
|
||||||
'/socket.io/schedules': ScheduleNamespace,
|
|
||||||
'/socket.io/control': ControlNamespace})
|
|
||||||
else:
|
|
||||||
logger.warn("Invalid connect path received: " + path)
|
|
||||||
start_response('404 Not Found', [])
|
|
||||||
return ['Tower version %s' % awx.__version__]
|
|
||||||
|
|
||||||
|
|
||||||
def notification_handler(server):
|
|
||||||
with Socket('websocket', 'r') as websocket:
|
|
||||||
for message in websocket.listen():
|
|
||||||
packet = {
|
|
||||||
'args': message,
|
|
||||||
'endpoint': message['endpoint'],
|
|
||||||
'name': message['event'],
|
|
||||||
'type': 'event',
|
|
||||||
}
|
|
||||||
|
|
||||||
if 'token_key' in message:
|
|
||||||
# Best practice not to send the token over the socket
|
|
||||||
socketController.send_packet(packet, message.pop('token_key'))
|
|
||||||
else:
|
|
||||||
socketController.broadcast_packet(packet)
|
|
||||||
|
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
|
||||||
'''
|
|
||||||
SocketIO event emitter Tower service
|
|
||||||
Receives notifications from other services destined for UI notification
|
|
||||||
'''
|
|
||||||
|
|
||||||
help = 'Launch the SocketIO event emitter service'
|
|
||||||
|
|
||||||
option_list = NoArgsCommand.option_list + (
|
|
||||||
make_option('--receive_port', dest='receive_port', type='int', default=5559,
|
|
||||||
help='Port to listen for new events that will be destined for a client'),
|
|
||||||
make_option('--socketio_port', dest='socketio_port', type='int', default=8080,
|
|
||||||
help='Port to accept socketio requests from clients'),)
|
|
||||||
|
|
||||||
def handle_noargs(self, **options):
|
|
||||||
socketio_listen_port = settings.SOCKETIO_LISTEN_PORT
|
|
||||||
|
|
||||||
try:
|
|
||||||
if os.path.exists('/etc/tower/tower.cert') and os.path.exists('/etc/tower/tower.key'):
|
|
||||||
logger.info('Listening on port https://0.0.0.0:' + str(socketio_listen_port))
|
|
||||||
server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io',
|
|
||||||
keyfile='/etc/tower/tower.key', certfile='/etc/tower/tower.cert')
|
|
||||||
else:
|
|
||||||
logger.info('Listening on port http://0.0.0.0:' + str(socketio_listen_port))
|
|
||||||
server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io')
|
|
||||||
|
|
||||||
socketController.set_server(server)
|
|
||||||
handler_thread = Thread(target=notification_handler, args=(server,))
|
|
||||||
handler_thread.daemon = True
|
|
||||||
handler_thread.start()
|
|
||||||
|
|
||||||
server.serve_forever()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
@@ -1,169 +0,0 @@
|
|||||||
# Copyright (c) 2015 Ansible, Inc.
|
|
||||||
# All Rights Reserved.
|
|
||||||
|
|
||||||
import os
|
|
||||||
|
|
||||||
import zmq
|
|
||||||
|
|
||||||
from django.conf import settings
|
|
||||||
|
|
||||||
|
|
||||||
class Socket(object):
|
|
||||||
"""An abstraction class implemented for a dumb OS socket.
|
|
||||||
|
|
||||||
Intended to allow alteration of backend details in a single, consistent
|
|
||||||
way throughout the Tower application.
|
|
||||||
"""
|
|
||||||
def __init__(self, bucket, rw, debug=0, logger=None, nowait=False):
|
|
||||||
"""Instantiate a Socket object, which uses ZeroMQ to actually perform
|
|
||||||
passing a message back and forth.
|
|
||||||
|
|
||||||
Designed to be used as a context manager:
|
|
||||||
|
|
||||||
with Socket('callbacks', 'w') as socket:
|
|
||||||
socket.publish({'message': 'foo bar baz'})
|
|
||||||
|
|
||||||
If listening for messages through a socket, the `listen` method
|
|
||||||
is a simple generator:
|
|
||||||
|
|
||||||
with Socket('callbacks', 'r') as socket:
|
|
||||||
for message in socket.listen():
|
|
||||||
[...]
|
|
||||||
"""
|
|
||||||
self._bucket = bucket
|
|
||||||
self._rw = {
|
|
||||||
'r': zmq.REP,
|
|
||||||
'w': zmq.REQ,
|
|
||||||
}[rw.lower()]
|
|
||||||
|
|
||||||
self._connection_pid = None
|
|
||||||
self._context = None
|
|
||||||
self._socket = None
|
|
||||||
|
|
||||||
self._debug = debug
|
|
||||||
self._logger = logger
|
|
||||||
self._nowait = nowait
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
self.connect()
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, *args, **kwargs):
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_connected(self):
|
|
||||||
if self._socket:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
@property
|
|
||||||
def port(self):
|
|
||||||
return {
|
|
||||||
'callbacks': os.environ.get('CALLBACK_CONSUMER_PORT',
|
|
||||||
getattr(settings, 'CALLBACK_CONSUMER_PORT', 'tcp://127.0.0.1:5557')),
|
|
||||||
'task_commands': settings.TASK_COMMAND_PORT,
|
|
||||||
'websocket': settings.SOCKETIO_NOTIFICATION_PORT,
|
|
||||||
'fact_cache': settings.FACT_CACHE_PORT,
|
|
||||||
}[self._bucket]
|
|
||||||
|
|
||||||
def connect(self):
|
|
||||||
"""Connect to ZeroMQ."""
|
|
||||||
|
|
||||||
# Make sure that we are clearing everything out if there is
|
|
||||||
# a problem; PID crossover can cause bad news.
|
|
||||||
active_pid = os.getpid()
|
|
||||||
if self._connection_pid is None:
|
|
||||||
self._connection_pid = active_pid
|
|
||||||
if self._connection_pid != active_pid:
|
|
||||||
self._context = None
|
|
||||||
self._socket = None
|
|
||||||
self._connection_pid = active_pid
|
|
||||||
|
|
||||||
# If the port is an integer, convert it into tcp://
|
|
||||||
port = self.port
|
|
||||||
if isinstance(port, int):
|
|
||||||
port = 'tcp://127.0.0.1:%d' % port
|
|
||||||
|
|
||||||
# If the port is None, then this is an intentional dummy;
|
|
||||||
# honor this. (For testing.)
|
|
||||||
if not port:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Okay, create the connection.
|
|
||||||
if self._context is None:
|
|
||||||
self._context = zmq.Context()
|
|
||||||
self._socket = self._context.socket(self._rw)
|
|
||||||
if self._nowait:
|
|
||||||
self._socket.setsockopt(zmq.RCVTIMEO, 2000)
|
|
||||||
self._socket.setsockopt(zmq.LINGER, 1000)
|
|
||||||
if self._rw == zmq.REQ:
|
|
||||||
self._socket.connect(port)
|
|
||||||
else:
|
|
||||||
self._socket.bind(port)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
"""Disconnect and tear down."""
|
|
||||||
if self._socket:
|
|
||||||
self._socket.close()
|
|
||||||
self._socket = None
|
|
||||||
self._context = None
|
|
||||||
|
|
||||||
def publish(self, message):
|
|
||||||
"""Publish a message over the socket."""
|
|
||||||
|
|
||||||
# If the port is None, no-op.
|
|
||||||
if self.port is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
# If we are not connected, whine.
|
|
||||||
if not self.is_connected:
|
|
||||||
raise RuntimeError('Cannot publish a message when not connected '
|
|
||||||
'to the socket.')
|
|
||||||
|
|
||||||
# If we are in the wrong mode, whine.
|
|
||||||
if self._rw != zmq.REQ:
|
|
||||||
raise RuntimeError('This socket is not opened for writing.')
|
|
||||||
|
|
||||||
# If we are in debug mode; provide the PID.
|
|
||||||
if self._debug:
|
|
||||||
message.update({'pid': os.getpid(),
|
|
||||||
'connection_pid': self._connection_pid})
|
|
||||||
|
|
||||||
# Send the message.
|
|
||||||
for retry in xrange(4):
|
|
||||||
try:
|
|
||||||
self._socket.send_json(message)
|
|
||||||
self._socket.recv()
|
|
||||||
break
|
|
||||||
except Exception as ex:
|
|
||||||
if self._logger:
|
|
||||||
self._logger.error('Publish Exception: %r; retry=%d',
|
|
||||||
ex, retry, exc_info=True)
|
|
||||||
if retry >= 3:
|
|
||||||
raise
|
|
||||||
|
|
||||||
def listen(self):
|
|
||||||
"""Retrieve a single message from the subcription channel
|
|
||||||
and return it.
|
|
||||||
"""
|
|
||||||
# If the port is None, no-op.
|
|
||||||
if self.port is None:
|
|
||||||
raise StopIteration
|
|
||||||
|
|
||||||
# If we are not connected, whine.
|
|
||||||
if not self.is_connected:
|
|
||||||
raise RuntimeError('Cannot publish a message when not connected '
|
|
||||||
'to the socket.')
|
|
||||||
|
|
||||||
# If we are in the wrong mode, whine.
|
|
||||||
if self._rw != zmq.REP:
|
|
||||||
raise RuntimeError('This socket is not opened for reading.')
|
|
||||||
|
|
||||||
# Actually listen to the socket.
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
message = self._socket.recv_json()
|
|
||||||
yield message
|
|
||||||
finally:
|
|
||||||
self._socket.send('1')
|
|
||||||
@@ -38,7 +38,6 @@ python-memcached==1.58
|
|||||||
python-radius==1.0
|
python-radius==1.0
|
||||||
python-saml==2.2.0
|
python-saml==2.2.0
|
||||||
python-social-auth==0.2.21
|
python-social-auth==0.2.21
|
||||||
pyzmq==14.5.0
|
|
||||||
redbaron==0.6.2
|
redbaron==0.6.2
|
||||||
requests-futures==0.9.7
|
requests-futures==0.9.7
|
||||||
shade==1.13.1
|
shade==1.13.1
|
||||||
|
|||||||
@@ -164,7 +164,6 @@ python-swiftclient==3.2.0 # via python-heatclient, python-troveclient, shade
|
|||||||
python-troveclient==2.6.0 # via shade
|
python-troveclient==2.6.0 # via shade
|
||||||
pytz==2016.7 # via babel, celery, irc, oslo.serialization, oslo.utils, tempora, twilio
|
pytz==2016.7 # via babel, celery, irc, oslo.serialization, oslo.utils, tempora, twilio
|
||||||
PyYAML==3.12 # via cliff, djangorestframework-yaml, os-client-config, psphere, python-heatclient, python-ironicclient, python-mistralclient
|
PyYAML==3.12 # via cliff, djangorestframework-yaml, os-client-config, psphere, python-heatclient, python-ironicclient, python-mistralclient
|
||||||
pyzmq==14.5.0
|
|
||||||
rackspace-auth-openstack==1.3 # via rackspace-novaclient
|
rackspace-auth-openstack==1.3 # via rackspace-novaclient
|
||||||
rackspace-novaclient==2.1
|
rackspace-novaclient==2.1
|
||||||
rax-default-network-flags-python-novaclient-ext==0.4.0 # via rackspace-novaclient
|
rax-default-network-flags-python-novaclient-ext==0.4.0 # via rackspace-novaclient
|
||||||
|
|||||||
Reference in New Issue
Block a user