diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py
index bf8b0b5053..644ef5df75 100644
--- a/awx/main/management/commands/run_socketio_service.py
+++ b/awx/main/management/commands/run_socketio_service.py
@@ -23,9 +23,7 @@ from django.utils.tzinfo import FixedOffset
 # AWX
 import awx
 from awx.main.models import *
-
-# ZeroMQ
-import zmq
+from awx.main.queue import PubSub
 
 # gevent & socketio
 import gevent
@@ -68,14 +66,15 @@ class TowerSocket(object):
             start_response('404 Not Found', [])
             return ['Tower version %s' % awx.__version__]
 
-def notification_handler(bind_port, server):
-    handler_context = zmq.Context()
-    handler_socket = handler_context.socket(zmq.PULL)
-    handler_socket.bind(bind_port)
-
-    while True:
-        message = handler_socket.recv_json()
-        packet = dict(type='event', name=message['event'], endpoint=message['endpoint'], args=message)
+def notification_handler(server):
+    pubsub = PubSub('websocket')
+    for message in pubsub.subscribe():
+        packet = {
+            'args': message,
+            'endpoint': message['endpoint'],
+            'name': message['event'],
+            'type': 'event',
+        }
         for session_id, socket in list(server.sockets.iteritems()):
             socket.send_packet(packet)
 
@@ -119,7 +118,7 @@ class Command(NoArgsCommand):
             server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io')
 
             #gevent.spawn(notification_handler, socketio_notification_port, server)
-            handler_thread = Thread(target=notification_handler, args = (socketio_notification_port, server,))
+            handler_thread = Thread(target=notification_handler, args=(server,))
             handler_thread.daemon = True
             handler_thread.start()
 
diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py
index 46cd8ed06e..d83371c150 100644
--- a/awx/main/management/commands/run_task_system.py
+++ b/awx/main/management/commands/run_task_system.py
@@ -22,15 +22,16 @@ from django.utils.tzinfo import FixedOffset
 
 # AWX
 from awx.main.models import *
+from awx.main.queue import FifoQueue
 from awx.main.tasks import handle_work_error
 from awx.main.utils import get_system_task_capacity, decrypt_field
 
-# ZeroMQ
-import zmq
-
 # Celery
 from celery.task.control import inspect
 
+
+queue = FifoQueue('tower_task_manager')
+
 
 class SimpleDAG(object):
     ''' A simple implementation of a directed acyclic graph '''
@@ -280,7 +281,7 @@ def process_graph(graph, task_capacity):
             print('Started Node: %s (capacity hit: %s) Remaining Capacity: %s' %
                   (str(node_obj), str(impact), str(remaining_volume)))
 
-def run_taskmanager(command_port):
+def run_taskmanager():
     """Receive task start and finish signals to rebuild a dependency graph
     and manage the actual running of tasks.
     """
@@ -293,18 +294,23 @@ def run_taskmanager():
     signal.signal(signal.SIGTERM, shutdown_handler())
     paused = False
     task_capacity = get_system_task_capacity()
-    command_context = zmq.Context()
-    command_socket = command_context.socket(zmq.PULL)
-    command_socket.bind(command_port)
-    print("Listening on %s" % command_port)
     last_rebuild = datetime.datetime.fromtimestamp(0)
+
+    # Attempt to pull messages off of the task system queue into perpetuity.
     while True:
-        try:
-            message = command_socket.recv_json(flags=zmq.NOBLOCK)
-        except zmq.ZMQError,e:
-            message = None
-        if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 10:
-            if message is not None and 'pause' in message:
+        # Pop a message off the queue.
+        # (If the queue is empty, None will be returned.)
+        message = queue.pop()
+
+        # Sanity check: If we got no message back, sleep and continue.
+        if message is None:
+            time.sleep(0.1)
+            continue
+
+        # Parse out the message appropriately, rebuilding our graph if
+        # appropriate.
+        if (datetime.datetime.now() - last_rebuild).seconds > 10:
+            if 'pause' in message:
                 print("Pause command received: %s" % str(message))
                 paused = message['pause']
             graph = rebuild_graph(message)
@@ -339,8 +345,7 @@ class Command(NoArgsCommand):
     def handle_noargs(self, **options):
         self.verbosity = int(options.get('verbosity', 1))
         self.init_logging()
-        command_port = settings.TASK_COMMAND_PORT
         try:
-            run_taskmanager(command_port)
+            run_taskmanager()
         except KeyboardInterrupt:
             pass
diff --git a/awx/main/queue.py b/awx/main/queue.py
index c29e805843..ecc2f2ef02 100644
--- a/awx/main/queue.py
+++ b/awx/main/queue.py
@@ -5,7 +5,38 @@ import json
 
 from redis import StrictRedis
 
-redis = StrictRedis('127.0.0.1')  # FIXME: Don't hard-code.
+from django.conf import settings
+
+__all__ = ['FifoQueue', 'PubSub']
+
+
+# Determine, based on settings.BROKER_URL (for celery), what the correct Redis
+# connection settings are.
+redis_kwargs = {}
+broker_url = settings.BROKER_URL
+if not broker_url.lower().startswith('redis://'):
+    raise RuntimeError('Error importing awx.main.queue: Cannot use queue with '
+                       'a non-Redis broker configured for celery.')
+broker_url = broker_url[8:]
+
+# There may or may not be a password; address both situations by checking
+# for an "@" in the broker URL.
+if '@' in broker_url:
+    broker_auth, broker_host = broker_url.split('@')
+    redis_kwargs['password'] = broker_auth.split(':')[1]
+else:
+    broker_host = broker_url
+
+# Ignore anything after a / in the broker host.
+broker_host = broker_host.split('/')[0]
+
+# If a custom port is present, parse it out.
+if ':' in broker_host:
+    broker_host, broker_port = broker_host.split(':')
+    redis_kwargs['port'] = int(broker_port)
+
+# Now create a StrictRedis object that knows how to connect appropriately.
+redis = StrictRedis(broker_host, **redis_kwargs)
 
 
 class FifoQueue(object):
@@ -62,7 +93,7 @@ class PubSub(object):
         """
         return self._ps.get_message()
 
-    def listen(self, wait=0.001):
+    def subscribe(self, wait=0.001):
         """Listen to content from the subscription channel indefinitely,
         and yield messages as they are retrieved.
         """
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 5b0eea8972..bb9a328892 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -26,9 +26,6 @@ import dateutil.parser
 # Pexpect
 import pexpect
 
-# ZMQ
-import zmq
-
 # Celery
 from celery import Task, task
 from djcelery.models import PeriodicTask
@@ -41,7 +38,9 @@ from django.utils.timezone import now
 
 # AWX
 from awx.main.constants import CLOUD_PROVIDERS
-from awx.main.models import *  # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate
+from awx.main.models import *  # Job, JobEvent, ProjectUpdate, InventoryUpdate,
+                               # Schedule, UnifiedJobTemplate
+from awx.main.queue import FifoQueue
 from awx.main.utils import (get_ansible_version, decrypt_field, update_scm_url,
                             ignore_inventory_computed_fields,
                             emit_websocket_notification)
@@ -105,10 +104,11 @@ def tower_periodic_scheduler(self):
 
 @task()
 def notify_task_runner(metadata_dict):
-    signal_context = zmq.Context()
-    signal_socket = signal_context.socket(zmq.PUSH)
-    signal_socket.connect(settings.TASK_COMMAND_PORT)
-    signal_socket.send_json(metadata_dict)
+    """Add the given task into the Tower task manager's queue, to be consumed
+    by the task system.
+    """
+    queue = FifoQueue('tower_task_manager')
+    queue.push(metadata_dict)
 
 @task(bind=True)
 def handle_work_error(self, task_id, subtasks=None):
diff --git a/awx/main/utils.py b/awx/main/utils.py
index 87635d5968..53b104862f 100644
--- a/awx/main/utils.py
+++ b/awx/main/utils.py
@@ -18,8 +18,8 @@ from rest_framework.exceptions import ParseError, PermissionDenied
 
 # PyCrypto
 from Crypto.Cipher import AES
 
-# ZeroMQ
-import zmq
+# Tower
+from awx.main.queue import PubSub
 
 __all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
@@ -364,14 +364,10 @@ def get_system_task_capacity():
 
 
 def emit_websocket_notification(endpoint, event, payload):
-    from django.conf import settings
-    if getattr(settings, 'SOCKETIO_NOTIFICATION_PORT', None):
-        emit_context = zmq.Context()
-        emit_socket = emit_context.socket(zmq.PUSH)
-        emit_socket.connect(settings.SOCKETIO_NOTIFICATION_PORT)
-        payload['event'] = event
-        payload['endpoint'] = endpoint
-        emit_socket.send_json(payload);
+    pubsub = PubSub('websocket')
+    payload['event'] = event
+    payload['endpoint'] = endpoint
+    pubsub.publish(payload)
 
 
 _inventory_updates = threading.local()