Merge pull request #6588 from chrismeyersfsu/feature-fact_cache

initial tower fact cache implementation
This commit is contained in:
Chris Meyers 2017-06-22 09:58:28 -04:00 committed by GitHub
commit fee16fe391
23 changed files with 375 additions and 440 deletions

View File

@ -390,7 +390,6 @@ server_noattach:
tmux new-window 'exec make receiver'
tmux select-window -t tower:2
tmux rename-window 'Extra Services'
tmux split-window -h 'exec make factcacher'
tmux select-window -t tower:0
server: server_noattach
@ -472,12 +471,6 @@ socketservice:
fi; \
$(PYTHON) manage.py run_socketio_service
factcacher:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \
fi; \
$(PYTHON) manage.py run_fact_cache_receiver
nginx:
nginx -g "daemon off;"

View File

@ -2227,7 +2227,7 @@ class JobOptionsSerializer(LabelsListMixin, BaseSerializer):
fields = ('*', 'job_type', 'inventory', 'project', 'playbook',
'credential', 'vault_credential', 'forks', 'limit',
'verbosity', 'extra_vars', 'job_tags', 'force_handlers',
'skip_tags', 'start_at_task', 'timeout', 'store_facts',)
'skip_tags', 'start_at_task', 'timeout', 'use_fact_cache',)
def get_fields(self):
fields = super(JobOptionsSerializer, self).get_fields()

View File

@ -228,6 +228,19 @@ register(
category_slug='jobs',
)
register(
'ANSIBLE_FACT_CACHE_TIMEOUT',
field_class=fields.IntegerField,
min_value=0,
default=0,
label=_('Per-Host Ansible Fact Cache Timeout'),
help_text=_('Maximum time, in seconds, that Tower stored Ansible facts are considered valid since '
'the last time they were modified. Only valid, non-stale, facts will be accessible by '
'a playbook. Note, this does not influence the deletion of ansible_facts from the database.'),
category=_('Jobs'),
category_slug='jobs',
)
register(
'LOG_AGGREGATOR_HOST',
field_class=fields.CharField,

View File

@ -1,151 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import logging
from datetime import datetime
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
# Django
from django.core.management.base import NoArgsCommand
from django.conf import settings
from django.utils import timezone
from django.db import IntegrityError
# AWX
from awx.main.models.jobs import Job
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
from awx.main.models.base import PERM_INVENTORY_SCAN
logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
analytics_logger = logging.getLogger('awx.analytics.system_tracking')
class FactBrokerWorker(ConsumerMixin):
    '''
    Kombu consumer that takes fact messages off the queue named
    settings.FACT_QUEUE and persists them: as Fact rows for inventory scan
    jobs, and/or as per-host ansible_facts for jobs with store_facts enabled.
    '''

    def __init__(self, connection):
        self.connection = connection
        # Timestamp derived from the date_key of the message being processed.
        self.timestamp = None

    def get_consumers(self, Consumer, channel):
        '''
        Return a single JSON-only consumer bound to the direct exchange and
        queue named settings.FACT_QUEUE, dispatching to process_fact_message.
        '''
        fact_queue = Queue(settings.FACT_QUEUE,
                           Exchange(settings.FACT_QUEUE, type='direct'),
                           routing_key=settings.FACT_QUEUE)
        return [Consumer(queues=[fact_queue],
                         accept=['json'],
                         callbacks=[self.process_fact_message])]

    def _determine_module(self, facts):
        '''
        Return the module name for a facts payload.
        '''
        # Semantically determine the module type: a payload with exactly one
        # top-level key is treated as that module's facts.
        if len(facts) == 1:
            # next(iter(...)) works on both Python 2 and 3 dicts.
            return next(iter(facts))
        return 'ansible'

    def _extract_module_facts(self, module, facts):
        '''
        Return facts[module] when that key is present, otherwise the payload
        unchanged.
        '''
        if module in facts:
            return facts[module]
        return facts

    def process_facts(self, facts):
        '''
        Split a raw facts payload into a (module_name, module_facts) tuple.
        '''
        module = self._determine_module(facts)
        return (module, self._extract_module_facts(module, facts))

    def _do_fact_scan_create_update(self, host_obj, module_name, facts, timestamp):
        '''
        Update the Fact row for (host, module, timestamp) if it exists,
        otherwise create it. Returns the Fact object.
        '''
        try:
            fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=timestamp)
            fact_obj.facts = facts
            fact_obj.save()
            logger.info('Updated existing fact <%s>' % (fact_obj.id))
        except Fact.DoesNotExist:
            # Create new Fact entry. Use the timestamp that was passed in so
            # the lookup above and the create agree.
            fact_obj = Fact.add_fact(host_obj.id, module_name, timestamp, facts)
            logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
            # NOTE(review): indentation was lost in this view; the analytics
            # emission is assumed to belong to the create branch -- confirm.
            analytics_logger.info('Received message with fact data', extra=dict(
                module_name=module_name, facts_data=facts))
        return fact_obj

    def _do_gather_facts_update(self, host_obj, module_name, facts, timestamp):
        '''
        Store the facts on the host via Host.update_ansible_facts and return
        the host.
        '''
        host_obj.update_ansible_facts(module=module_name, facts=facts, timestamp=timestamp)
        return host_obj

    def process_fact_message(self, body, message):
        '''
        Handle one fact message.

        body is expected to carry 'host', 'inventory_id', 'facts' and
        'date_key', plus an optional 'job_id'. The message is acked on every
        handled path. Returns the Fact object written for a scan job, or None
        when nothing was written as a Fact (unknown job/host, non-scan job).
        '''
        hostname = body['host']
        inventory_id = body['inventory_id']
        job_id = body.get('job_id', -1)
        facts_data = body['facts']
        date_key = body['date_key']

        # In Tower < 3.2 we neglected to ack the incoming message.
        # In Tower 3.2 we add the job_id parameter.
        # To account for this, we need to fail gracefully when the job is not
        # found.
        try:
            job = Job.objects.get(id=job_id)
        except Job.DoesNotExist:
            logger.warn('Failed to find job %s while processing facts' % job_id)
            message.ack()
            return None
        is_fact_scan = job.job_type == PERM_INVENTORY_SCAN

        try:
            host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
        # The lookup is on Host, so it raises Host.* exceptions; catching the
        # Fact.* variants here never matched and everything fell through to
        # the generic handler below.
        except Host.DoesNotExist:
            logger.warn('Failed to intake fact. Host does not exist <hostname, inventory_id> <%s, %s>' % (hostname, inventory_id))
            message.ack()
            return None
        except Host.MultipleObjectsReturned:
            logger.warn('Database inconsistent. Multiple Hosts found for <hostname, inventory_id> <%s, %s>.' % (hostname, inventory_id))
            message.ack()
            return None
        except Exception as e:
            logger.error("Exception communicating with Fact Cache Database: %s" % str(e))
            message.ack()
            return None

        (module_name, facts) = self.process_facts(facts_data)
        self.timestamp = datetime.fromtimestamp(date_key, timezone.utc)

        ret = None
        # Update existing Fact entry
        if is_fact_scan:
            ret = self._do_fact_scan_create_update(host_obj, module_name, facts, self.timestamp)

        if job.store_facts is True:
            if module_name == 'insights':
                system_id = facts.get('system_id', None)
                host_obj.insights_system_id = system_id
                try:
                    host_obj.save()
                except IntegrityError:
                    # Another host already owns this system_id; drop it here.
                    host_obj.insights_system_id = None
                    logger.warn('Inisghts system_id %s not assigned to host %s because it already exists.' % (system_id, host_obj.pk))
            self._do_gather_facts_update(host_obj, module_name, facts, self.timestamp)

        message.ack()
        return ret
class Command(NoArgsCommand):
    '''
    Management command that runs a FactBrokerWorker against the broker at
    settings.BROKER_URL until it is interrupted.
    '''
    help = 'Launch the Fact Cache Receiver'

    def handle_noargs(self, **options):
        with Connection(settings.BROKER_URL) as broker_connection:
            try:
                FactBrokerWorker(broker_connection).run()
            except KeyboardInterrupt:
                # Ctrl-C ends the receiver quietly.
                pass

View File

@ -81,15 +81,20 @@ class Migration(migrations.Migration):
name='ansible_facts',
field=awx.main.fields.JSONBField(default={}, help_text='Arbitrary JSON structure of most recent ansible_facts, per-host.', blank=True),
),
migrations.AddField(
model_name='host',
name='ansible_facts_modified',
field=models.DateTimeField(default=None, help_text='The date and time ansible_facts was last modified.', null=True, editable=False),
),
migrations.AddField(
model_name='job',
name='store_facts',
field=models.BooleanField(default=False, help_text='During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'),
name='use_fact_cache',
field=models.BooleanField(default=False, help_text='If enabled, Tower will act as an Ansible Fact Cache Plugin; persisting facts at the end of a playbook run to the database and caching facts for use by Ansible.'),
),
migrations.AddField(
model_name='jobtemplate',
name='store_facts',
field=models.BooleanField(default=False, help_text='During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'),
name='use_fact_cache',
field=models.BooleanField(default=False, help_text='If enabled, Tower will act as an Ansible Fact Cache Plugin; persisting facts at the end of a playbook run to the database and caching facts for use by Ansible.'),
),
migrations.RunSQL([("CREATE INDEX host_ansible_facts_default_gin ON %s USING gin"
"(ansible_facts jsonb_path_ops);", [AsIs(Host._meta.db_table)])],

View File

@ -448,6 +448,12 @@ class Host(CommonModelNameNotUnique):
default={},
help_text=_('Arbitrary JSON structure of most recent ansible_facts, per-host.'),
)
ansible_facts_modified = models.DateTimeField(
default=None,
editable=False,
null=True,
help_text=_('The date and time ansible_facts was last modified.'),
)
insights_system_id = models.TextField(
blank=True,
default=None,

View File

@ -7,13 +7,18 @@ import hashlib
import hmac
import logging
import time
import json
from urlparse import urljoin
# Django
from django.conf import settings
from django.db import models
#from django.core.cache import cache
import memcache
from django.db.models import Q, Count
from django.utils.dateparse import parse_datetime
from dateutil import parser
from dateutil.tz import tzutc
from django.utils.encoding import force_text
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
@ -41,6 +46,7 @@ from awx.main.consumers import emit_channel_notification
logger = logging.getLogger('awx.main.models.jobs')
analytics_logger = logging.getLogger('awx.analytics.job_events')
system_tracking_logger = logging.getLogger('awx.analytics.system_tracking')
__all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent', 'SystemJobOptions', 'SystemJobTemplate', 'SystemJob']
@ -147,9 +153,9 @@ class JobOptions(BaseModel):
default=0,
help_text=_("The amount of time (in seconds) to run before the task is canceled."),
)
store_facts = models.BooleanField(
use_fact_cache = models.BooleanField(
default=False,
help_text=_('During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'),
help_text=_("If enabled, Tower will act as an Ansible Fact Cache Plugin; persisting facts at the end of a playbook run to the database and caching facts for use by Ansible."),
)
extra_vars_dict = VarsDictProperty('extra_vars', True)
@ -282,7 +288,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour
'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars',
'launch_type', 'force_handlers', 'skip_tags', 'start_at_task',
'become_enabled', 'labels', 'survey_passwords',
'allow_simultaneous', 'timeout', 'store_facts',]
'allow_simultaneous', 'timeout', 'use_fact_cache',]
def resource_validation_data(self):
'''
@ -703,6 +709,84 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
self.project_update.cancel(job_explanation=job_explanation)
return res
@property
def memcached_fact_key(self):
    '''
    Memcached key for this job's inventory: the inventory id rendered as a
    string.
    '''
    inventory_id = self.inventory.id
    return '{}'.format(inventory_id)
def memcached_fact_host_key(self, host_name):
    '''
    Memcached key for one host's facts: "<inventory id>-<host name>".
    '''
    inventory_id = self.inventory.id
    return '{}-{}'.format(inventory_id, host_name)
def memcached_fact_modified_key(self, host_name):
    '''
    Memcached key for one host's facts-modified timestamp:
    "<inventory id>-<host name>-modified".
    '''
    inventory_id = self.inventory.id
    return '{}-{}-modified'.format(inventory_id, host_name)
def _get_inventory_hosts(self, only=['name', 'ansible_facts', 'modified',]):
return self.inventory.hosts.only(*only)
def _get_memcache_connection(self):
    '''
    Return a memcache client pointed at the LOCATION of the default Django
    cache.
    '''
    cache_location = settings.CACHES['default']['LOCATION']
    return memcache.Client([cache_location], debug=0)
def start_job_fact_cache(self):
    '''
    Seed memcached with the inventory's per-host facts before a job runs.

    For each inventory host that has no "modified" entry in the cache yet,
    store its ansible_facts as JSON plus a modified timestamp (the host's
    ansible_facts_modified stamped as UTC, or the current UTC time when the
    host has none). Hosts already present in the cache are left untouched.
    Finally the list of host names is stored under memcached_fact_key.
    Does nothing when the job has no inventory.
    '''
    if not self.inventory:
        return

    client = self._get_memcache_connection()
    cached_names = []

    for inv_host in self._get_inventory_hosts():
        facts_key = self.memcached_fact_host_key(inv_host.name)
        stamp_key = self.memcached_fact_modified_key(inv_host.name)

        if client.get(stamp_key) is None:
            if inv_host.ansible_facts_modified:
                stamp = inv_host.ansible_facts_modified.replace(tzinfo=tzutc())
            else:
                stamp = datetime.datetime.now(tzutc())
            stamp_text = stamp.isoformat()
            client.set(facts_key, json.dumps(inv_host.ansible_facts))
            client.set(stamp_key, stamp_text)

        # NOTE(review): indentation was lost in this view; the name is assumed
        # to be recorded for every host, cached or not -- confirm.
        cached_names.append(inv_host.name)

    client.set(self.memcached_fact_key, cached_names)
def finish_job_fact_cache(self):
    '''
    Persist facts from memcached back onto Host rows after a job run.

    For each inventory host: when the cache has no modified timestamp, or the
    cached facts cannot be decoded as JSON, the cached facts key is deleted.
    Otherwise, when the cached timestamp is newer than the host's
    ansible_facts_modified (or the host has none), the cached facts and
    timestamp are written to the host, insights_system_id is refreshed from
    facts['insights']['system_id'] when present, the host is saved and a
    system tracking log record is emitted.
    Does nothing when the job has no inventory.
    '''
    if not self.inventory:
        return

    client = self._get_memcache_connection()
    inventory_hosts = self._get_inventory_hosts()

    for inv_host in inventory_hosts:
        facts_key = self.memcached_fact_host_key(inv_host.name)
        stamp_key = self.memcached_fact_modified_key(inv_host.name)

        cached_stamp = client.get(stamp_key)
        if cached_stamp is None:
            client.delete(facts_key)
            continue

        # Save facts if cache is newer than DB
        cached_stamp = parser.parse(cached_stamp, tzinfos=[tzutc()])
        if inv_host.ansible_facts_modified and not cached_stamp > inv_host.ansible_facts_modified:
            continue

        raw_facts = client.get(facts_key)
        try:
            new_facts = json.loads(raw_facts)
        except Exception:
            new_facts = None

        if new_facts is None:
            client.delete(facts_key)
            continue

        inv_host.ansible_facts = new_facts
        inv_host.ansible_facts_modified = cached_stamp
        if 'insights' in new_facts and 'system_id' in new_facts['insights']:
            inv_host.insights_system_id = new_facts['insights']['system_id']
        inv_host.save()

        log_extra = dict(inventory_id=inv_host.inventory.id, host_name=inv_host.name,
                         ansible_facts=inv_host.ansible_facts,
                         ansible_facts_modified=inv_host.ansible_facts_modified.isoformat())
        system_tracking_logger.info('New fact for inventory {} host {}'.format(inv_host.inventory.name, inv_host.name),
                                    extra=log_extra)
class JobHostSummary(CreatedModifiedModel):
'''
@ -1357,3 +1441,4 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin):
def get_notification_friendly_name(self):
return "System Job"

View File

@ -877,6 +877,12 @@ class RunJob(BaseTask):
# callbacks to work.
env['JOB_ID'] = str(job.pk)
env['INVENTORY_ID'] = str(job.inventory.pk)
if job.use_fact_cache and not kwargs.get('isolated'):
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
env['ANSIBLE_FACT_CACHE_TIMEOUT'] = str(settings.ANSIBLE_FACT_CACHE_TIMEOUT)
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
if job.project:
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
@ -951,13 +957,6 @@ class RunJob(BaseTask):
if authorize:
env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password')
# Set environment variables related to gathering facts from the cache
if (job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True) and not kwargs.get('isolated'):
env['FACT_QUEUE'] = settings.FACT_QUEUE
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT)
return env
def build_args(self, job, **kwargs):
@ -1140,8 +1139,14 @@ class RunJob(BaseTask):
('project_update', local_project_sync.name, local_project_sync.id)))
raise
if job.use_fact_cache and not kwargs.get('isolated'):
job.start_job_fact_cache()
def final_run_hook(self, job, status, **kwargs):
super(RunJob, self).final_run_hook(job, status, **kwargs)
if job.use_fact_cache and not kwargs.get('isolated'):
job.finish_job_fact_cache()
try:
inventory = job.inventory
except Inventory.DoesNotExist:
@ -1852,7 +1857,6 @@ class RunInventoryUpdate(BaseTask):
raise
def final_run_hook(self, instance, status, **kwargs):
print("In final run hook")
if self.custom_dir_path:
for p in self.custom_dir_path:
try:

View File

@ -1,129 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
# Python
import pytest
from datetime import datetime
import json
# Django
from django.utils import timezone
# AWX
from awx.main.management.commands.run_fact_cache_receiver import FactBrokerWorker
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
from awx.main.models.base import PERM_INVENTORY_SCAN
@pytest.fixture
def mock_message(mocker):
    '''
    Stand-in for a broker message whose ack() is patched by mocker, so tests
    can assert that it was called.
    '''
    class Message(object):
        # ack needs self: it is looked up and patched on an instance.
        def ack(self):
            pass

    msg = Message()
    mocker.patch.object(msg, 'ack')
    return msg
@pytest.fixture
def mock_job_generator(mocker):
    '''
    Factory fixture: calling it builds a minimal job object with the given
    store_facts and job_type, patches awx.main.models.Job.objects.get to
    return it, and returns the object.
    '''
    def make_job(store_facts=True, job_type=PERM_INVENTORY_SCAN):
        class Job():
            def __init__(self):
                self.store_facts = store_facts
                self.job_type = job_type

        fake_job = Job()
        mocker.patch('awx.main.models.Job.objects.get', return_value=fake_job)
        return fake_job

    return make_job
# TODO: Check that timestamp and other attributes are as expected
def check_process_fact_message_module(fact_returned, data, module_name, message):
    '''
    Shared assertions for the process_fact_message tests: the message was
    acked, exactly one Fact exists, and the returned Fact matches the host,
    module, timestamp and facts carried by the message.
    '''
    date_key = data['date_key']

    message.ack.assert_called_with()

    # Ensure 1, and only 1, fact created
    expected_timestamp = datetime.fromtimestamp(date_key, timezone.utc)
    assert 1 == Fact.objects.all().count()

    host_obj = Host.objects.get(name=data['host'], inventory__id=data['inventory_id'])
    assert host_obj is not None

    fact_known = Fact.get_host_fact(host_obj.id, module_name, expected_timestamp)
    assert fact_known is not None
    assert fact_known == fact_returned

    assert host_obj == fact_returned.host
    # 'ansible' facts are the whole payload; other modules are nested under
    # their own key.
    expected_facts = data['facts'] if module_name == 'ansible' else data['facts'][module_name]
    assert expected_facts == fact_returned.facts
    assert expected_timestamp == fact_returned.timestamp
    assert module_name == fact_returned.module
@pytest.mark.django_db
def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
    '''
    Process an 'ansible' fact message for a scan job and check the stored Fact.
    '''
    worker = FactBrokerWorker(None)
    mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)

    stored_fact = worker.process_fact_message(fact_msg_ansible, mock_message)

    check_process_fact_message_module(stored_fact, fact_msg_ansible, 'ansible', mock_message)
@pytest.mark.django_db
def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
    '''
    Process a 'packages' fact message for a scan job and check the stored Fact.
    '''
    worker = FactBrokerWorker(None)
    mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)

    stored_fact = worker.process_fact_message(fact_msg_packages, mock_message)

    check_process_fact_message_module(stored_fact, fact_msg_packages, 'packages', mock_message)
@pytest.mark.django_db
def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
    '''
    Process a 'services' fact message for a scan job and check the stored Fact.
    '''
    worker = FactBrokerWorker(None)
    mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)

    stored_fact = worker.process_fact_message(fact_msg_services, mock_message)

    check_process_fact_message_module(stored_fact, fact_msg_services, 'services', mock_message)
@pytest.mark.django_db
def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
    '''
    We piggyback our fact sending onto the Ansible fact interface.
    The interface is <hostname, facts>, where facts is a json blob of all the
    facts, which makes it hard to tell which facts are new or changed.
    Because of this, the same fact module data may be sent multiple times and
    only the newest version is kept.
    '''
    mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)
    scan_epoch = datetime.fromtimestamp(fact_msg_ansible['date_key'])
    fact_scans(fact_scans=1, timestamp_epoch=scan_epoch)
    new_key = 'ansible.overwrite'
    new_value = 'hello world'

    worker = FactBrokerWorker(None)
    worker.process_fact_message(fact_msg_ansible, mock_message)

    # Send the same module again with one extra fact; it must replace the first.
    fact_msg_ansible['facts'][new_key] = new_value
    fact_returned = worker.process_fact_message(fact_msg_ansible, mock_message)

    fact_obj = Fact.objects.get(id=fact_returned.id)
    assert new_key in fact_obj.facts
    # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
    if isinstance(fact_obj.facts, unicode):
        stored_facts = json.loads(fact_obj.facts)
    else:
        stored_facts = fact_obj.facts
    assert fact_msg_ansible['facts'] == stored_facts
@pytest.mark.django_db
def test_process_fact_store_facts(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
    '''
    A non-scan job with store_facts enabled writes the message's facts onto
    the host's ansible_facts.
    '''
    worker = FactBrokerWorker(None)
    mock_job_generator(store_facts=True, job_type='run')

    worker.process_fact_message(fact_msg_services, mock_message)

    stored_host = Host.objects.get(name=fact_msg_services['host'],
                                   inventory__id=fact_msg_services['inventory_id'])
    assert stored_host is not None
    assert stored_host.ansible_facts == fact_msg_services['facts']

View File

@ -0,0 +1,133 @@
import pytest
from awx.main.models import (
Job,
Inventory,
Host,
)
import datetime
import json
from dateutil.tz import tzutc
class CacheMock(object):
    '''
    Minimal in-memory stand-in for the memcache client used by the job fact
    cache: get/set/delete backed by a plain dict (self.d).
    '''

    def __init__(self):
        self.d = dict()

    def get(self, key):
        '''Return the stored value, or None when the key is absent.'''
        return self.d.get(key)

    def set(self, key, val):
        '''Store val under key, replacing any previous value.'''
        self.d[key] = val

    def delete(self, key):
        '''
        Remove key if present. Deleting a missing key is a no-op, so code
        that deletes unconditionally does not blow up with KeyError under
        the mock.
        '''
        self.d.pop(key, None)
@pytest.fixture
def old_time():
    '''A timezone-aware UTC datetime 60 minutes in the past.'''
    sixty_minutes = datetime.timedelta(minutes=60)
    return datetime.datetime.now(tzutc()) - sixty_minutes
@pytest.fixture()
def new_time():
    '''The current time as a timezone-aware UTC datetime.'''
    utc_now = datetime.datetime.now(tzutc())
    return utc_now
@pytest.fixture
def hosts(old_time, inventory):
    '''
    Three unsaved hosts (host1..host3) in the test inventory, each with the
    same small facts dict last modified at old_time.
    '''
    return [
        Host(name=host_name, ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time, inventory=inventory)
        for host_name in ('host1', 'host2', 'host3')
    ]
@pytest.fixture
def hosts2(old_time, inventory):
    '''
    A single unsaved 'host2' whose ansible_facts differ from the hosts
    fixture, for checking that already-cached facts are not overwritten.
    '''
    # old_time must be requested as a fixture argument; otherwise the name
    # resolves to the module-level fixture function rather than a datetime.
    return [
        Host(name='host2', ansible_facts="foobar", ansible_facts_modified=old_time, inventory=inventory),
    ]
@pytest.fixture
def inventory():
    '''An unsaved Inventory with a fixed id of 5.'''
    test_inventory = Inventory(id=5)
    return test_inventory
@pytest.fixture
def mock_cache(mocker):
    '''
    A CacheMock whose set/get/delete are wrapped by mocker, so calls are
    recorded for assertions while still hitting the real dict-backed methods.
    '''
    cache = CacheMock()
    for method_name in ('set', 'get', 'delete'):
        mocker.patch.object(cache, method_name, wraps=getattr(cache, method_name))
    return cache
@pytest.fixture
def job(mocker, hosts, inventory, mock_cache):
    '''
    An unsaved Job (id=2) on the test inventory whose host lookup returns the
    hosts fixture and whose memcache connection is the mock cache.
    '''
    fake_job = Job(inventory=inventory, id=2)
    fake_job._get_inventory_hosts = mocker.Mock(return_value=hosts)
    fake_job._get_memcache_connection = mocker.Mock(return_value=mock_cache)
    return fake_job
@pytest.fixture
def job2(mocker, hosts2, inventory, mock_cache):
    '''
    An unsaved Job (id=3) on the test inventory whose host lookup returns the
    hosts2 fixture and whose memcache connection is the mock cache.
    '''
    fake_job = Job(inventory=inventory, id=3)
    fake_job._get_inventory_hosts = mocker.Mock(return_value=hosts2)
    fake_job._get_memcache_connection = mocker.Mock(return_value=mock_cache)
    return fake_job
def test_start_job_fact_cache(hosts, job, inventory, mocker):
    '''
    Starting the fact cache stores the host name list plus, per host, the
    JSON facts and the modified timestamp.
    '''
    job.start_job_fact_cache()

    cache_set = job._get_memcache_connection().set
    expected_names = [h.name for h in hosts]
    cache_set.assert_any_call('5', expected_names)
    for host in hosts:
        facts_key = '{}-{}'.format(5, host.name)
        modified_key = '{}-{}-modified'.format(5, host.name)
        cache_set.assert_any_call(facts_key, json.dumps(host.ansible_facts))
        cache_set.assert_any_call(modified_key, host.ansible_facts_modified.isoformat())
def test_start_job_fact_cache_existing_host(hosts, hosts2, job, job2, inventory, mocker):
    '''
    A second job starting on the same inventory must not overwrite facts that
    are already in the cache.
    '''
    job.start_job_fact_cache()

    cache_set = job._get_memcache_connection().set
    for host in hosts:
        facts_key = '{}-{}'.format(5, host.name)
        modified_key = '{}-{}-modified'.format(5, host.name)
        cache_set.assert_any_call(facts_key, json.dumps(host.ansible_facts))
        cache_set.assert_any_call(modified_key, host.ansible_facts_modified.isoformat())

    job._get_memcache_connection().set.reset_mock()

    job2.start_job_fact_cache()

    # Ensure hosts2 ansible_facts didn't overwrite hosts ansible_facts
    shared_key = '{}-{}'.format(5, hosts2[0].name)
    ansible_facts_cached = job._get_memcache_connection().get(shared_key)
    assert ansible_facts_cached == json.dumps(hosts[1].ansible_facts)
def test_finish_job_fact_cache(job, hosts, inventory, mocker, new_time):
    '''
    Only the host whose cached facts are newer than its stored facts is
    updated and saved when the fact cache is finished.
    '''
    job.start_job_fact_cache()
    for h in hosts:
        h.save = mocker.Mock()

    # Simulate a playbook run refreshing the facts of the second host only.
    updated_host = hosts[1]
    host_key = job.memcached_fact_host_key(updated_host.name)
    modified_key = job.memcached_fact_modified_key(updated_host.name)
    ansible_facts_new = {"foo": "bar", "insights": {"system_id": "updated_by_scan"}}
    cache = job._get_memcache_connection()
    cache.set(host_key, json.dumps(ansible_facts_new))
    job._get_memcache_connection().set(modified_key, new_time.isoformat())

    job.finish_job_fact_cache()

    hosts[0].save.assert_not_called()
    hosts[2].save.assert_not_called()
    assert hosts[1].ansible_facts == ansible_facts_new
    assert hosts[1].insights_system_id == "updated_by_scan"
    hosts[1].save.assert_called_once_with()

View File

@ -4,6 +4,8 @@ import cStringIO
import json
import logging
import socket
import datetime
from dateutil.tz import tzutc
from uuid import uuid4
import mock
@ -135,7 +137,7 @@ def test_base_logging_handler_emit(dummy_log_record):
assert body['message'] == 'User joe logged in'
def test_base_logging_handler_emit_one_record_per_fact():
def test_base_logging_handler_emit_system_tracking():
handler = BaseHandler(host='127.0.0.1', enabled_flag=True,
message_type='logstash', indv_facts=True,
enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking'])
@ -149,27 +151,20 @@ def test_base_logging_handler_emit_one_record_per_fact():
tuple(), # args,
None # exc_info
)
record.module_name = 'packages'
record.facts_data = [{
"name": "ansible",
"version": "2.2.1.0"
}, {
"name": "ansible-tower",
"version": "3.1.0"
}]
record.inventory_id = 11
record.host_name = 'my_lucky_host'
record.ansible_facts = {
"ansible_kernel": "4.4.66-boot2docker",
"ansible_machine": "x86_64",
"ansible_swapfree_mb": 4663,
}
record.ansible_facts_modified = datetime.datetime.now(tzutc()).isoformat()
sent_payloads = handler.emit(record)
assert len(sent_payloads) == 2
sent_payloads.sort(key=lambda payload: payload['version'])
assert len(sent_payloads) == 1
assert sent_payloads[0]['ansible_facts'] == record.ansible_facts
assert sent_payloads[0]['level'] == 'INFO'
assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking'
assert sent_payloads[0]['name'] == 'ansible'
assert sent_payloads[0]['version'] == '2.2.1.0'
assert sent_payloads[1]['level'] == 'INFO'
assert sent_payloads[1]['logger_name'] == 'awx.analytics.system_tracking'
assert sent_payloads[1]['name'] == 'ansible-tower'
assert sent_payloads[1]['version'] == '3.1.0'
@pytest.mark.parametrize('host, port, normalized, hostname_only', [

View File

@ -10,7 +10,7 @@ def test_produce_supervisor_command(mocker):
with mocker.patch.object(reload.subprocess, 'Popen', Popen_mock):
reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart")
reload.subprocess.Popen.assert_called_once_with(
['supervisorctl', 'restart', 'tower-processes:receiver', 'tower-processes:factcacher'],
['supervisorctl', 'restart', 'tower-processes:receiver',],
stderr=-1, stdin=-1, stdout=-1)

View File

@ -44,7 +44,7 @@ class LogstashFormatter(LogstashFormatterVersion1):
'processName', 'relativeCreated', 'thread', 'threadName', 'extra',
'auth_token', 'tags', 'host', 'host_id', 'level', 'port', 'uuid'))
if kind == 'system_tracking':
data = copy(raw_data['facts_data'])
data = copy(raw_data['ansible_facts'])
elif kind == 'job_events':
data = copy(raw_data['event_model_data'])
else:
@ -99,17 +99,14 @@ class LogstashFormatter(LogstashFormatterVersion1):
val = self.format_timestamp(time_float)
data_for_log[key] = val
elif kind == 'system_tracking':
module_name = raw_data['module_name']
if module_name in ['services', 'packages', 'files']:
data_for_log[module_name] = index_by_name(data)
elif module_name == 'ansible':
data_for_log['ansible'] = data
# Remove sub-keys with data type conflicts in elastic search
data_for_log['ansible'].pop('ansible_python_version', None)
data_for_log['ansible']['ansible_python'].pop('version_info', None)
else:
data_for_log['facts'] = data
data_for_log['module_name'] = module_name
data.pop('ansible_python_version', None)
if 'ansible_python' in data:
data['ansible_python'].pop('version_info', None)
data_for_log['ansible_facts'] = data
data_for_log['ansible_facts_modified'] = raw_data['ansible_facts_modified']
data_for_log['inventory_id'] = raw_data['inventory_id']
data_for_log['host_name'] = raw_data['host_name']
elif kind == 'performance':
request = raw_data['python_objects']['request']
response = raw_data['python_objects']['response']

View File

@ -11,7 +11,6 @@ import socket
import select
import six
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from requests.exceptions import RequestException
# loggly
@ -113,28 +112,10 @@ class BaseHandler(logging.Handler):
"""
return payload
def _send_and_queue_system_tracking(self, payload_data):
# Special action for System Tracking, queue up multiple log messages
ret = []
module_name = payload_data['module_name']
if module_name in ['services', 'packages', 'files']:
facts_dict = payload_data.pop(module_name)
for key in facts_dict:
fact_payload = copy(payload_data)
fact_payload.update(facts_dict[key])
ret.append(self._send(fact_payload))
return ret
def _format_and_send_record(self, record):
ret = []
payload = self.format(record)
if self.indv_facts:
payload_data = json.loads(payload)
if record.name.startswith('awx.analytics.system_tracking'):
ret.extend(self._send_and_queue_system_tracking(payload_data))
if len(ret) == 0:
ret.append(self._send(payload))
return ret
return [self._send(json.loads(self.format(record)))]
return [self._send(self.format(record))]
def _skip_log(self, logger_name):
if self.host == '' or (not self.enabled_flag):

View File

@ -30,100 +30,99 @@
# POSSIBILITY OF SUCH DAMAGE.
import os
import time
import memcache
import json
import datetime
from dateutil import parser
from dateutil.tz import tzutc
from ansible import constants as C
try:
from ansible.cache.base import BaseCacheModule
except:
from ansible.plugins.cache.base import BaseCacheModule
from kombu import Connection, Exchange, Producer
class CacheModule(BaseCacheModule):
def __init__(self, *args, **kwargs):
# Basic in-memory caching for typical runs
self._cache = {}
self._all_keys = {}
self.mc = memcache.Client([C.CACHE_PLUGIN_CONNECTION], debug=0)
self._timeout = int(C.CACHE_PLUGIN_TIMEOUT)
self._inventory_id = os.environ['INVENTORY_ID']
self.date_key = time.time()
self.callback_connection = os.environ['CALLBACK_CONNECTION']
self.callback_queue = os.environ['FACT_QUEUE']
self.connection = Connection(self.callback_connection)
self.exchange = Exchange(self.callback_queue, type='direct')
self.producer = Producer(self.connection)
@property
def host_names_key(self):
return '{}'.format(self._inventory_id)
def filter_ansible_facts(self, facts):
return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_'))
def translate_host_key(self, host_name):
return '{}-{}'.format(self._inventory_id, host_name)
def identify_new_module(self, key, value):
# Return the first key found that doesn't exist in the
# previous set of facts
if key in self._all_keys:
for k in value.iterkeys():
if k not in self._all_keys[key] and not k.startswith('ansible_'):
return k
# First time we have seen facts from this host
# it's either ansible facts or a module facts (including module_setup)
elif len(value) == 1:
return value.iterkeys().next()
return None
def translate_modified_key(self, host_name):
return '{}-{}-modified'.format(self._inventory_id, host_name)
def get(self, key):
return self._cache.get(key)
host_key = self.translate_host_key(key)
modified_key = self.translate_modified_key(key)
'''
get() returns a reference to the fact object (usually a dict). The object is modified directly,
then set is called. Effectively, pre-determining the set logic.
'''
Cache entry expired
'''
modified = self.mc.get(modified_key)
if modified is None:
raise KeyError
modified = parser.parse(modified).replace(tzinfo=tzutc())
now_utc = datetime.datetime.now(tzutc())
if self._timeout != 0 and (modified + datetime.timedelta(seconds=self._timeout)) < now_utc:
raise KeyError
The below logic creates a backup of the cache each set. The values are now preserved across set() calls.
value_json = self.mc.get(host_key)
if value_json is None:
raise KeyError
try:
return json.loads(value_json)
# If cache entry is corrupt or bad, fail gracefully.
except (TypeError, ValueError):
self.delete(key)
raise KeyError
For a given key. The previous value is looked at for new keys that aren't of the form 'ansible_'.
If found, send the value of the found key.
If not found, send all the key value pairs of the form 'ansible_' (we presume set() is called because
of an ansible fact module invocation)
More simply stated...
In value, if a new key is found at the top most dict then consider this a module request and only
emit the facts for the found top-level key.
If a new key is not found, assume set() was called as a result of ansible facts scan. Thus, emit
all facts of the form 'ansible_'.
'''
# set(): store the facts `value` for host `key`.
# NOTE(review): rendered-diff residue. One path (identify_new_module /
# filter_ansible_facts, the `packet` dict and self.producer.publish) is
# interleaved with another that JSON-encodes `value` into memcached under the
# translated host key and records the current UTC time, in ISO format, under
# the translated modified key. Presumably the publish path is the removed
# pre-commit code; verify against the real file.
def set(self, key, value):
module = self.identify_new_module(key, value)
# Assume ansible fact triggered the set if no new module found
facts = self.filter_ansible_facts(value) if not module else dict({ module : value[module]})
self._cache[key] = value
self._all_keys[key] = value.keys()
packet = {
'host': key,
'inventory_id': os.environ['INVENTORY_ID'],
'job_id': os.getenv('JOB_ID', ''),
'facts': facts,
'date_key': self.date_key,
}
host_key = self.translate_host_key(key)
modified_key = self.translate_modified_key(key)
# Emit fact data to tower for processing
self.producer.publish(packet,
serializer='json',
compression='bzip2',
exchange=self.exchange,
declare=[self.exchange],
routing_key=self.callback_queue)
self.mc.set(host_key, json.dumps(value))
self.mc.set(modified_key, datetime.datetime.now(tzutc()).isoformat())
# keys(): return the known host names.
# NOTE(review): rendered-diff residue. The first return reads the in-memory
# self._cache (presumably the removed line); the second returns whatever
# memcached holds under self.host_names_key.
def keys(self):
return self._cache.keys()
return self.mc.get(self.host_names_key)
# contains(): report whether facts are available for host `key`.
# NOTE(review): rendered-diff residue. `return key in self._cache` is
# presumably the removed line. The remaining statements return True when
# self.get(key) succeeds and False when it raises KeyError.
def contains(self, key):
return key in self._cache
try:
self.get(key)
return True
except KeyError:
return False
# delete(): drop the cached entry for host `key`.
# NOTE(review): rendered-diff residue. `del self._cache[key]` is presumably
# the removed in-memory line; the two self.mc.delete calls remove the host's
# facts key and its modified-time key from memcached.
def delete(self, key):
del self._cache[key]
self.mc.delete(self.translate_host_key(key))
self.mc.delete(self.translate_modified_key(key))
# flush(): remove the cached entries for every known host.
# NOTE(review): rendered-diff residue. `self._cache = {}` is presumably the
# removed in-memory line. The remaining statements read the host-name list
# from memcached, return early when it is missing or empty, and otherwise
# delete each host's facts key and modified-time key.
# NOTE(review): the entry under self.host_names_key itself is not deleted
# here; confirm that is intended.
def flush(self):
self._cache = {}
host_names = self.mc.get(self.host_names_key)
if not host_names:
return
for k in host_names:
self.mc.delete(self.translate_host_key(k))
self.mc.delete(self.translate_modified_key(k))
# copy(): return a dict of cached values keyed by host name.
# NOTE(review): rendered-diff residue. `return self._cache.copy()` is
# presumably the removed in-memory line. The remaining statements map each
# host name stored under self.host_names_key to the raw memcached value for
# its host key (the value is not JSON-decoded here).
# NOTE(review): the bare `return` yields None, not an empty dict, when no
# host names are stored; confirm callers accept that.
def copy(self):
return self._cache.copy()
ret = dict()
host_names = self.mc.get(self.host_names_key)
if not host_names:
return
for k in host_names:
ret[k] = self.mc.get(self.translate_host_key(k))
return ret

View File

@ -1053,9 +1053,6 @@ LOGGING = {
'awx.main.consumers': {
'handlers': ['null']
},
'awx.main.commands.run_fact_cache_receiver': {
'handlers': ['fact_receiver'],
},
'awx.main.access': {
'handlers': ['null'],
'propagate': False,

View File

@ -137,7 +137,6 @@ SERVICE_NAME_DICT = {
"runworker": "channels",
"uwsgi": "uwsgi",
"daphne": "daphne",
"fact": "factcacher",
"nginx": "nginx"}
# Used for sending commands in automatic restart
UWSGI_FIFO_LOCATION = '/awxfifo'

View File

@ -66,8 +66,7 @@ SERVICE_NAME_DICT = {
"callback": "awx-callback-receiver",
"channels": "awx-channels-worker",
"uwsgi": "awx-uwsgi",
"daphne": "awx-daphne",
"fact": "awx-fact-cache-receiver"}
"daphne": "awx-daphne"}
# Used for sending commands in automatic restart
UWSGI_FIFO_LOCATION = '/var/lib/awx/awxfifo'

19
docs/fact_cache.md Normal file
View File

@ -0,0 +1,19 @@
# Tower as an Ansible Fact Cache
Tower can store and retrieve per-host facts via an Ansible Fact Cache Plugin. This behavior is configurable on a per-job-template basis. When enabled, Tower will serve fact requests for all Hosts in the Inventory related to the running Job. This allows users to use Job Templates with `--limit` while still having access to the entire Inventory of Host facts. The Tower Ansible Fact Cache supports a global timeout setting that it enforces per-host. The setting is available in the CTiT interface under the Jobs category with the name `ANSIBLE_FACT_CACHE_TIMEOUT` and is specified in seconds.
## Tower Fact Cache Implementation Details
### Tower Injection
In order to understand the behavior of Tower as a fact cache, you will need to understand how fact caching is achieved in Tower. Upon a Job invocation with `use_fact_cache=True`, Tower will inject, into memcached, all `ansible_facts` associated with each Host in the Inventory associated with the Job. The cache key is of the form `inventory_id-host_name` so that hosts with the same name in different inventories do not clash. A list of all hosts in the inventory is also injected into memcached with key `inventory_id` and value `[host_name1, host_name2, ..., host_nameN]`.
### Ansible plugin usage
The fact cache plugin running in Ansible will connect to the same memcached instance. A `get()` call to the fact cache interface in Ansible will result in a lookup in memcached for the host-specific set of facts. A `set()` call to the fact cache will result in an update to the memcached record along with the modified time.
### Tower Cache to DB
When a Job that has `use_fact_cache=True` finishes running, Tower will go through memcached and get all records for the hosts in the Inventory. Any record with an update time newer than the database per-host `ansible_facts_modified` value will result in the `ansible_facts` and `ansible_facts_modified` values from memcached being saved to the database.
### Caching Behavior
Tower will always inject the host `ansible_facts` into memcached. The Ansible Tower Fact Cache Plugin will choose to present the cached values to the user or not based on the per-host `ansible_facts_modified` time and the global `ANSIBLE_FACT_CACHE_TIMEOUT`.
## Tower Fact Logging
New and changed facts will be logged via Tower's logging facility, specifically to the `system_tracking` namespace (logger). The logging payload will include the fields `host_name`, `inventory_id`, and `ansible_facts`, where `ansible_facts` is a dictionary of all Ansible facts for `host_name` in Tower Inventory `inventory_id`.

View File

@ -13,7 +13,7 @@ from `InventorySource` completely in Tower 3.3. As a result the related field on
## Fact Searching
Facts generated by an Ansible playbook during a Job Template run are stored by Tower into the database
whenever `store_facts=True` is set per-Job-Template. New facts are merged with existing
whenever `use_fact_cache=True` is set per-Job-Template. New facts are merged with existing
facts and are per-host. These stored facts can be used to filter hosts via the
`/api/v2/hosts` endpoint, using the GET query parameter `host_filter` i.e.
`/api/v2/hosts?host_filter=ansible_facts__ansible_processor_vcpus=8`

View File

@ -3,6 +3,5 @@ runworker: make runworker
daphne: make daphne
celeryd: make celeryd
receiver: make receiver
factcacher: make factcacher
flower: make flower
uwsgi: make uwsgi

View File

@ -43,14 +43,6 @@ redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:factcacher]
command = python manage.py run_fact_cache_receiver
autostart = true
autorestart = true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[program:nginx]
command = nginx -g "daemon off;"
autostart = true
@ -68,7 +60,7 @@ stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
[group:tower-processes]
programs=celeryd,receiver,runworker,uwsgi,daphne,factcacher,nginx,flower
programs=celeryd,receiver,runworker,uwsgi,daphne,nginx,flower
priority=5
[unix_http_server]

View File

@ -3,6 +3,5 @@ runworker: make runworker
daphne: make daphne
celeryd: make celeryd EXTRA_GROUP_QUEUES=thepentagon
receiver: make receiver
factcacher: make factcacher
flower: make flower
uwsgi: make uwsgi