mirror of
https://github.com/ansible/awx.git
synced 2026-05-23 16:47:45 -02:30
Port fact caching system to rabbitmq
* Purge all references to zmq also * New setting to control the queue it works on
This commit is contained in:
@@ -6,6 +6,9 @@ import logging
|
|||||||
from threading import Thread
|
from threading import Thread
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from kombu import Connection, Exchange, Queue
|
||||||
|
from kombu.mixins import ConsumerMixin
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.core.management.base import NoArgsCommand
|
from django.core.management.base import NoArgsCommand
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -23,6 +26,34 @@ class FactCacheReceiver(object):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.timestamp = None
|
self.timestamp = None
|
||||||
|
|
||||||
|
|
||||||
|
def run_receiver(self, use_processing_threads=True):
|
||||||
|
with Socket('fact_cache', 'r') as facts:
|
||||||
|
for message in facts.listen():
|
||||||
|
if 'host' not in message or 'facts' not in message or 'date_key' not in message:
|
||||||
|
logger.warn('Received invalid message %s' % message)
|
||||||
|
continue
|
||||||
|
logger.info('Received message %s' % message)
|
||||||
|
if use_processing_threads:
|
||||||
|
wt = Thread(target=self.process_fact_message, args=(message,))
|
||||||
|
wt.start()
|
||||||
|
else:
|
||||||
|
self.process_fact_message(message)
|
||||||
|
|
||||||
|
|
||||||
|
class FactBrokerWorker(ConsumerMixin):
|
||||||
|
|
||||||
|
def __init__(self, connection):
|
||||||
|
self.connection = connection
|
||||||
|
self.timestamp = None
|
||||||
|
|
||||||
|
def get_consumers(self, Consumer, channel):
|
||||||
|
return [Consumer(queues=[Queue(settings.FACT_QUEUE,
|
||||||
|
Exchange(settings.FACT_QUEUE, type='direct'),
|
||||||
|
routing_key=settings.FACT_QUEUE)],
|
||||||
|
accept=['json'],
|
||||||
|
callbacks=[self.process_fact_message])]
|
||||||
|
|
||||||
def _determine_module(self, facts):
|
def _determine_module(self, facts):
|
||||||
# Symantically determine the module type
|
# Symantically determine the module type
|
||||||
if len(facts) == 1:
|
if len(facts) == 1:
|
||||||
@@ -40,17 +71,13 @@ class FactCacheReceiver(object):
|
|||||||
facts = self._extract_module_facts(module, facts)
|
facts = self._extract_module_facts(module, facts)
|
||||||
return (module, facts)
|
return (module, facts)
|
||||||
|
|
||||||
def process_fact_message(self, message):
|
def process_fact_message(self, body, message):
|
||||||
hostname = message['host']
|
print body
|
||||||
inventory_id = message['inventory_id']
|
print type(body)
|
||||||
facts_data = message['facts']
|
hostname = body['host']
|
||||||
date_key = message['date_key']
|
inventory_id = body['inventory_id']
|
||||||
|
facts_data = body['facts']
|
||||||
# TODO: in ansible < v2 module_setup is emitted for "smart" fact caching.
|
date_key = body['date_key']
|
||||||
# ansible v2 will not emit this message. Thus, this can be removed at that time.
|
|
||||||
if 'module_setup' in facts_data and len(facts_data) == 1:
|
|
||||||
logger.info('Received module_setup message')
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
|
host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
|
||||||
@@ -79,32 +106,18 @@ class FactCacheReceiver(object):
|
|||||||
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
|
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
|
||||||
return fact_obj
|
return fact_obj
|
||||||
|
|
||||||
def run_receiver(self, use_processing_threads=True):
|
|
||||||
with Socket('fact_cache', 'r') as facts:
|
|
||||||
for message in facts.listen():
|
|
||||||
if 'host' not in message or 'facts' not in message or 'date_key' not in message:
|
|
||||||
logger.warn('Received invalid message %s' % message)
|
|
||||||
continue
|
|
||||||
logger.info('Received message %s' % message)
|
|
||||||
if use_processing_threads:
|
|
||||||
wt = Thread(target=self.process_fact_message, args=(message,))
|
|
||||||
wt.start()
|
|
||||||
else:
|
|
||||||
self.process_fact_message(message)
|
|
||||||
|
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
class Command(NoArgsCommand):
|
||||||
'''
|
'''
|
||||||
blah blah
|
Save Fact Event packets to the database as emitted from a Tower Scan Job
|
||||||
'''
|
'''
|
||||||
help = 'Launch the Fact Cache Receiver'
|
help = 'Launch the Fact Cache Receiver'
|
||||||
|
|
||||||
def handle_noargs(self, **options):
|
def handle_noargs(self, **options):
|
||||||
fcr = FactCacheReceiver()
|
with Connection(settings.BROKER_URL) as conn:
|
||||||
fact_cache_port = settings.FACT_CACHE_PORT
|
try:
|
||||||
logger.info('Listening on port http://0.0.0.0:' + str(fact_cache_port))
|
worker = FactBrokerWorker(conn)
|
||||||
try:
|
worker.run()
|
||||||
fcr.run_receiver()
|
except KeyboardInterrupt:
|
||||||
except KeyboardInterrupt:
|
pass
|
||||||
pass
|
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,6 @@
|
|||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
|
|
||||||
# Python
|
# Python
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
@@ -12,37 +11,7 @@ from django.conf import settings
|
|||||||
# Kombu
|
# Kombu
|
||||||
from kombu import Connection, Exchange, Producer
|
from kombu import Connection, Exchange, Producer
|
||||||
|
|
||||||
__all__ = ['FifoQueue', 'CallbackQueueDispatcher']
|
__all__ = ['CallbackQueueDispatcher']
|
||||||
|
|
||||||
|
|
||||||
# TODO: Figure out wtf to do with this class
|
|
||||||
class FifoQueue(object):
|
|
||||||
"""An abstraction class implemented for a simple push/pull queue.
|
|
||||||
|
|
||||||
Intended to allow alteration of backend details in a single, consistent
|
|
||||||
way throughout the Tower application.
|
|
||||||
"""
|
|
||||||
def __init__(self, queue_name):
|
|
||||||
"""Instantiate a queue object, which is able to interact with a
|
|
||||||
particular queue.
|
|
||||||
"""
|
|
||||||
self._queue_name = queue_name
|
|
||||||
|
|
||||||
def __len__(self):
|
|
||||||
"""Return the length of the Redis list."""
|
|
||||||
#return redis.llen(self._queue_name)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def push(self, value):
|
|
||||||
"""Push a value onto the right side of the queue."""
|
|
||||||
#redis.rpush(self._queue_name, json.dumps(value))
|
|
||||||
|
|
||||||
def pop(self):
|
|
||||||
"""Retrieve a value from the left side of the queue."""
|
|
||||||
#answer = redis.lpop(self._queue_name)
|
|
||||||
answer = None
|
|
||||||
if answer:
|
|
||||||
return json.loads(answer)
|
|
||||||
|
|
||||||
|
|
||||||
class CallbackQueueDispatcher(object):
|
class CallbackQueueDispatcher(object):
|
||||||
|
|||||||
@@ -852,6 +852,7 @@ class RunJob(BaseTask):
|
|||||||
|
|
||||||
# Set environment variables related to scan jobs
|
# Set environment variables related to scan jobs
|
||||||
if job.job_type == PERM_INVENTORY_SCAN:
|
if job.job_type == PERM_INVENTORY_SCAN:
|
||||||
|
env['FACT_QUEUE'] = settings.FACT_QUEUE
|
||||||
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
|
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
|
||||||
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
|
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
|
||||||
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
|
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
|
||||||
|
|||||||
@@ -29,20 +29,15 @@
|
|||||||
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||||
# POSSIBILITY OF SUCH DAMAGE.
|
# POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from ansible import constants as C
|
|
||||||
try:
|
try:
|
||||||
from ansible.cache.base import BaseCacheModule
|
from ansible.cache.base import BaseCacheModule
|
||||||
except:
|
except:
|
||||||
from ansible.plugins.cache.base import BaseCacheModule
|
from ansible.plugins.cache.base import BaseCacheModule
|
||||||
|
|
||||||
try:
|
from kombu import Connection, Exchange, Producer
|
||||||
import zmq
|
|
||||||
except ImportError:
|
|
||||||
print("pyzmq is required")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
class CacheModule(BaseCacheModule):
|
class CacheModule(BaseCacheModule):
|
||||||
@@ -52,19 +47,12 @@ class CacheModule(BaseCacheModule):
|
|||||||
self._cache = {}
|
self._cache = {}
|
||||||
self._all_keys = {}
|
self._all_keys = {}
|
||||||
|
|
||||||
# This is the local tower zmq connection
|
|
||||||
self._tower_connection = C.CACHE_PLUGIN_CONNECTION
|
|
||||||
self.date_key = time.time()
|
self.date_key = time.time()
|
||||||
try:
|
self.callback_connection = os.environ['CALLBACK_CONNECTION']
|
||||||
self.context = zmq.Context()
|
self.callback_queue = os.environ['FACT_QUEUE']
|
||||||
self.socket = self.context.socket(zmq.REQ)
|
self.connection = Connection(self.callback_connection)
|
||||||
self.socket.setsockopt(zmq.RCVTIMEO, 4000)
|
self.exchange = Exchange(self.callback_queue, type='direct')
|
||||||
self.socket.setsockopt(zmq.LINGER, 2000)
|
self.producer = Producer(self.connection)
|
||||||
self.socket.connect(self._tower_connection)
|
|
||||||
except Exception, e:
|
|
||||||
print("Connection to zeromq failed at %s with error: %s" % (str(self._tower_connection),
|
|
||||||
str(e)))
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
def filter_ansible_facts(self, facts):
|
def filter_ansible_facts(self, facts):
|
||||||
return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_'))
|
return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_'))
|
||||||
@@ -117,8 +105,12 @@ class CacheModule(BaseCacheModule):
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Emit fact data to tower for processing
|
# Emit fact data to tower for processing
|
||||||
self.socket.send_json(packet)
|
self.producer.publish(packet,
|
||||||
self.socket.recv()
|
serializer='json',
|
||||||
|
compression='bzip2',
|
||||||
|
exchange=self.exchange,
|
||||||
|
declare=[self.exchange],
|
||||||
|
routing_key=self.callback_queue)
|
||||||
|
|
||||||
def keys(self):
|
def keys(self):
|
||||||
return self._cache.keys()
|
return self._cache.keys()
|
||||||
|
|||||||
@@ -797,6 +797,8 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False
|
|||||||
INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT
|
INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT
|
||||||
|
|
||||||
CALLBACK_QUEUE = "callback_tasks"
|
CALLBACK_QUEUE = "callback_tasks"
|
||||||
|
FACT_QUEUE = "facts"
|
||||||
|
|
||||||
SCHEDULER_QUEUE = "scheduler"
|
SCHEDULER_QUEUE = "scheduler"
|
||||||
|
|
||||||
TASK_COMMAND_PORT = 6559
|
TASK_COMMAND_PORT = 6559
|
||||||
|
|||||||
Reference in New Issue
Block a user