Merge pull request #128 from chrismeyersfsu/feature-mongodb2

Implemented fact scan storage logic. …
This commit is contained in:
Chris Meyers 2015-04-07 10:40:35 -04:00
commit cd39b2e2e9
23 changed files with 853 additions and 78 deletions

20
awx/fact/__init__.py Normal file
View File

@ -0,0 +1,20 @@
# Copyright (c) 2014 AnsibleWorks, Inc.
# All Rights Reserved.
from __future__ import absolute_import
import logging
from django.conf import settings
from mongoengine import connect
from mongoengine.connection import get_db, ConnectionError
from .utils.dbtransform import register_key_transform
logger = logging.getLogger('fact.__init__')
# Connect to Mongo
try:
connect(settings.MONGO_DB)
register_key_transform(get_db())
except ConnectionError:
logger.warn('Failed to establish connect to MongDB "%s"' % (settings.MONGO_DB))

View File

@ -0,0 +1,6 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from __future__ import absolute_import
from .fact import * # noqa

95
awx/fact/models/fact.py Normal file
View File

@ -0,0 +1,95 @@
from mongoengine import Document, DynamicDocument, DateTimeField, ReferenceField, StringField
class FactHost(Document):
    """Mongo document mapping a unique hostname to an id used by fact records."""
    hostname = StringField(max_length=100, required=True, unique=True)

    # TODO: Consider using hashed index on hostname. django-mongo may not support this but
    # executing raw js will
    meta = {
        'indexes': [
            'hostname'
        ]
    }

    @staticmethod
    def get_host_id(hostname):
        """Return the id of the FactHost for `hostname`, or None if no such host.

        fix: mongoengine's objects.get() raises DoesNotExist rather than
        returning a falsy value, so the original `if host:` check could never
        take its `return None` branch — the exception escaped to the caller.
        """
        try:
            return FactHost.objects.get(hostname=hostname).id
        except FactHost.DoesNotExist:
            return None
class Fact(DynamicDocument):
    """A single module's fact payload for one host at one scan timestamp."""
    timestamp = DateTimeField(required=True)
    host = ReferenceField(FactHost, required=True)
    module = StringField(max_length=50, required=True)
    # fact = <anything>
    # (DynamicDocument: the raw fact payload is stored as a dynamic field)

    # TODO: Consider using hashed index on host. django-mongo may not support this but
    # executing raw js will
    meta = {
        'indexes': [
            '-timestamp',
            'host'
        ]
    }

    @staticmethod
    def add_fact(timestamp, fact, host, module):
        """Save a Fact plus its FactVersion bookkeeping record.

        Returns the (Fact, FactVersion) pair that was persisted.
        """
        fact_obj = Fact(timestamp=timestamp, host=host, module=module, fact=fact)
        fact_obj.save()
        version_obj = FactVersion(timestamp=timestamp, host=host, module=module, fact=fact_obj)
        version_obj.save()
        return (fact_obj, version_obj)

    # TODO: if we want to relax the need to include module...
    # If module not specified then filter query may return more than 1 result.
    # Thus, the resulting facts must somehow be unioned/concated/ or kept as an array.
    @staticmethod
    def get_host_version(hostname, timestamp, module):
        """Return a Fact for `hostname`/`module` recorded at or before `timestamp`.

        Returns None when the host is unknown or no fact matches.
        """
        try:
            host = FactHost.objects.get(hostname=hostname)
        except FactHost.DoesNotExist:
            return None
        kv = {
            'host' : host.id,
            'timestamp__lte': timestamp,
            'module': module,
        }
        try:
            facts = Fact.objects.filter(**kv)
            if not facts:
                return None
            # NOTE(review): no explicit ordering — facts[0] relies on natural
            # (insertion) order and the existing tests expect the EARLIEST
            # matching fact; confirm before adding order_by('-timestamp').
            return facts[0]
        except Fact.DoesNotExist:
            # NOTE(review): filter() does not raise DoesNotExist; this branch
            # appears to be dead code.
            return None

    @staticmethod
    def get_host_timeline(hostname, module):
        """Return the scan timestamps stored for `hostname`/`module`.

        Returns None when the host is unknown.
        """
        try:
            host = FactHost.objects.get(hostname=hostname)
        except FactHost.DoesNotExist:
            return None
        kv = {
            'host': host.id,
            'module': module,
        }
        return FactVersion.objects.filter(**kv).values_list('timestamp')
class FactVersion(Document):
    """Index document recording that a Fact exists for (host, module, timestamp).

    Written alongside every Fact by Fact.add_fact(); queried for timelines and
    for same-timestamp updates.
    """
    timestamp = DateTimeField(required=True)
    host = ReferenceField(FactHost, required=True)
    module = StringField(max_length=50, required=True)
    fact = ReferenceField(Fact, required=True)

    # TODO: Consider using hashed index on module. django-mongo may not support this but
    # executing raw js will
    meta = {
        'indexes': [
            '-timestamp',
            'module'
        ]
    }

View File

@ -0,0 +1,7 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from __future__ import absolute_import
from .models import * # noqa
from .utils import * # noqa

View File

@ -0,0 +1,6 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from __future__ import absolute_import
from .fact import * # noqa

View File

@ -0,0 +1,154 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from datetime import datetime
from copy import deepcopy
# Django
# AWX
from awx.fact.models.fact import * # noqa
from awx.main.tests.base import BaseTest, MongoDBRequired
__all__ = ['FactHostTest', 'FactTest', 'FactGetHostVersionTest', 'FactGetHostTimeline']
# Canonical fixture describing one 'packages' fact scan.  Tests deepcopy this
# and override fields (timestamp, host) as needed.
TEST_FACT_DATA = {
    'hostname': 'hostname1',
    'add_fact_data': {
        # Naive local time; microseconds stripped below to match Mongo precision.
        'timestamp': datetime.now(),
        # Filled in by create_host_document() once a FactHost has been saved.
        'host': None,
        'module': 'packages',
        'fact': {
            "accountsservice": [
                {
                    "architecture": "amd64",
                    "name": "accountsservice",
                    "source": "apt",
                    "version": "0.6.35-0ubuntu7.1"
                }
            ],
            "acpid": [
                {
                    "architecture": "amd64",
                    "name": "acpid",
                    "source": "apt",
                    "version": "1:2.0.21-1ubuntu2"
                }
            ],
            "adduser": [
                {
                    "architecture": "all",
                    "name": "adduser",
                    "source": "apt",
                    "version": "3.113+nmu3ubuntu3"
                }
            ],
        },
    }
}
# Strip off microseconds because mongo has less precision
TEST_FACT_DATA['add_fact_data']['timestamp'] = TEST_FACT_DATA['add_fact_data']['timestamp'].replace(microsecond=0)
def create_host_document():
    """Save a FactHost for the fixture hostname and wire it into TEST_FACT_DATA."""
    host_doc = FactHost(hostname=TEST_FACT_DATA['hostname']).save()
    TEST_FACT_DATA['add_fact_data']['host'] = host_doc
def create_fact_scans(count=1):
    """Store `count` fact scans, one per year back from 2015.

    Returns the list of timestamps in the order the scans were saved.
    """
    stamps = []
    for offset in range(count):
        payload = deepcopy(TEST_FACT_DATA)
        stamp = datetime.now().replace(year=2015 - offset, microsecond=0)
        payload['add_fact_data']['timestamp'] = stamp
        Fact.add_fact(**payload['add_fact_data'])
        stamps.append(stamp)
    return stamps
class FactHostTest(BaseTest, MongoDBRequired):
    """Basic create/lookup behavior of the FactHost document."""

    def test_create_host(self):
        FactHost(hostname=TEST_FACT_DATA['hostname']).save()
        fetched = FactHost.objects.get(hostname=TEST_FACT_DATA['hostname'])
        self.assertIsNotNone(fetched, "Host added but not found")
        self.assertEqual(TEST_FACT_DATA['hostname'], fetched.hostname, "Gotten record hostname does not match expected hostname")

    # Ensure an error is raised for .get() that doesn't match a record.
    def test_get_host_id_no_result(self):
        FactHost(hostname=TEST_FACT_DATA['hostname']).save()
        self.assertRaises(FactHost.DoesNotExist, FactHost.objects.get, hostname='doesnotexist')
class FactTest(BaseTest, MongoDBRequired):
    """Exercise Fact.add_fact() and the documents it creates."""

    def setUp(self):
        super(FactTest, self).setUp()
        create_host_document()

    def test_add_fact(self):
        expected = TEST_FACT_DATA['add_fact_data']
        (f_obj, v_obj) = Fact.add_fact(**expected)
        f = Fact.objects.get(id=f_obj.id)
        v = FactVersion.objects.get(id=v_obj.id)

        # The Fact document round-trips intact.
        self.assertEqual(f.id, f_obj.id)
        self.assertEqual(f.module, expected['module'])
        self.assertEqual(f.fact, expected['fact'])
        self.assertEqual(f.timestamp, expected['timestamp'])
        # host relationship created
        self.assertEqual(f.host.id, expected['host'].id)
        # version created and related
        self.assertEqual(v.id, v_obj.id)
        self.assertEqual(v.timestamp, expected['timestamp'])
        self.assertEqual(v.host.id, expected['host'].id)
        self.assertEqual(v.fact.id, f_obj.id)
        self.assertEqual(v.fact.module, expected['module'])
class FactGetHostVersionTest(BaseTest, MongoDBRequired):
    """Fact.get_host_version() lookup behavior around timestamps."""

    def setUp(self):
        super(FactGetHostVersionTest, self).setUp()
        create_host_document()
        self.t1 = datetime.now().replace(second=1, microsecond=0)
        self.t2 = datetime.now().replace(second=2, microsecond=0)
        (self.f1, self.v1) = self._add_fact_at(self.t1)
        (self.f2, self.v2) = self._add_fact_at(self.t2)

    def _add_fact_at(self, timestamp):
        # Save one scan of the canonical fixture stamped at `timestamp`.
        payload = deepcopy(TEST_FACT_DATA)
        payload['add_fact_data']['timestamp'] = timestamp
        return Fact.add_fact(**payload['add_fact_data'])

    def test_get_host_version_exact_timestamp(self):
        fact = Fact.get_host_version(hostname=TEST_FACT_DATA['hostname'], timestamp=self.t1, module=TEST_FACT_DATA['add_fact_data']['module'])
        self.assertIsNotNone(fact, "Set of Facts not found")
        self.assertEqual(self.f1.id, fact.id)
        self.assertEqual(self.f1.fact, fact.fact)

    def test_get_host_version_lte_timestamp(self):
        later = datetime.now().replace(second=3, microsecond=0)
        fact = Fact.get_host_version(hostname=TEST_FACT_DATA['hostname'], timestamp=later, module=TEST_FACT_DATA['add_fact_data']['module'])
        self.assertEqual(self.f1.id, fact.id)
        self.assertEqual(self.f1.fact, fact.fact)

    def test_get_host_version_none(self):
        # A timestamp earlier than any stored scan yields no fact.
        earlier = self.t1.replace(second=0)
        fact = Fact.get_host_version(hostname=TEST_FACT_DATA['hostname'], timestamp=earlier, module=TEST_FACT_DATA['add_fact_data']['module'])
        self.assertIsNone(fact)
class FactGetHostTimeline(BaseTest, MongoDBRequired):
    """Fact.get_host_timeline() returns every stored scan timestamp in order."""

    def setUp(self):
        super(FactGetHostTimeline, self).setUp()
        create_host_document()
        self.scans = 20
        self.timestamps = create_fact_scans(self.scans)

    def test_get_host_timeline_ok(self):
        timeline = Fact.get_host_timeline(hostname=TEST_FACT_DATA['hostname'], module=TEST_FACT_DATA['add_fact_data']['module'])
        self.assertIsNotNone(timeline)
        self.assertEqual(len(timeline), len(self.timestamps))
        for got, expected in zip(timeline, self.timestamps):
            self.assertEqual(got, expected)

View File

@ -0,0 +1,6 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from __future__ import absolute_import
from .dbtransform import * # noqa

View File

@ -0,0 +1,77 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
from datetime import datetime
from mongoengine import connect
# Django
from django.conf import settings
# AWX
from awx.main.tests.base import BaseTest, MongoDBRequired
from awx.fact.models.fact import * # noqa
__all__ = ['DBTransformTest']
class DBTransformTest(BaseTest, MongoDBRequired):
    """Verify '.'/'$' in fact keys are escaped on write and restored on read."""

    def setUp(self):
        super(DBTransformTest, self).setUp()
        # Create a db connection that doesn't have the transformation registered
        # Note: this goes through pymongo not mongoengine
        self.client = connect(settings.MONGO_DB)
        self.db = self.client[settings.MONGO_DB]

    def _create_fact(self):
        # Save a Fact whose payload has the single key self.k -> self.v.
        fact = {}
        fact[self.k] = self.v
        h = FactHost(hostname='blah')
        h.save()
        f = Fact(host=h,module='blah',timestamp=datetime.now(),fact=fact)
        f.save()
        return f

    def create_dot_fact(self):
        # NOTE(review): without a u'' prefix these '\uff0E' escapes are literal
        # backslash sequences under Python 2; this matches the equally literal
        # replacement strings registered in utils/dbtransform.py, so the
        # comparisons below stay internally consistent — confirm intent.
        self.k = 'this.is.a.key'
        self.v = 'this.is.a.value'
        self.k_uni = 'this\uff0Eis\uff0Ea\uff0Ekey'
        return self._create_fact()

    def create_dollar_fact(self):
        self.k = 'this$is$a$key'
        self.v = 'this$is$a$value'
        self.k_uni = 'this\uff04is\uff04a\uff04key'
        return self._create_fact()

    def check_unicode(self, f):
        # Fetch through raw pymongo so no outgoing transform is applied.
        # NOTE(review): Mongo documents are keyed '_id', not 'id'; presumably
        # find_one(id=...) works here because the freshly-dropped test DB holds
        # a single fact document — verify against pymongo's find_one signature.
        f_raw = self.db.fact.find_one(id=f.id)
        self.assertIn(self.k_uni, f_raw['fact'])
        self.assertEqual(f_raw['fact'][self.k_uni], self.v)

    # Ensure key . are being transformed to the equivalent unicode into the database
    def test_key_transform_dot_unicode_in_storage(self):
        f = self.create_dot_fact()
        self.check_unicode(f)

    # Ensure key $ are being transformed to the equivalent unicode into the database
    def test_key_transform_dollar_unicode_in_storage(self):
        f = self.create_dollar_fact()
        self.check_unicode(f)

    def check_transform(self):
        # Reading back through mongoengine must restore the original key.
        f = Fact.objects.all()[0]
        self.assertIn(self.k, f.fact)
        self.assertEqual(f.fact[self.k], self.v)

    def test_key_transform_dot_on_retreive(self):
        self.create_dot_fact()
        self.check_transform()

    def test_key_transform_dollar_on_retreive(self):
        self.create_dollar_fact()
        self.check_transform()

View File

@ -0,0 +1,2 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved

View File

@ -0,0 +1,55 @@
# Copyright (c) 2014, Ansible, Inc.
# All Rights Reserved.
from pymongo.son_manipulator import SONManipulator
'''
Inspired by: https://stackoverflow.com/questions/8429318/how-to-use-dot-in-field-name/20698802#20698802
Replace . and $ with unicode values
'''
class KeyTransform(SONManipulator):
    """Escape characters in document keys on write and restore them on read.

    `replace` is a list of (char, replacement) pairs — here '.'/'$' mapped to
    fullwidth unicode lookalikes, since Mongo forbids '.' and '$' in keys.
    """
    def __init__(self, replace):
        self.replace = replace

    def transform_key(self, key, replace, replacement):
        """Transform key for saving to database."""
        return key.replace(replace, replacement)

    def revert_key(self, key, replace, replacement):
        """Restore transformed key returning from database."""
        return key.replace(replacement, replace)

    def transform_incoming(self, son, collection):
        """Recursively replace all keys that need transforming.

        fix: the original called son.pop(key) once per matching pair, so a key
        containing both '.' and '$' raised KeyError on the second pop; it also
        mutated `son` while iterating it.  The fully-escaped key is now
        computed first, dict values are recursed into exactly once, and
        iteration runs over a snapshot of the items.
        """
        for (key, value) in list(son.items()):
            new_key = key
            for (replace, replacement) in self.replace:
                if replace in new_key:
                    new_key = self.transform_key(new_key, replace, replacement)
            if isinstance(value, dict):  # recurse into sub-docs
                value = self.transform_incoming(value, collection)
            if new_key != key:
                del son[key]
            son[new_key] = value
        return son

    def transform_outgoing(self, son, collection):
        """Recursively restore all transformed keys (mirror of transform_incoming)."""
        for (key, value) in list(son.items()):
            new_key = key
            for (replace, replacement) in self.replace:
                if replacement in new_key:
                    new_key = self.revert_key(new_key, replace, replacement)
            if isinstance(value, dict):  # recurse into sub-docs
                value = self.transform_outgoing(value, collection)
            if new_key != key:
                del son[key]
            son[new_key] = value
        return son
def register_key_transform(db):
    """Install the '.'/'$' key-escaping manipulator on a pymongo database.

    Mongo forbids '.' and '$' in document keys; the registered manipulator
    escapes them on write and restores them on read.
    """
    db.add_son_manipulator(KeyTransform([('.', '\uff0E'), ('$', '\uff04')]))

View File

@ -2,88 +2,67 @@
# All Rights Reserved
import logging
from datetime import datetime
from django.core.management.base import NoArgsCommand
from awx.main.models import * # noqa
from awx.fact.models.fact import * # noqa
from awx.main.socket import Socket
from pymongo import MongoClient
_MODULES = ['packages', 'services', 'files']
logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
from pymongo.son_manipulator import SONManipulator
class KeyTransform(SONManipulator):
    """Transforms keys going to database and restores them coming out.
    This allows keys with dots in them to be used (but does break searching on
    them unless the find command also uses the transform.
    Example & test:
    # To allow `.` (dots) in keys
    import pymongo
    client = pymongo.MongoClient("mongodb://localhost")
    db = client['delete_me']
    db.add_son_manipulator(KeyTransform(".", "_dot_"))
    db['mycol'].remove()
    db['mycol'].update({'_id': 1}, {'127.0.0.1': 'localhost'}, upsert=True,
    manipulate=True)
    print db['mycol'].find().next()
    print db['mycol'].find({'127_dot_0_dot_0_dot_1': 'localhost'}).next()
    Note: transformation could be easily extended to be more complex.
    """
    def __init__(self, replace, replacement):
        # Single (replace, replacement) pair, e.g. ('.', '_') — unlike the
        # newer awx.fact.utils.dbtransform variant which takes a list of pairs.
        self.replace = replace
        self.replacement = replacement

    def transform_key(self, key):
        """Transform key for saving to database."""
        return key.replace(self.replace, self.replacement)

    def revert_key(self, key):
        """Restore transformed key returning from database."""
        return key.replace(self.replacement, self.replace)

    def transform_incoming(self, son, collection):
        """Recursively replace all keys that need transforming."""
        for (key, value) in son.items():
            if self.replace in key:
                if isinstance(value, dict):
                    son[self.transform_key(key)] = self.transform_incoming(
                        son.pop(key), collection)
                else:
                    son[self.transform_key(key)] = son.pop(key)
            elif isinstance(value, dict):  # recurse into sub-docs
                son[key] = self.transform_incoming(value, collection)
        return son

    def transform_outgoing(self, son, collection):
        # NOTE(review): identity — keys are NOT reverted on read, which
        # contradicts the class docstring ("restores them coming out").
        return son
class FactCacheReceiver(object):
def __init__(self):
    # Direct pymongo connection used by the legacy host_facts path below.
    # NOTE(review): presumably superseded by the mongoengine connection made
    # in awx.fact.__init__ — confirm whether this client is still needed.
    self.client = MongoClient('localhost', 27017)
    # Timestamp of the most recently processed message; set in process_fact_message.
    self.timestamp = None
def _determine_module(self, facts):
    """Return the first known module name present in `facts`, else 'ansible'."""
    return next((candidate for candidate in _MODULES if candidate in facts), 'ansible')
def _extract_module_facts(self, module, facts):
    """Return the sub-dict for `module` when present, otherwise `facts` unchanged."""
    if module not in facts:
        return facts
    return facts[module]
def process_facts(self, facts):
    """Return (module_name, module_facts) extracted from a raw facts dict."""
    module = self._determine_module(facts)
    facts = self._extract_module_facts(module, facts)
    return (module, facts)
def process_fact_message(self, message):
    """Persist one fact-cache message: find-or-create the FactHost, then
    update the same-timestamp Fact if one exists or store a new scan.
    """
    # NOTE(review): this first section (down through host_collection.insert)
    # looks like pre-refactor pymongo code left interleaved with the new
    # mongoengine path below (diff residue) — the same message fields are read
    # twice and the data is written to two different stores; confirm which
    # path is intended to survive.
    host = message['host'].replace(".", "_")
    facts = message['facts']
    hostname = message['host']
    facts_data = message['facts']
    date_key = message['date_key']
    host_db = self.client.host_facts
    host_db.add_son_manipulator(KeyTransform(".", "_"))
    host_db.add_son_manipulator(KeyTransform("$", "_"))
    host_collection = host_db[host]
    facts.update(dict(tower_host=host, datetime=date_key))
    rec = host_collection.find({"datetime": date_key})
    if rec.count():
        this_fact = rec.next()
        this_fact.update(facts)
        host_collection.save(this_fact)
    else:
        host_collection.insert(facts)
    # TODO: in ansible < v2 module_setup is emitted for "smart" fact caching.
    # ansible v2 will not emit this message. Thus, this can be removed at that time.
    if 'module_setup' in facts_data and len(facts_data) == 1:
        return
    # Find-or-create the FactHost record for this hostname.
    try:
        host = FactHost.objects.get(hostname=hostname)
    except FactHost.DoesNotExist:
        host = FactHost(hostname=hostname)
        host.save()
    except FactHost.MultipleObjectsReturned:
        # Unique index violated somehow; report and bail rather than guess.
        query = "db['fact_host'].find(hostname=%s)" % hostname
        print('Database inconsistent. Multiple FactHost "%s" exist. Try the query %s to find the records.' % (hostname, query))
        return
    (module, facts) = self.process_facts(facts_data)
    self.timestamp = datetime.fromtimestamp(date_key, None)
    try:
        # Update existing Fact entry
        version_obj = FactVersion.objects.get(timestamp=self.timestamp, host=host, module=module)
        Fact.objects(id=version_obj.fact.id).update_one(fact=facts)
    except FactVersion.DoesNotExist:
        # Create new Fact entry
        (fact_obj, version_obj) = Fact.add_fact(self.timestamp, facts, host, module)
def run_receiver(self):
with Socket('fact_cache', 'r') as facts:

View File

@ -15,3 +15,4 @@ from awx.main.tests.activity_stream import * # noqa
from awx.main.tests.schedules import * # noqa
from awx.main.tests.redact import * # noqa
from awx.main.tests.views import * # noqa
from awx.main.tests.commands import * # noqa

View File

@ -25,6 +25,9 @@ from django.contrib.auth.models import User
from django.test.client import Client
from django.test.utils import override_settings
# MongoEngine
from mongoengine.connection import get_db, ConnectionError
# AWX
from awx.main.models import * # noqa
from awx.main.backend import LDAPSettings
@ -40,6 +43,15 @@ TEST_PLAYBOOK = '''- hosts: mygroup
command: test 1 = 1
'''
class MongoDBRequired(django.test.TestCase):
    """Mixin giving each test a freshly-dropped MongoDB, skipping if Mongo is down."""

    def setUp(self):
        # Drop mongo database
        try:
            self.db = get_db()
            self.db.connection.drop_database(settings.MONGO_DB)
        except ConnectionError:
            # No reachable MongoDB: skip rather than fail the whole suite.
            self.skipTest('MongoDB connection failed')
class QueueTestMixin(object):
def start_queue(self):
self.start_redis()
@ -84,6 +96,7 @@ class BaseTestMixin(QueueTestMixin):
def setUp(self):
super(BaseTestMixin, self).setUp()
self.object_ctr = 0
# Save sys.path before tests.
self._sys_path = [x for x in sys.path]

View File

@ -0,0 +1,5 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from awx.main.tests.commands.run_fact_cache_receiver import * # noqa
from awx.main.tests.commands.commands_monolithic import * # noqa

View File

@ -0,0 +1,89 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
import StringIO
import sys
import json
# Django
from django.core.management import call_command
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
class BaseCommandMixin(BaseTestMixin):
    '''
    Base class for tests that run management commands.
    '''
    def create_test_inventories(self):
        """Create 2 organizations, each with an inventory of 10 hosts and 5 groups."""
        self.setup_users()
        self.organizations = self.make_organizations(self.super_django_user, 2)
        self.projects = self.make_projects(self.normal_django_user, 2)
        # Cross-link the projects so each organization sees the other's project.
        self.organizations[0].projects.add(self.projects[1])
        self.organizations[1].projects.add(self.projects[0])
        self.inventories = []
        self.hosts = []
        self.groups = []
        for n, organization in enumerate(self.organizations):
            # Only non-first inventories/hosts/groups get variables, so tests
            # cover both the empty and populated variable cases.
            inventory = Inventory.objects.create(name='inventory-%d' % n,
                                                 description='description for inventory %d' % n,
                                                 organization=organization,
                                                 variables=json.dumps({'n': n}) if n else '')
            self.inventories.append(inventory)
            hosts = []
            for x in xrange(10):
                if n > 0:
                    variables = json.dumps({'ho': 'hum-%d' % x})
                else:
                    variables = ''
                host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
                                              inventory=inventory,
                                              variables=variables)
                hosts.append(host)
            self.hosts.extend(hosts)
            groups = []
            for x in xrange(5):
                if n > 0:
                    variables = json.dumps({'gee': 'whiz-%d' % x})
                else:
                    variables = ''
                group = inventory.groups.create(name='group-%d' % x,
                                                inventory=inventory,
                                                variables=variables)
                groups.append(group)
                # Each group holds two hosts; the last group of later orgs is
                # nested under the previous group to exercise group parenting.
                group.hosts.add(hosts[x])
                group.hosts.add(hosts[x + 5])
                if n > 0 and x == 4:
                    group.parents.add(groups[3])
            self.groups.extend(groups)

    def run_command(self, name, *args, **options):
        '''
        Run a management command and capture its stdout/stderr along with any
        exceptions.
        '''
        command_runner = options.pop('command_runner', call_command)
        stdin_fileobj = options.pop('stdin_fileobj', None)
        options.setdefault('verbosity', 1)
        options.setdefault('interactive', False)
        # Swap out the standard streams so output can be captured; they are
        # restored in the finally block no matter what the command does.
        original_stdin = sys.stdin
        original_stdout = sys.stdout
        original_stderr = sys.stderr
        if stdin_fileobj:
            sys.stdin = stdin_fileobj
        sys.stdout = StringIO.StringIO()
        sys.stderr = StringIO.StringIO()
        result = None
        try:
            result = command_runner(name, *args, **options)
        except Exception as e:
            # The exception is returned (not re-raised) so callers can assert
            # on it alongside the captured output.
            result = e
        finally:
            captured_stdout = sys.stdout.getvalue()
            captured_stderr = sys.stderr.getvalue()
            sys.stdin = original_stdin
            sys.stdout = original_stdout
            sys.stderr = original_stderr
        return result, captured_stdout, captured_stderr

View File

@ -870,7 +870,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
parts.query, parts.fragment])
os.environ.setdefault('REST_API_URL', rest_api_url)
os.environ['INVENTORY_ID'] = str(old_inv.pk)
source = os.path.join(os.path.dirname(__file__), '..', '..', 'plugins',
source = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'plugins',
'inventory', 'awxrest.py')
result, stdout, stderr = self.run_command('inventory_import',
inventory_id=new_inv.pk,
@ -907,7 +907,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
new_inv = self.organizations[0].inventories.create(name='newec2')
self.assertEqual(new_inv.hosts.count(), 0)
self.assertEqual(new_inv.groups.count(), 0)
os.chdir(os.path.join(os.path.dirname(__file__), 'data'))
os.chdir(os.path.join(os.path.dirname(__file__), '..', 'data'))
inv_file = 'large_ec2_inventory.py'
result, stdout, stderr = self.run_command('inventory_import',
inventory_id=new_inv.pk,
@ -928,7 +928,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
new_inv = self.organizations[0].inventories.create(name='splunk')
self.assertEqual(new_inv.hosts.count(), 0)
self.assertEqual(new_inv.groups.count(), 0)
inv_file = os.path.join(os.path.dirname(__file__), 'data',
inv_file = os.path.join(os.path.dirname(__file__), '..', 'data',
'splunk_inventory.py')
result, stdout, stderr = self.run_command('inventory_import',
inventory_id=new_inv.pk,
@ -951,7 +951,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
def _check_largeinv_import(self, new_inv, nhosts, nhosts_inactive=0):
self._start_time = time.time()
inv_file = os.path.join(os.path.dirname(__file__), 'data', 'largeinv.py')
inv_file = os.path.join(os.path.dirname(__file__), '..', 'data', 'largeinv.py')
ngroups = self._get_ngroups_for_nhosts(nhosts)
os.environ['NHOSTS'] = str(nhosts)
result, stdout, stderr = self.run_command('inventory_import',

View File

@ -0,0 +1,199 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import time
from datetime import datetime
import mock
import unittest
from copy import deepcopy
from mock import MagicMock
# AWX
from awx.main.tests.base import BaseTest, MongoDBRequired
from awx.main.tests.commands.base import BaseCommandMixin
from awx.main.management.commands.run_fact_cache_receiver import FactCacheReceiver
from awx.fact.models.fact import * # noqa
__all__ = ['RunFactCacheReceiverUnitTest', 'RunFactCacheReceiverFunctionalTest']
# Skeleton of a message as emitted by the fact cache plugin; 'facts' is
# filled in per test via copy_only_module().
TEST_MSG_BASE = {
    'host': 'hostname1',
    # Seconds-since-epoch float, matching what the plugin sends as date_key.
    'date_key': time.mktime(datetime.utcnow().timetuple()),
    'facts' : { }
}

# Per-module fact payloads used to populate TEST_MSG_BASE['facts'].
TEST_MSG_MODULES = {
    'packages': {
        "accountsservice": [
            {
                "architecture": "amd64",
                "name": "accountsservice",
                "source": "apt",
                "version": "0.6.35-0ubuntu7.1"
            }
        ],
        "acpid": [
            {
                "architecture": "amd64",
                "name": "acpid",
                "source": "apt",
                "version": "1:2.0.21-1ubuntu2"
            }
        ],
        "adduser": [
            {
                "architecture": "all",
                "name": "adduser",
                "source": "apt",
                "version": "3.113+nmu3ubuntu3"
            }
        ],
    },
    'services': [
        {
            "name": "acpid",
            "source": "sysv",
            "state": "running"
        },
        {
            "name": "apparmor",
            "source": "sysv",
            "state": "stopped"
        },
        {
            "name": "atd",
            "source": "sysv",
            "state": "running"
        },
        {
            "name": "cron",
            "source": "sysv",
            "state": "running"
        }
    ],
    'ansible': {
        'ansible_fact_simple': 'hello world',
        'ansible_fact_complex': {
            'foo': 'bar',
            'hello': [
                'scooby',
                'dooby',
                'doo'
            ]
        },
    }
}
# Derived from TEST_MSG_BASE
# NOTE: dict() is a shallow copy — the 'facts' dict is shared with
# TEST_MSG_BASE; tests deepcopy TEST_MSG before mutating it.
TEST_MSG = dict(TEST_MSG_BASE)
def copy_only_module(data, module):
    """Return a deep copy of `data` whose 'facts' holds only `module`'s payload.

    The 'ansible' module's facts sit at the top level of the message; every
    other module's facts are nested under the module name.
    """
    result = deepcopy(data)
    if module == 'ansible':
        result['facts'] = deepcopy(TEST_MSG_MODULES[module])
    else:
        result['facts'] = {module: deepcopy(TEST_MSG_MODULES[module])}
    return result
class RunFactCacheReceiverFunctionalTest(BaseCommandMixin, BaseTest, MongoDBRequired):
    """End-to-end invocation of the run_fact_cache_receiver command (skipped)."""

    # fix: typos in the skip reason — 'never exists' -> 'never exits',
    # 'increase coverage' -> 'increased coverage'.
    @unittest.skip('''\
TODO: run_fact_cache_receiver enters a while True loop that never exits. \
This differs from most other commands that we test for. More logic and work \
would be required to invoke this case from the command line with little return \
in terms of increased coverage and confidence.''')
    def test_invoke(self):
        result, stdout, stderr = self.run_command('run_fact_cache_receiver')
        self.assertEqual(result, None)
class RunFactCacheReceiverUnitTest(BaseTest, MongoDBRequired):
    """Drive FactCacheReceiver.process_fact_message() directly and verify storage."""

    # TODO: Check that timestamp and other attributes are as expected
    def check_process_fact_message_module(self, data, module):
        """Assert exactly one Fact and one FactVersion exist, both for `module`."""
        fact_found = None
        facts = Fact.objects.all()
        self.assertEqual(len(facts), 1)
        for fact in facts:
            if fact.module == module:
                fact_found = fact
                break
        self.assertIsNotNone(fact_found)
        #self.assertEqual(data['facts'][module], fact_found[module])
        fact_found = None
        fact_versions = FactVersion.objects.all()
        self.assertEqual(len(fact_versions), 1)
        for fact in fact_versions:
            if fact.module == module:
                fact_found = fact
                break
        self.assertIsNotNone(fact_found)

    # Ensure that the message flows from the socket through to process_fact_message()
    @mock.patch('awx.main.socket.Socket.listen')
    def test_run_receiver(self, listen_mock):
        listen_mock.return_value = [TEST_MSG]
        receiver = FactCacheReceiver()
        receiver.process_fact_message = MagicMock(name='process_fact_message')
        receiver.run_receiver()
        receiver.process_fact_message.assert_called_once_with(TEST_MSG)

    def test_process_fact_message_ansible(self):
        data = copy_only_module(TEST_MSG, 'ansible')
        receiver = FactCacheReceiver()
        receiver.process_fact_message(data)
        self.check_process_fact_message_module(data, 'ansible')

    def test_process_fact_message_packages(self):
        data = copy_only_module(TEST_MSG, 'packages')
        receiver = FactCacheReceiver()
        receiver.process_fact_message(data)
        self.check_process_fact_message_module(data, 'packages')

    def test_process_fact_message_services(self):
        data = copy_only_module(TEST_MSG, 'services')
        receiver = FactCacheReceiver()
        receiver.process_fact_message(data)
        self.check_process_fact_message_module(data, 'services')

    # Ensure that only a single host gets created for multiple invocations with the same hostname
    def test_process_fact_message_single_host_created(self):
        receiver = FactCacheReceiver()
        data = deepcopy(TEST_MSG)
        receiver.process_fact_message(data)
        data = deepcopy(TEST_MSG)
        # Fresh date_key so the second message is a new scan, not an update.
        data['date_key'] = time.mktime(datetime.utcnow().timetuple())
        receiver.process_fact_message(data)
        fact_hosts = FactHost.objects.all()
        self.assertEqual(len(fact_hosts), 1)

    def test_process_facts_message_ansible_overwrite(self):
        """A second message with the same date_key updates the existing Fact in place."""
        data = copy_only_module(TEST_MSG, 'ansible')
        key = 'ansible_overwrite'
        value = 'hello world'
        receiver = FactCacheReceiver()
        receiver.process_fact_message(data)
        fact = Fact.objects.all()[0]
        data = copy_only_module(TEST_MSG, 'ansible')
        data['facts'][key] = value
        receiver.process_fact_message(data)
        fact = Fact.objects.get(id=fact.id)
        self.assertIn(key, fact.fact)
        self.assertEqual(fact.fact[key], value)

View File

@ -1,3 +1,6 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from awx.main.tests.jobs.jobs_monolithic import * # noqa
from survey_password import * # noqa
from base import * # noqa

View File

@ -32,6 +32,8 @@
import sys
import time
import datetime
import json
from copy import deepcopy
from ansible import constants as C
from ansible.cache.base import BaseCacheModule
@ -47,6 +49,7 @@ class CacheModule(BaseCacheModule):
# Basic in-memory caching for typical runs
self._cache = {}
self._cache_prev = {}
# This is the local tower zmq connection
self._tower_connection = C.CACHE_PLUGIN_CONNECTION
@ -54,20 +57,67 @@ class CacheModule(BaseCacheModule):
try:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt(zmq.RCVTIMEO, 4000)
self.socket.setsockopt(zmq.LINGER, 2000)
self.socket.connect(self._tower_connection)
except Exception, e:
print("Connection to zeromq failed at %s with error: %s" % (str(self._tower_connection),
str(e)))
sys.exit(1)
def identify_ansible_facts(self, facts):
    """Return a dict keyed by the fact names that start with 'ansible_'.

    Values are the dummy marker 1; callers only iterate the keys.
    """
    return dict((name, 1) for name in facts.keys() if name.startswith('ansible_'))
def identify_new_module(self, key, value):
    """Return the first non-'ansible_' key in `value` absent from the previous
    cache snapshot for `key`, or None when there is no such key (or no snapshot).
    """
    if key not in self._cache_prev:
        return None
    previous = self._cache_prev[key]
    for name, _unused in value.iteritems():
        if name not in previous and not name.startswith('ansible_'):
            return name
    return None
def get(self, key):
    # Return the cached facts dict for host `key`, or None when absent.
    # NOTE: per the explanation below, callers mutate the returned dict in
    # place before calling set().
    return self._cache.get(key)
'''
get() returns a reference to the fact object (usually a dict). The object is modified directly,
then set is called. Effectively, pre-determining the set logic.
The below logic creates a backup of the cache each set. The values are now preserved across set() calls.
For a given key. The previous value is looked at for new keys that aren't of the form 'ansible_'.
If found, send the value of the found key.
If not found, send all the key value pairs of the form 'ansible_' (we presume set() is called because
of an ansible fact module invocation)
More simply stated...
In value, if a new key is found at the top most dict then consider this a module request and only
emit the facts for the found top-level key.
If a new key is not found, assume set() was called as a result of ansible facts scan. Thus, emit
all facts of the form 'ansible_'.
'''
def set(self, key, value):
    """Cache `value` for host `key` and emit the relevant facts to Tower."""
    module = self.identify_new_module(key, value)
    # Assume ansible fact triggered the set if no new module found
    facts = {}
    if not module:
        keys = self.identify_ansible_facts(value)
        for k in keys:
            facts[k] = value[k]
    else:
        facts[module] = value[module]
    # Snapshot the cache so the next set() can diff against it.
    self._cache_prev = deepcopy(self._cache)
    self._cache[key] = value
    # Emit fact data to tower for processing
    # NOTE(review): the first send_json below (sending the full `value`) looks
    # like pre-refactor diff residue immediately followed by the intended send
    # of the filtered `facts` — confirm only one send_json should remain.
    self.socket.send_json(dict(host=key, facts=value, date_key=self.date_key))
    self.socket.send_json(dict(host=key, facts=facts, date_key=self.date_key))
    self.socket.recv()
def keys(self):

View File

@ -7,6 +7,8 @@ import glob
from datetime import timedelta
import tempfile
MONGO_DB = 'system_tracking'
# Update this module's local settings from the global settings module.
from django.conf import global_settings
this_module = sys.modules[__name__]
@ -153,6 +155,7 @@ INSTALLED_APPS = (
'awx.main',
'awx.api',
'awx.ui',
'awx.fact',
)
INTERNAL_IPS = ('127.0.0.1',)

View File

@ -14,6 +14,8 @@ from split_settings.tools import optional, include
# Load default settings.
from defaults import *
MONGO_DB = 'system_tracking_dev'
# Disable capturing all SQL queries when running celeryd in development.
if 'celeryd' in sys.argv:
SQL_DEBUG = False
@ -30,7 +32,7 @@ AWX_PROOT_ENABLED = True
try:
import django_jenkins
INSTALLED_APPS += ('django_jenkins',)
PROJECT_APPS = ('awx.main.tests', 'awx.api.tests',)
PROJECT_APPS = ('awx.main.tests', 'awx.api.tests', 'awx.fact.tests',)
except ImportError:
pass

View File

@ -42,6 +42,8 @@ if len(sys.argv) >= 2 and sys.argv[1] == 'test':
'TEST_NAME': os.path.join(BASE_DIR, 'awx_test.sqlite3'),
}
}
MONGO_DB = 'system_tracking_test'
# Celery AMQP configuration.
BROKER_URL = 'redis://localhost/'

View File

@ -55,7 +55,6 @@ mongo-python-driver-2.8.tar.gz
# Needed by pyrax:
#httplib2-0.8.tar.gz
#keyring-3.7.zip
#mock-1.0.1.tar.gz
#python-swiftclient-2.0.3.tar.gz
#rackspace-novaclient-1.4.tar.gz
# Remaining dev/prod packages:
@ -78,6 +77,8 @@ mongo-python-driver-2.8.tar.gz
#mongoengine-0.9.0.tar.gz
# Dev-only packages:
# Needed for tests
mock-1.0.1.tar.gz
# Needed by django-debug-toolbar:
sqlparse-0.1.11.tar.gz
# Needed for Python2.6 support: