Beginning swap of ZeroMQ for Redis.

This commit is contained in:
Luke Sneeringer 2014-10-27 16:18:04 -05:00
parent 842086eef7
commit 99381f11fa
5 changed files with 79 additions and 48 deletions

View File

@ -23,9 +23,7 @@ from django.utils.tzinfo import FixedOffset
# AWX
import awx
from awx.main.models import *
# ZeroMQ
import zmq
from awx.main.queue import PubSub
# gevent & socketio
import gevent
@ -68,14 +66,15 @@ class TowerSocket(object):
start_response('404 Not Found', [])
return ['Tower version %s' % awx.__version__]
def notification_handler(bind_port, server):
handler_context = zmq.Context()
handler_socket = handler_context.socket(zmq.PULL)
handler_socket.bind(bind_port)
while True:
message = handler_socket.recv_json()
packet = dict(type='event', name=message['event'], endpoint=message['endpoint'], args=message)
def notification_handler(server):
pubsub = PubSub('websocket')
for message in pubsub.subscribe():
packet = {
'args': message,
'endpoint': message['endpoint'],
'name': message['event'],
'type': 'event',
}
for session_id, socket in list(server.sockets.iteritems()):
socket.send_packet(packet)
@ -119,7 +118,7 @@ class Command(NoArgsCommand):
server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io')
#gevent.spawn(notification_handler, socketio_notification_port, server)
handler_thread = Thread(target=notification_handler, args = (socketio_notification_port, server,))
handler_thread = Thread(target=notification_handler, args=(server,))
handler_thread.daemon = True
handler_thread.start()

View File

@ -22,15 +22,16 @@ from django.utils.tzinfo import FixedOffset
# AWX
from awx.main.models import *
from awx.main.queue import FifoQueue
from awx.main.tasks import handle_work_error
from awx.main.utils import get_system_task_capacity, decrypt_field
# ZeroMQ
import zmq
# Celery
from celery.task.control import inspect
queue = FifoQueue('tower_task_manager')
class SimpleDAG(object):
''' A simple implementation of a directed acyclic graph '''
@ -280,7 +281,7 @@ def process_graph(graph, task_capacity):
print('Started Node: %s (capacity hit: %s) Remaining Capacity: %s' %
(str(node_obj), str(impact), str(remaining_volume)))
def run_taskmanager(command_port):
def run_taskmanager():
"""Receive task start and finish signals to rebuild a dependency graph
and manage the actual running of tasks.
"""
@ -293,18 +294,23 @@ def run_taskmanager(command_port):
signal.signal(signal.SIGTERM, shutdown_handler())
paused = False
task_capacity = get_system_task_capacity()
command_context = zmq.Context()
command_socket = command_context.socket(zmq.PULL)
command_socket.bind(command_port)
print("Listening on %s" % command_port)
last_rebuild = datetime.datetime.fromtimestamp(0)
# Attempt to pull messages off of the task system queue into perpetuity.
while True:
try:
message = command_socket.recv_json(flags=zmq.NOBLOCK)
except zmq.ZMQError,e:
message = None
if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 10:
if message is not None and 'pause' in message:
# Pop a message off the queue.
# (If the queue is empty, None will be returned.)
message = queue.pop()
# Sanity check: If we got no message back, sleep and continue.
if message is None:
time.sleep(0.1)
continue
# Parse out the message appropriately, rebuilding our graph if
# appropriate.
if (datetime.datetime.now() - last_rebuild).seconds > 10:
if 'pause' in message:
print("Pause command received: %s" % str(message))
paused = message['pause']
graph = rebuild_graph(message)
@ -339,8 +345,7 @@ class Command(NoArgsCommand):
def handle_noargs(self, **options):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()
command_port = settings.TASK_COMMAND_PORT
try:
run_taskmanager(command_port)
run_taskmanager()
except KeyboardInterrupt:
pass

View File

@ -5,7 +5,38 @@ import json
from redis import StrictRedis
redis = StrictRedis('127.0.0.1') # FIXME: Don't hard-code.
from django.conf import settings
__all__ = ['FifoQueue', 'PubSub']
# Determine, based on settings.BROKER_URL (for celery), what the correct Redis
# connection settings are.
redis_kwargs = {}
broker_url = settings.BROKER_URL
if not broker_url.lower().startswith('redis://'):
raise RuntimeError('Error importing awx.main.queue: Cannot use queue with '
'a non-Redis broker configured for celery.')
broker_url = broker_url[8:]
# There may or may not be a password; address both situations by checking
# for an "@" in the broker URL.
if '@' in broker_url:
broker_auth, broker_host = broker_url.split('@')
redis_kwargs['password'] = broker_auth.split(':')[1]
else:
broker_host = broker_url
# Ignore anything after a / in the broker host.
broker_host = broker_host.split('/')[0]
# If a custom port is present, parse it out.
if ':' in broker_host:
broker_host, broker_port = broker_host.split(':')
redis_kwargs['port'] = int(broker_port)
# Now create a StrictRedis object that knows how to connect appropriately.
redis = StrictRedis(broker_host, **redis_kwargs)
class FifoQueue(object):
@ -62,7 +93,7 @@ class PubSub(object):
"""
return self._ps.get_message()
def listen(self, wait=0.001):
def subscribe(self, wait=0.001):
"""Listen to content from the subscription channel indefinitely,
and yield messages as they are retrieved.
"""

View File

@ -26,9 +26,6 @@ import dateutil.parser
# Pexpect
import pexpect
# ZMQ
import zmq
# Celery
from celery import Task, task
from djcelery.models import PeriodicTask
@ -41,7 +38,9 @@ from django.utils.timezone import now
# AWX
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate
from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate,
# Schedule, UnifiedJobTemplate
from awx.main.queue import FifoQueue
from awx.main.utils import (get_ansible_version, decrypt_field, update_scm_url,
ignore_inventory_computed_fields, emit_websocket_notification)
@ -105,10 +104,11 @@ def tower_periodic_scheduler(self):
@task()
def notify_task_runner(metadata_dict):
signal_context = zmq.Context()
signal_socket = signal_context.socket(zmq.PUSH)
signal_socket.connect(settings.TASK_COMMAND_PORT)
signal_socket.send_json(metadata_dict)
"""Add the given task into the Tower task manager's queue, to be consumed
by the task system.
"""
queue = FifoQueue('tower_task_manager')
queue.push(metadata_dict)
@task(bind=True)
def handle_work_error(self, task_id, subtasks=None):

View File

@ -18,8 +18,8 @@ from rest_framework.exceptions import ParseError, PermissionDenied
# PyCrypto
from Crypto.Cipher import AES
# ZeroMQ
import zmq
# Tower
from awx.main.queue import PubSub
__all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
@ -364,14 +364,10 @@ def get_system_task_capacity():
def emit_websocket_notification(endpoint, event, payload):
from django.conf import settings
if getattr(settings, 'SOCKETIO_NOTIFICATION_PORT', None):
emit_context = zmq.Context()
emit_socket = emit_context.socket(zmq.PUSH)
emit_socket.connect(settings.SOCKETIO_NOTIFICATION_PORT)
payload['event'] = event
payload['endpoint'] = endpoint
emit_socket.send_json(payload);
pubsub = PubSub('websocket')
payload['event'] = event
payload['endpoint'] = endpoint
pubsub.publish(payload)
_inventory_updates = threading.local()