Theoretically working Socket implementation.

This commit is contained in:
Luke Sneeringer
2014-11-21 09:24:49 -06:00
parent 99b8b8e193
commit 52aca5a081
5 changed files with 79 additions and 47 deletions

View File

@@ -25,7 +25,7 @@ from django.db import connection
# AWX # AWX
from awx.main.models import * from awx.main.models import *
from awx.main.queue import PubSub from awx.main.socket import Socket
MAX_REQUESTS = 10000 MAX_REQUESTS = 10000
WORKERS = 4 WORKERS = 4
@@ -102,8 +102,8 @@ class CallbackReceiver(object):
total_messages = 0 total_messages = 0
last_parent_events = {} last_parent_events = {}
with closing(PubSub('callbacks')) as callbacks: with Socket('callbacks', 'r') as callbacks:
for message in callbacks.subscribe(wait=0.1): for message in callbacks.listen():
total_messages += 1 total_messages += 1
if not use_workers: if not use_workers:
self.process_job_event(message) self.process_job_event(message)

View File

@@ -24,7 +24,7 @@ from django.utils.tzinfo import FixedOffset
# AWX # AWX
import awx import awx
from awx.main.models import * from awx.main.models import *
from awx.main.queue import PubSub from awx.main.socket import Socket
# gevent & socketio # gevent & socketio
import gevent import gevent
@@ -119,16 +119,16 @@ class TowerSocket(object):
return ['Tower version %s' % awx.__version__] return ['Tower version %s' % awx.__version__]
def notification_handler(server): def notification_handler(server):
pubsub = PubSub('websocket') with Socket('websocket', 'r') as websocket:
for message in pubsub.subscribe(): for message in websocket.listen():
packet = { packet = {
'args': message, 'args': message,
'endpoint': message['endpoint'], 'endpoint': message['endpoint'],
'name': message['event'], 'name': message['event'],
'type': 'event', 'type': 'event',
} }
for session_id, socket in list(server.sockets.iteritems()): for session_id, socket in list(server.sockets.iteritems()):
socket.send_packet(packet) socket.send_packet(packet)
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''

View File

@@ -12,13 +12,26 @@ class Socket(object):
way throughout the Tower application. way throughout the Tower application.
""" """
ports = { ports = {
'callbacks': , 'callbacks': 5556,
'websocket': , 'task_commands': 6559,
'websocket': 6557,
} }
def __init__(self, bucket, rw, debug=0, logger=None): def __init__(self, bucket, rw, debug=0, logger=None):
"""Instantiate a Socket object, which uses ZeroMQ to actually perform """Instantiate a Socket object, which uses ZeroMQ to actually perform
passing a message back and forth. passing a message back and forth.
Designed to be used as a context manager:
with Socket('callbacks', 'w') as socket:
socket.publish({'message': 'foo bar baz'})
If listening for messages through a socket, the `listen` method
is a simple generator:
with Socket('callbacks', 'r') as socket:
for message in socket.listen():
[...]
""" """
self._bucket = bucket self._bucket = bucket
self._rw = { self._rw = {
@@ -37,11 +50,20 @@ class Socket(object):
self.connect() self.connect()
return self return self
def __exit__(self, *args, **kwargs):
self.close()
@property
def is_connected(self):
if self._socket:
return True
return False
@property @property
def port(self): def port(self):
return self.ports[self._bucket] return self.ports[self._bucket]
def connect(self, purpose): def connect(self):
"""Connect to ZeroMQ.""" """Connect to ZeroMQ."""
# Make sure that we are clearing everything out if there is # Make sure that we are clearing everything out if there is
@@ -57,15 +79,30 @@ class Socket(object):
# Okay, create the connection. # Okay, create the connection.
if self._context is None: if self._context is None:
self._context = zmq.Context() self._context = zmq.Context()
self._socket = self._context.socket(purpose) self._socket = self._context.socket(self._rw)
if purpose == self.WRITE: if purpose == zmq.REQ:
self._socket.connect(self.port) self._socket.connect('tcp://127.0.0.1:%d' % self.port)
else: else:
self._socket.bind(self.port) self._socket.bind('tcp://127.0.0.1:%d' % self.port)
def close(self):
"""Disconnect and tear down."""
self._socket.close()
self._socket = None
self._context = None
def publish(self, message): def publish(self, message):
"""Publish a message over the socket.""" """Publish a message over the socket."""
# If we are not connected, whine.
if not self.is_connected:
raise RuntimeError('Cannot publish a message when not connected '
'to the socket.')
# If we are in the wrong mode, whine.
if self._rw != zmq.REQ:
raise RuntimeError('This socket is not opened for writing.')
# If we are in debug mode; provide the PID. # If we are in debug mode; provide the PID.
if self._debug: if self._debug:
message.update({'pid': os.getpid(), message.update({'pid': os.getpid(),
@@ -74,7 +111,6 @@ class Socket(object):
# Send the message. # Send the message.
for retry in xrange(4): for retry in xrange(4):
try: try:
self.connect()
self._socket.send_json(message) self._socket.send_json(message)
self._socket.recv() self._socket.recv()
except Exception as ex: except Exception as ex:
@@ -88,6 +124,16 @@ class Socket(object):
"""Retrieve a single message from the subcription channel """Retrieve a single message from the subcription channel
and return it. and return it.
""" """
# If we are not connected, whine.
if not self.is_connected:
raise RuntimeError('Cannot publish a message when not connected '
'to the socket.')
# If we are in the wrong mode, whine.
if self._rw != zmq.REP:
raise RuntimeError('This socket is not opened for reading.')
# Actually listen to the socket.
while True: while True:
message = self._socket.recv_json() message = self._socket.recv_json()
yield message yield message

View File

@@ -361,11 +361,12 @@ def get_system_task_capacity():
def emit_websocket_notification(endpoint, event, payload): def emit_websocket_notification(endpoint, event, payload):
from awx.main.queue import PubSub from awx.main.socket import Socket
pubsub = PubSub('websocket')
payload['event'] = event with Socket('websocket', 'w') as websocket:
payload['endpoint'] = endpoint payload['event'] = event
pubsub.publish(payload) payload['endpoint'] = endpoint
websocket.publish(payload)
_inventory_updates = threading.local() _inventory_updates = threading.local()

View File

@@ -44,7 +44,7 @@ from contextlib import closing
import requests import requests
# Tower # Tower
from awx.main.queue import PubSub from awx.main.socket import Socket
class TokenAuth(requests.auth.AuthBase): class TokenAuth(requests.auth.AuthBase):
@@ -115,26 +115,11 @@ class CallbackModule(object):
'counter': self.counter, 'counter': self.counter,
'created': datetime.datetime.utcnow().isoformat(), 'created': datetime.datetime.utcnow().isoformat(),
} }
active_pid = os.getpid()
if self.job_callback_debug:
msg.update({
'pid': active_pid,
})
for retry_count in xrange(4):
try:
if not hasattr(self, 'connection_pid'):
self.connection_pid = active_pid
# Publish the callback through Redis. # Publish the callback.
with closing(PubSub('callbacks')) as callbacks: with Socket('callbacks', 'w', debug=self.job_callback_debug,
callbacks.publish(msg) logger=self.logger) as callbacks:
return callbacks.publish(msg)
except Exception, e:
self.logger.info('Publish Exception: %r, retry=%d', e,
retry_count, exc_info=True)
# TODO: Maybe recycle connection here?
if retry_count >= 3:
raise
def _post_rest_api_event(self, event, event_data): def _post_rest_api_event(self, event, event_data):
data = json.dumps({ data = json.dumps({