From 3b07d9745deaaacbdd0b38556a19e3c7b555710f Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 18 Nov 2016 09:44:17 -0500 Subject: [PATCH] Port fact caching system to rabbitmq * Purge all references to zmq also * New setting to control the queue it works on --- .../commands/run_fact_cache_receiver.py | 77 +++++++++++-------- awx/main/queue.py | 33 +------- awx/main/tasks.py | 1 + awx/plugins/fact_caching/tower.py | 36 ++++----- awx/settings/defaults.py | 2 + 5 files changed, 63 insertions(+), 86 deletions(-) diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py index afc6095309..78bf88ffb4 100644 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -6,6 +6,9 @@ import logging from threading import Thread from datetime import datetime +from kombu import Connection, Exchange, Queue +from kombu.mixins import ConsumerMixin + # Django from django.core.management.base import NoArgsCommand from django.conf import settings @@ -23,6 +26,34 @@ class FactCacheReceiver(object): def __init__(self): self.timestamp = None + + def run_receiver(self, use_processing_threads=True): + with Socket('fact_cache', 'r') as facts: + for message in facts.listen(): + if 'host' not in message or 'facts' not in message or 'date_key' not in message: + logger.warn('Received invalid message %s' % message) + continue + logger.info('Received message %s' % message) + if use_processing_threads: + wt = Thread(target=self.process_fact_message, args=(message,)) + wt.start() + else: + self.process_fact_message(message) + + +class FactBrokerWorker(ConsumerMixin): + + def __init__(self, connection): + self.connection = connection + self.timestamp = None + + def get_consumers(self, Consumer, channel): + return [Consumer(queues=[Queue(settings.FACT_QUEUE, + Exchange(settings.FACT_QUEUE, type='direct'), + routing_key=settings.FACT_QUEUE)], + accept=['json'], + callbacks=[self.process_fact_message])] + def _determine_module(self, facts): # Symantically determine the module type if len(facts) == 1: @@ -40,17 +71,13 @@ class FactCacheReceiver(object): facts = self._extract_module_facts(module, facts) return (module, facts) - def process_fact_message(self, message): - hostname = message['host'] - inventory_id = message['inventory_id'] - facts_data = message['facts'] - date_key = message['date_key'] - - # TODO: in ansible < v2 module_setup is emitted for "smart" fact caching. - # ansible v2 will not emit this message. Thus, this can be removed at that time. - if 'module_setup' in facts_data and len(facts_data) == 1: - logger.info('Received module_setup message') - return None + def process_fact_message(self, body, message): + print body + print type(body) + hostname = body['host'] + inventory_id = body['inventory_id'] + facts_data = body['facts'] + date_key = body['date_key'] try: host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id) @@ -79,32 +106,18 @@ class FactCacheReceiver(object): logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name)) return fact_obj - def run_receiver(self, use_processing_threads=True): - with Socket('fact_cache', 'r') as facts: - for message in facts.listen(): - if 'host' not in message or 'facts' not in message or 'date_key' not in message: - logger.warn('Received invalid message %s' % message) - continue - logger.info('Received message %s' % message) - if use_processing_threads: - wt = Thread(target=self.process_fact_message, args=(message,)) - wt.start() - else: - self.process_fact_message(message) - class Command(NoArgsCommand): ''' - blah blah + Save Fact Event packets to the database as emitted from a Tower Scan Job ''' help = 'Launch the Fact Cache Receiver' def handle_noargs(self, **options): - fcr = FactCacheReceiver() - fact_cache_port = settings.FACT_CACHE_PORT - logger.info('Listening on port http://0.0.0.0:' + str(fact_cache_port)) - try: - fcr.run_receiver() - except KeyboardInterrupt: - pass + with Connection(settings.BROKER_URL) as conn: + try: + worker = FactBrokerWorker(conn) + worker.run() + except KeyboardInterrupt: + pass diff --git a/awx/main/queue.py b/awx/main/queue.py index bfb487441f..541724405f 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -2,7 +2,6 @@ # All Rights Reserved. # Python -import json import logging import os @@ -12,37 +11,7 @@ from django.conf import settings # Kombu from kombu import Connection, Exchange, Producer -__all__ = ['FifoQueue', 'CallbackQueueDispatcher'] - - -# TODO: Figure out wtf to do with this class -class FifoQueue(object): - """An abstraction class implemented for a simple push/pull queue. - - Intended to allow alteration of backend details in a single, consistent - way throughout the Tower application. - """ - def __init__(self, queue_name): - """Instantiate a queue object, which is able to interact with a - particular queue. - """ - self._queue_name = queue_name - - def __len__(self): - """Return the length of the Redis list.""" - #return redis.llen(self._queue_name) - return 0 - - def push(self, value): - """Push a value onto the right side of the queue.""" - #redis.rpush(self._queue_name, json.dumps(value)) - - def pop(self): - """Retrieve a value from the left side of the queue.""" - #answer = redis.lpop(self._queue_name) - answer = None - if answer: - return json.loads(answer) +__all__ = ['CallbackQueueDispatcher'] class CallbackQueueDispatcher(object): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 47a6583e9a..92ed8b7a5d 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -852,6 +852,7 @@ class RunJob(BaseTask): # Set environment variables related to scan jobs if job.job_type == PERM_INVENTORY_SCAN: + env['FACT_QUEUE'] = settings.FACT_QUEUE env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') env['ANSIBLE_CACHE_PLUGIN'] = "tower" diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index a956a5f50b..00f280a884 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -29,20 +29,15 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -import sys import os import time -from ansible import constants as C + try: from ansible.cache.base import BaseCacheModule except: from ansible.plugins.cache.base import BaseCacheModule -try: - import zmq -except ImportError: - print("pyzmq is required") - sys.exit(1) +from kombu import Connection, Exchange, Producer class CacheModule(BaseCacheModule): @@ -52,20 +47,13 @@ class CacheModule(BaseCacheModule): self._cache = {} self._all_keys = {} - # This is the local tower zmq connection - self._tower_connection = C.CACHE_PLUGIN_CONNECTION self.date_key = time.time() - try: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) - self.socket.setsockopt(zmq.RCVTIMEO, 4000) - self.socket.setsockopt(zmq.LINGER, 2000) - self.socket.connect(self._tower_connection) - except Exception, e: - print("Connection to zeromq failed at %s with error: %s" % (str(self._tower_connection), - str(e))) - sys.exit(1) - + self.callback_connection = os.environ['CALLBACK_CONNECTION'] + self.callback_queue = os.environ['FACT_QUEUE'] + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.callback_queue, type='direct') + self.producer = Producer(self.connection) + def filter_ansible_facts(self, facts): return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_')) @@ -117,8 +105,12 @@ class CacheModule(BaseCacheModule): } # Emit fact data to tower for processing - self.socket.send_json(packet) - self.socket.recv() + self.producer.publish(packet, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.callback_queue) def keys(self): return self._cache.keys() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 69cd7f232b..01264de6ba 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -797,6 +797,8 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT CALLBACK_QUEUE = "callback_tasks" +FACT_QUEUE = "facts" + SCHEDULER_QUEUE = "scheduler" TASK_COMMAND_PORT = 6559