From 626e2d1c9b34450f240ec3a131f84d49a9b41048 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 13 Jun 2017 12:41:35 -0400 Subject: [PATCH 01/10] tower fact cache implementation * Tower now injects facts into jobs via memcached for use by Ansible playbooks. On the Ansible side, this is accomplished by the existing mechanism, an Ansible Fact Cache Plugin + memcached. On the Tower side, memcached is leveraged heavily. --- awx/main/models/jobs.py | 73 +++++++++++++++ awx/main/tasks.py | 9 ++ awx/main/tests/unit/models/test_jobs.py | 115 ++++++++++++++++++++++++ awx/plugins/fact_caching/tower.py | 103 ++++++++------------- 4 files changed, 232 insertions(+), 68 deletions(-) create mode 100644 awx/main/tests/unit/models/test_jobs.py diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 5a83491a97..b89697b69c 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -12,6 +12,8 @@ from urlparse import urljoin # Django from django.conf import settings from django.db import models +#from django.core.cache import cache +import memcache from django.db.models import Q, Count from django.utils.dateparse import parse_datetime from django.utils.encoding import force_text @@ -38,6 +40,8 @@ from awx.main.fields import JSONField from awx.main.consumers import emit_channel_notification +TIMEOUT = 60 + logger = logging.getLogger('awx.main.models.jobs') analytics_logger = logging.getLogger('awx.analytics.job_events') @@ -703,6 +707,74 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): self.project_update.cancel(job_explanation=job_explanation) return res + @property + def store_facts_enabled(self): + if not self.job_template or self.job_template is False: + return False + return True + + @property + def memcached_fact_key(self): + return '{}'.format(self.inventory.id) + + def memcached_fact_host_key(self, host_name): + return '{}-{}'.format(self.inventory.id, host_name) + + def memcached_fact_modified_key(self, host_name): + return '{}-{}-modified'.format(self.inventory.id, host_name) + + def _get_inventory_hosts(self, only=['name', 'ansible_facts', 'modified',]): + return self.inventory.hosts.only(*only) + + def _get_memcache_connection(self): + return memcache.Client([settings.CACHES['default']['LOCATION']], debug=0) + + def start_job_fact_cache(self): + if not self.inventory: + return + + cache = self._get_memcache_connection() + + host_names = [] + + for host in self._get_inventory_hosts(): + host_key = self.memcached_fact_host_key(host.name) + modified_key = self.memcached_fact_modified_key(host.name) + # Only add host/facts if host doesn't already exist in the cache + if cache.get(modified_key) is None: + cache.set(host_key, host.ansible_facts) + cache.set(modified_key, False) + + host_names.append(host.name) + + cache.set(self.memcached_fact_key, host_names) + + def finish_job_fact_cache(self): + if not self.inventory: + # TODO: Uh oh, we need to clean up the cache + return + + cache = self._get_memcache_connection() + + hosts = self._get_inventory_hosts() + for host in hosts: + host_key = self.memcached_fact_host_key(host.name) + modified_key = self.memcached_fact_modified_key(host.name) + + modified = cache.get(modified_key) + if modified is None: + continue + + # Save facts that have changed + if modified: + ansible_facts = cache.get(host_key) + if ansible_facts is None: + cache.delete(host_key) + # TODO: Log cache inconsistency + continue + host.ansible_facts = ansible_facts + host.save() + class JobHostSummary(CreatedModifiedModel): ''' @@ 
-1357,3 +1429,4 @@ class SystemJob(UnifiedJob, SystemJobOptions, JobNotificationMixin): def get_notification_friendly_name(self): return "System Job" + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 19bd26bb1f..665389db77 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -877,6 +877,9 @@ class RunJob(BaseTask): # callbacks to work. env['JOB_ID'] = str(job.pk) env['INVENTORY_ID'] = str(job.inventory.pk) + if job.store_facts_enabled: + env['MEMCACHED_PREPEND_KEY'] = job.memcached_fact_key + env['MEMCACHED_LOCATION'] = settings.CACHES['default']['LOCATION'] if job.project: env['PROJECT_REVISION'] = job.project.scm_revision env['ANSIBLE_RETRY_FILES_ENABLED'] = "False" @@ -1140,8 +1143,14 @@ class RunJob(BaseTask): ('project_update', local_project_sync.name, local_project_sync.id))) raise + if job.store_facts_enabled: + job.start_job_fact_cache() + + def final_run_hook(self, job, status, **kwargs): super(RunJob, self).final_run_hook(job, status, **kwargs) + if job.store_facts_enabled: + job.finish_job_fact_cache() try: inventory = job.inventory except Inventory.DoesNotExist: diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py new file mode 100644 index 0000000000..8c75541bdf --- /dev/null +++ b/awx/main/tests/unit/models/test_jobs.py @@ -0,0 +1,115 @@ +import pytest + +from awx.main.models import ( + Job, + Inventory, + Host, +) + + +class CacheMock(object): + def __init__(self): + self.d = dict() + + def get(self, key): + if key not in self.d: + return None + return self.d[key] + + def set(self, key, val): + self.d[key] = val + + def delete(self, key): + del self.d[key] + + +@pytest.fixture +def hosts(): + return [ + Host(name='host1', ansible_facts={"a": 1, "b": 2}), + Host(name='host2', ansible_facts={"a": 1, "b": 2}), + Host(name='host3', ansible_facts={"a": 1, "b": 2}), + ] + + +@pytest.fixture +def hosts2(): + return [ + Host(name='host2', ansible_facts="foobar"), + ] + + +@pytest.fixture +def inventory(): + return Inventory(id=5) + + +@pytest.fixture +def mock_cache(mocker): + cache = CacheMock() + mocker.patch.object(cache, 'set', wraps=cache.set) + mocker.patch.object(cache, 'get', wraps=cache.get) + mocker.patch.object(cache, 'delete', wraps=cache.delete) + return cache + + +@pytest.fixture +def job(mocker, hosts, inventory, mock_cache): + j = Job(inventory=inventory, id=2) + j._get_inventory_hosts = mocker.Mock(return_value=hosts) + j._get_memcache_connection = mocker.Mock(return_value=mock_cache) + return j + + +@pytest.fixture +def job2(mocker, hosts2, inventory, mock_cache): + j = Job(inventory=inventory, id=3) + j._get_inventory_hosts = mocker.Mock(return_value=hosts2) + j._get_memcache_connection = mocker.Mock(return_value=mock_cache) + return j + + +def test_start_job_fact_cache(hosts, job, inventory, mocker): + + job.start_job_fact_cache() + + job._get_memcache_connection().set.assert_any_call('{}'.format(5), [h.name for h in hosts]) + for host in hosts: + job._get_memcache_connection().set.assert_any_call('{}-{}'.format(5, host.name), host.ansible_facts) + job._get_memcache_connection().set.assert_any_call('{}-{}-modified'.format(5, host.name), False) + + +def test_start_job_fact_cache_existing_host(hosts, hosts2, job, job2, inventory, mocker): + + job.start_job_fact_cache() + + for host in hosts: + job._get_memcache_connection().set.assert_any_call('{}-{}'.format(5, host.name), host.ansible_facts) + job._get_memcache_connection().set.assert_any_call('{}-{}-modified'.format(5, host.name), False) + + 
job._get_memcache_connection().set.reset_mock() + + job2.start_job_fact_cache() + + # Ensure hosts2 ansible_facts didn't overwrite hosts ansible_facts + ansible_facts_cached = job._get_memcache_connection().get('{}-{}'.format(5, hosts2[0].name)) + assert ansible_facts_cached == hosts[1].ansible_facts + + +def test_finish_job_fact_cache(job, hosts, inventory, mocker): + + job.start_job_fact_cache() + + host = hosts[1] + host_key = job.memcached_fact_host_key(host.name) + modified_key = job.memcached_fact_modified_key(host.name) + host.save = mocker.Mock() + + job._get_memcache_connection().set(host_key, 'blah') + job._get_memcache_connection().set(modified_key, True) + + job.finish_job_fact_cache() + + assert host.ansible_facts == 'blah' + host.save.assert_called_once_with() + diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index 427cce8501..353c49010a 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -30,100 +30,67 @@ # POSSIBILITY OF SUCH DAMAGE. import os -import time +import memcache +import json try: from ansible.cache.base import BaseCacheModule except: from ansible.plugins.cache.base import BaseCacheModule -from kombu import Connection, Exchange, Producer - class CacheModule(BaseCacheModule): def __init__(self, *args, **kwargs): # Basic in-memory caching for typical runs - self._cache = {} - self._all_keys = {} + self.mc = memcache.Client([os.environ['MEMCACHED_LOCATION']], debug=0) + self.inventory_id = os.environ['INVENTORY_ID'] - self.date_key = time.time() - self.callback_connection = os.environ['CALLBACK_CONNECTION'] - self.callback_queue = os.environ['FACT_QUEUE'] - self.connection = Connection(self.callback_connection) - self.exchange = Exchange(self.callback_queue, type='direct') - self.producer = Producer(self.connection) + @property + def host_names_key(self): + return '{}'.format(self.inventory_id) - def filter_ansible_facts(self, facts): - return dict((k, facts[k]) for k in facts.keys() if k.startswith('ansible_')) + def translate_host_key(self, host_name): + return '{}-{}'.format(self.inventory_id, host_name) - def identify_new_module(self, key, value): - # Return the first key found that doesn't exist in the - # previous set of facts - if key in self._all_keys: - for k in value.iterkeys(): - if k not in self._all_keys[key] and not k.startswith('ansible_'): - return k - # First time we have seen facts from this host - # it's either ansible facts or a module facts (including module_setup) - elif len(value) == 1: - return value.iterkeys().next() - return None + def translate_modified_key(self, host_name): + return '{}-{}-modified'.format(self.inventory_id, host_name) def get(self, key): - return self._cache.get(key) + host_key = self.translate_host_key(key) + value_json = self.mc.get(host_key) + if not value_json: + raise KeyError + return json.loads(value_json) - ''' - get() returns a reference to the fact object (usually a dict). The object is modified directly, - then set is called. Effectively, pre-determining the set logic. - - The below logic creates a backup of the cache each set. The values are now preserved across set() calls. - - For a given key. The previous value is looked at for new keys that aren't of the form 'ansible_'. - If found, send the value of the found key. - If not found, send all the key value pairs of the form 'ansible_' (we presume set() is called because - of an ansible fact module invocation) - - More simply stated... 
- In value, if a new key is found at the top most dict then consider this a module request and only - emit the facts for the found top-level key. - - If a new key is not found, assume set() was called as a result of ansible facts scan. Thus, emit - all facts of the form 'ansible_'. - ''' def set(self, key, value): - module = self.identify_new_module(key, value) - # Assume ansible fact triggered the set if no new module found - facts = self.filter_ansible_facts(value) if not module else dict({ module : value[module]}) - self._cache[key] = value - self._all_keys[key] = value.keys() - packet = { - 'host': key, - 'inventory_id': os.environ['INVENTORY_ID'], - 'job_id': os.getenv('JOB_ID', ''), - 'facts': facts, - 'date_key': self.date_key, - } + host_key = self.translate_host_key(key) + modified_key = self.translate_modified_key(key) - # Emit fact data to tower for processing - self.producer.publish(packet, - serializer='json', - compression='bzip2', - exchange=self.exchange, - declare=[self.exchange], - routing_key=self.callback_queue) + self.mc.set(host_key, json.dumps(value)) + self.mc.set(modified_key, True) def keys(self): - return self._cache.keys() + return self.mc.get(self.host_names_key) def contains(self, key): - return key in self._cache + val = self.mc.get(key) + if val is None: + return False + return True def delete(self, key): - del self._cache[key] + self.mc.delete(self.translate_host_key(key)) + self.mc.delete(self.translate_modified_key(key)) def flush(self): - self._cache = {} + for k in self.mc.get(self.host_names_key): + self.mc.delete(self.translate_host_key(k)) + self.mc.delete(self.translate_modified_key(k)) def copy(self): - return self._cache.copy() + ret = dict() + for k in self.mc.get(self.host_names_key): + ret[k] = self.mc.get(self.translate_host_key(k)) + return ret + From 817dbe8d33d4561d0f9a76a66fc3d8dcc15d3a96 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Fri, 16 Jun 2017 08:36:32 -0400 Subject: [PATCH 02/10] add fact modified time --- awx/main/conf.py | 13 ++++++++++ awx/main/migrations/0038_v320_release.py | 5 ++++ awx/main/models/inventory.py | 6 +++++ awx/main/models/jobs.py | 9 ++----- awx/main/tasks.py | 21 +++++++--------- awx/plugins/fact_caching/tower.py | 32 +++++++++++++++++------- 6 files changed, 58 insertions(+), 28 deletions(-) diff --git a/awx/main/conf.py b/awx/main/conf.py index fa98ba27d7..f53ef23f35 100644 --- a/awx/main/conf.py +++ b/awx/main/conf.py @@ -228,6 +228,19 @@ register( category_slug='jobs', ) +register( + 'ANSIBLE_FACT_CACHE_TIMEOUT', + field_class=fields.IntegerField, + min_value=0, + default=0, + label=_('Per-Host Ansible Fact Cache Timeout'), + help_text=_('Maximum time, in seconds, that Tower stored Ansible facts are considered valid since ' + 'the last time they were modified. Only valid, non-stale, facts will be accessible by ' + 'a playbook. 
Note, this does not influence the deletion of ansible_facts from the database.'), + category=_('Jobs'), + category_slug='jobs', +) + register( 'LOG_AGGREGATOR_HOST', field_class=fields.CharField, diff --git a/awx/main/migrations/0038_v320_release.py b/awx/main/migrations/0038_v320_release.py index 585b42ce04..f04d280727 100644 --- a/awx/main/migrations/0038_v320_release.py +++ b/awx/main/migrations/0038_v320_release.py @@ -81,6 +81,11 @@ class Migration(migrations.Migration): name='ansible_facts', field=awx.main.fields.JSONBField(default={}, help_text='Arbitrary JSON structure of most recent ansible_facts, per-host.', blank=True), ), + migrations.AddField( + model_name='host', + name='ansible_facts_modified', + field=models.DateTimeField(default=None, help_text='The date and time ansible_facts was last modified.', null=True, editable=False), + ), migrations.AddField( model_name='job', name='store_facts', diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 05a5cd2364..3c92b31391 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -448,6 +448,12 @@ class Host(CommonModelNameNotUnique): default={}, help_text=_('Arbitrary JSON structure of most recent ansible_facts, per-host.'), ) + ansible_facts_modified = models.DateTimeField( + default=None, + editable=False, + null=True, + help_text=_('The date and time ansible_facts was last modified.'), + ) insights_system_id = models.TextField( blank=True, default=None, diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index b89697b69c..0c4979be54 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -7,6 +7,7 @@ import hashlib import hmac import logging import time +import json from urlparse import urljoin # Django @@ -707,12 +708,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): self.project_update.cancel(job_explanation=job_explanation) return res - @property - def store_facts_enabled(self): - if not self.job_template or self.job_template is False: - return False - return True - @property def memcached_fact_key(self): return '{}'.format(self.inventory.id) @@ -742,7 +737,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): modified_key = self.memcached_fact_modified_key(host.name) # Only add host/facts if host doesn't already exist in the cache if cache.get(modified_key) is None: - cache.set(host_key, host.ansible_facts) + cache.set(host_key, json.dumps(host.ansible_facts)) cache.set(modified_key, False) host_names.append(host.name) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 665389db77..dd344ea587 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -773,6 +773,7 @@ class BaseTask(Task): self.final_run_hook(instance, status, **kwargs) instance.websocket_emit_status(status) if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): + print("Status is not successful!") # Raising an exception will mark the job as 'failed' in celery # and will stop a task chain from continuing to execute if status == 'canceled': @@ -877,9 +878,12 @@ class RunJob(BaseTask): # callbacks to work. 
env['JOB_ID'] = str(job.pk) env['INVENTORY_ID'] = str(job.inventory.pk) - if job.store_facts_enabled: - env['MEMCACHED_PREPEND_KEY'] = job.memcached_fact_key - env['MEMCACHED_LOCATION'] = settings.CACHES['default']['LOCATION'] + if job.store_facts: + env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') + env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') + env['ANSIBLE_CACHE_PLUGIN'] = "tower" + env['ANSIBLE_FACT_CACHE_TIMEOUT'] = str(settings.ANSIBLE_FACT_CACHE_TIMEOUT) + env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else '' if job.project: env['PROJECT_REVISION'] = job.project.scm_revision env['ANSIBLE_RETRY_FILES_ENABLED'] = "False" @@ -954,13 +958,6 @@ class RunJob(BaseTask): if authorize: env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password') - # Set environment variables related to gathering facts from the cache - if (job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True) and not kwargs.get('isolated'): - env['FACT_QUEUE'] = settings.FACT_QUEUE - env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') - env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') - env['ANSIBLE_CACHE_PLUGIN'] = "tower" - env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT) return env def build_args(self, job, **kwargs): @@ -1143,13 +1140,13 @@ class RunJob(BaseTask): ('project_update', local_project_sync.name, local_project_sync.id))) raise - if job.store_facts_enabled: + if job.store_facts: job.start_job_fact_cache() def final_run_hook(self, job, status, **kwargs): super(RunJob, self).final_run_hook(job, status, **kwargs) - if job.store_facts_enabled: + if job.store_facts: job.finish_job_fact_cache() try: inventory = job.inventory diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index 353c49010a..9310f693ad 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -33,6 +33,8 @@ import os import memcache import json +from ansible import constants as C + try: from ansible.cache.base import BaseCacheModule except: @@ -42,8 +44,8 @@ except: class CacheModule(BaseCacheModule): def __init__(self, *args, **kwargs): - # Basic in-memory caching for typical runs - self.mc = memcache.Client([os.environ['MEMCACHED_LOCATION']], debug=0) + self.mc = memcache.Client([C.CACHE_PLUGIN_CONNECTION], debug=0) + self.timeout = int(C.CACHE_PLUGIN_TIMEOUT) self.inventory_id = os.environ['INVENTORY_ID'] @property @@ -59,9 +61,14 @@ class CacheModule(BaseCacheModule): def get(self, key): host_key = self.translate_host_key(key) value_json = self.mc.get(host_key) - if not value_json: + if value_json is None: + raise KeyError + try: + return json.loads(value_json) + # If cache entry is corrupt or bad, fail gracefully. 
+ except (TypeError, ValueError): + self.delete(key) raise KeyError - return json.loads(value_json) def set(self, key, value): host_key = self.translate_host_key(key) @@ -74,7 +81,8 @@ class CacheModule(BaseCacheModule): return self.mc.get(self.host_names_key) def contains(self, key): - val = self.mc.get(key) + host_key = self.translate_host_key(key) + val = self.mc.get(host_key) if val is None: return False return True @@ -84,13 +92,19 @@ class CacheModule(BaseCacheModule): self.mc.delete(self.translate_modified_key(key)) def flush(self): - for k in self.mc.get(self.host_names_key): + host_names = self.mc.get(self.host_names_key) + if not host_names: + return + + for k in host_names: self.mc.delete(self.translate_host_key(k)) self.mc.delete(self.translate_modified_key(k)) def copy(self): ret = dict() - for k in self.mc.get(self.host_names_key): - ret[k] = self.mc.get(self.translate_host_key(k)) - return ret + host_names = self.mc.get(self.host_names_key) + if not host_names: + return + + return [self.mc.get(self.translate_host_key(k)) for k in host_names] From 12cdbcf8b52dbb57081e65bfdf6fcc9faa1498bf Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 19 Jun 2017 10:59:37 -0400 Subject: [PATCH 03/10] adds per-host timeout --- awx/main/models/jobs.py | 21 ++++++---- awx/main/tests/unit/models/test_jobs.py | 52 ++++++++++++++++--------- awx/plugins/fact_caching/tower.py | 40 +++++++++++++------ 3 files changed, 76 insertions(+), 37 deletions(-) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 0c4979be54..8384a8f7a9 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -17,6 +17,8 @@ from django.db import models import memcache from django.db.models import Q, Count from django.utils.dateparse import parse_datetime +from dateutil import parser +from dateutil.tz import tzutc from django.utils.encoding import force_text from django.utils.timezone import utc from django.utils.translation import ugettext_lazy as _ @@ -41,8 +43,6 @@ from awx.main.fields import JSONField from awx.main.consumers import emit_channel_notification -TIMEOUT = 60 - logger = logging.getLogger('awx.main.models.jobs') analytics_logger = logging.getLogger('awx.analytics.job_events') @@ -735,10 +735,14 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): for host in self._get_inventory_hosts(): host_key = self.memcached_fact_host_key(host.name) modified_key = self.memcached_fact_modified_key(host.name) - # Only add host/facts if host doesn't already exist in the cache + if cache.get(modified_key) is None: + if host.ansible_facts_modified: + host_modified = host.ansible_facts_modified.replace(tzinfo=tzutc()).isoformat() + else: + host_modified = datetime.datetime.now(tzutc()).isoformat() cache.set(host_key, json.dumps(host.ansible_facts)) - cache.set(modified_key, False) + cache.set(modified_key, host_modified) host_names.append(host.name) @@ -746,7 +750,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): def finish_job_fact_cache(self): if not self.inventory: - # TODO: Uh oh, we need to clean up the cache return cache = self._get_memcache_connection() @@ -758,16 +761,18 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): modified = cache.get(modified_key) if modified is None: + cache.delete(host_key) continue - # Save facts that have changed - if modified: + # Save facts if cache is newer than DB + modified = parser.parse(modified, tzinfos=[tzutc()]) + if not host.ansible_facts_modified or modified > 
host.ansible_facts_modified: ansible_facts = cache.get(host_key) if ansible_facts is None: cache.delete(host_key) - # TODO: Log cache inconsistency continue host.ansible_facts = ansible_facts + host.ansible_facts_modified = modified host.save() diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py index 8c75541bdf..afc833a47a 100644 --- a/awx/main/tests/unit/models/test_jobs.py +++ b/awx/main/tests/unit/models/test_jobs.py @@ -6,6 +6,10 @@ from awx.main.models import ( Host, ) +import datetime +import json +from dateutil.tz import tzutc + class CacheMock(object): def __init__(self): @@ -24,18 +28,28 @@ class CacheMock(object): @pytest.fixture -def hosts(): +def old_time(): + return (datetime.datetime.now(tzutc()) - datetime.timedelta(minutes=60)) + + +@pytest.fixture() +def new_time(): + return (datetime.datetime.now(tzutc())) + + +@pytest.fixture +def hosts(old_time): return [ - Host(name='host1', ansible_facts={"a": 1, "b": 2}), - Host(name='host2', ansible_facts={"a": 1, "b": 2}), - Host(name='host3', ansible_facts={"a": 1, "b": 2}), + Host(name='host1', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time), + Host(name='host2', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time), + Host(name='host3', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time), ] @pytest.fixture def hosts2(): return [ - Host(name='host2', ansible_facts="foobar"), + Host(name='host2', ansible_facts="foobar", ansible_facts_modified=old_time), ] @@ -75,8 +89,8 @@ def test_start_job_fact_cache(hosts, job, inventory, mocker): job._get_memcache_connection().set.assert_any_call('{}'.format(5), [h.name for h in hosts]) for host in hosts: - job._get_memcache_connection().set.assert_any_call('{}-{}'.format(5, host.name), host.ansible_facts) - job._get_memcache_connection().set.assert_any_call('{}-{}-modified'.format(5, host.name), False) + job._get_memcache_connection().set.assert_any_call('{}-{}'.format(5, host.name), json.dumps(host.ansible_facts)) + job._get_memcache_connection().set.assert_any_call('{}-{}-modified'.format(5, host.name), host.ansible_facts_modified.isoformat()) def test_start_job_fact_cache_existing_host(hosts, hosts2, job, job2, inventory, mocker): @@ -84,8 +98,8 @@ def test_start_job_fact_cache_existing_host(hosts, hosts2, job, job2, inventory, job.start_job_fact_cache() for host in hosts: - job._get_memcache_connection().set.assert_any_call('{}-{}'.format(5, host.name), host.ansible_facts) - job._get_memcache_connection().set.assert_any_call('{}-{}-modified'.format(5, host.name), False) + job._get_memcache_connection().set.assert_any_call('{}-{}'.format(5, host.name), json.dumps(host.ansible_facts)) + job._get_memcache_connection().set.assert_any_call('{}-{}-modified'.format(5, host.name), host.ansible_facts_modified.isoformat()) job._get_memcache_connection().set.reset_mock() @@ -93,23 +107,25 @@ def test_start_job_fact_cache_existing_host(hosts, hosts2, job, job2, inventory, # Ensure hosts2 ansible_facts didn't overwrite hosts ansible_facts ansible_facts_cached = job._get_memcache_connection().get('{}-{}'.format(5, hosts2[0].name)) - assert ansible_facts_cached == hosts[1].ansible_facts + assert ansible_facts_cached == json.dumps(hosts[1].ansible_facts) -def test_finish_job_fact_cache(job, hosts, inventory, mocker): +def test_finish_job_fact_cache(job, hosts, inventory, mocker, new_time): job.start_job_fact_cache() + for h in hosts: + h.save = mocker.Mock() - host = hosts[1] - host_key = 
job.memcached_fact_host_key(host.name) - modified_key = job.memcached_fact_modified_key(host.name) - host.save = mocker.Mock() + host_key = job.memcached_fact_host_key(hosts[1].name) + modified_key = job.memcached_fact_modified_key(hosts[1].name) job._get_memcache_connection().set(host_key, 'blah') - job._get_memcache_connection().set(modified_key, True) + job._get_memcache_connection().set(modified_key, new_time.isoformat()) job.finish_job_fact_cache() - assert host.ansible_facts == 'blah' - host.save.assert_called_once_with() + hosts[0].save.assert_not_called() + hosts[2].save.assert_not_called() + assert hosts[1].ansible_facts == 'blah' + hosts[1].save.assert_called_once_with() diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index 9310f693ad..c588ac848f 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -32,6 +32,9 @@ import os import memcache import json +import datetime +from dateutil import parser +from dateutil.tz import tzutc from ansible import constants as C @@ -45,21 +48,34 @@ class CacheModule(BaseCacheModule): def __init__(self, *args, **kwargs): self.mc = memcache.Client([C.CACHE_PLUGIN_CONNECTION], debug=0) - self.timeout = int(C.CACHE_PLUGIN_TIMEOUT) - self.inventory_id = os.environ['INVENTORY_ID'] + self._timeout = int(C.CACHE_PLUGIN_TIMEOUT) + self._inventory_id = os.environ['INVENTORY_ID'] @property def host_names_key(self): - return '{}'.format(self.inventory_id) + return '{}'.format(self._inventory_id) def translate_host_key(self, host_name): - return '{}-{}'.format(self.inventory_id, host_name) + return '{}-{}'.format(self._inventory_id, host_name) def translate_modified_key(self, host_name): - return '{}-{}-modified'.format(self.inventory_id, host_name) + return '{}-{}-modified'.format(self._inventory_id, host_name) def get(self, key): host_key = self.translate_host_key(key) + modified_key = self.translate_modified_key(key) + + ''' + Cache entry expired + ''' + modified = self.mc.get(modified_key) + if modified is None: + raise KeyError + modified = parser.parse(modified).replace(tzinfo=tzutc()) + now_utc = datetime.datetime.now(tzutc()) + if self._timeout != 0 and (modified + datetime.timedelta(seconds=self._timeout)) < now_utc: + raise KeyError + value_json = self.mc.get(host_key) if value_json is None: raise KeyError @@ -75,17 +91,17 @@ class CacheModule(BaseCacheModule): modified_key = self.translate_modified_key(key) self.mc.set(host_key, json.dumps(value)) - self.mc.set(modified_key, True) + self.mc.set(modified_key, datetime.datetime.now(tzutc()).isoformat()) def keys(self): return self.mc.get(self.host_names_key) def contains(self, key): - host_key = self.translate_host_key(key) - val = self.mc.get(host_key) - if val is None: + try: + self.get(key) + return True + except KeyError: return False - return True def delete(self, key): self.mc.delete(self.translate_host_key(key)) @@ -106,5 +122,7 @@ class CacheModule(BaseCacheModule): if not host_names: return - return [self.mc.get(self.translate_host_key(k)) for k in host_names] + for k in host_names: + ret[k] = self.mc.get(self.translate_host_key(k)) + return ret From 4c118159ed9a12ad4a4abb04f81a3310a0d74aeb Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 19 Jun 2017 13:45:52 -0400 Subject: [PATCH 04/10] system_tracking logging for fact cache --- awx/main/models/jobs.py | 5 ++++ awx/main/tests/unit/models/test_jobs.py | 12 ++++----- awx/main/tests/unit/utils/test_handlers.py | 31 +++++++++------------- awx/main/utils/formatters.py | 
21 +++++++-------- awx/main/utils/handlers.py | 22 ++------------- 5 files changed, 35 insertions(+), 56 deletions(-) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 8384a8f7a9..31952ae952 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -46,6 +46,7 @@ from awx.main.consumers import emit_channel_notification logger = logging.getLogger('awx.main.models.jobs') analytics_logger = logging.getLogger('awx.analytics.job_events') +system_tracking_logger = logging.getLogger('awx.analytics.system_tracking') __all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent', 'SystemJobOptions', 'SystemJobTemplate', 'SystemJob'] @@ -774,6 +775,10 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin): host.ansible_facts = ansible_facts host.ansible_facts_modified = modified host.save() + system_tracking_logger.info('New fact for inventory {} host {}'.format(host.inventory.name, host.name), + extra=dict(inventory_id=host.inventory.id, host_name=host.name, + ansible_facts=host.ansible_facts, + ansible_facts_modified=host.ansible_facts_modified.isoformat())) class JobHostSummary(CreatedModifiedModel): diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py index afc833a47a..69c5f262a2 100644 --- a/awx/main/tests/unit/models/test_jobs.py +++ b/awx/main/tests/unit/models/test_jobs.py @@ -38,18 +38,18 @@ def new_time(): @pytest.fixture -def hosts(old_time): +def hosts(old_time, inventory): return [ - Host(name='host1', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time), - Host(name='host2', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time), - Host(name='host3', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time), + Host(name='host1', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time, inventory=inventory), + Host(name='host2', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time, inventory=inventory), + Host(name='host3', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=old_time, inventory=inventory), ] @pytest.fixture -def hosts2(): +def hosts2(inventory): return [ - Host(name='host2', ansible_facts="foobar", ansible_facts_modified=old_time), + Host(name='host2', ansible_facts="foobar", ansible_facts_modified=old_time, inventory=inventory), ] diff --git a/awx/main/tests/unit/utils/test_handlers.py b/awx/main/tests/unit/utils/test_handlers.py index 055e2d3286..3cf5c6eeee 100644 --- a/awx/main/tests/unit/utils/test_handlers.py +++ b/awx/main/tests/unit/utils/test_handlers.py @@ -4,6 +4,8 @@ import cStringIO import json import logging import socket +import datetime +from dateutil.tz import tzutc from uuid import uuid4 import mock @@ -135,7 +137,7 @@ def test_base_logging_handler_emit(dummy_log_record): assert body['message'] == 'User joe logged in' -def test_base_logging_handler_emit_one_record_per_fact(): +def test_base_logging_handler_emit_system_tracking(): handler = BaseHandler(host='127.0.0.1', enabled_flag=True, message_type='logstash', indv_facts=True, enabled_loggers=['awx', 'activity_stream', 'job_events', 'system_tracking']) @@ -149,27 +151,20 @@ def test_base_logging_handler_emit_one_record_per_fact(): tuple(), # args, None # exc_info ) - record.module_name = 'packages' - record.facts_data = [{ - "name": "ansible", - "version": "2.2.1.0" - }, { - "name": "ansible-tower", - "version": "3.1.0" - }] + record.inventory_id = 11 + record.host_name = 'my_lucky_host' + record.ansible_facts = { + "ansible_kernel": "4.4.66-boot2docker", + 
"ansible_machine": "x86_64", + "ansible_swapfree_mb": 4663, + } + record.ansible_facts_modified = datetime.datetime.now(tzutc()).isoformat() sent_payloads = handler.emit(record) - assert len(sent_payloads) == 2 - sent_payloads.sort(key=lambda payload: payload['version']) - + assert len(sent_payloads) == 1 + assert sent_payloads[0]['ansible_facts'] == record.ansible_facts assert sent_payloads[0]['level'] == 'INFO' assert sent_payloads[0]['logger_name'] == 'awx.analytics.system_tracking' - assert sent_payloads[0]['name'] == 'ansible' - assert sent_payloads[0]['version'] == '2.2.1.0' - assert sent_payloads[1]['level'] == 'INFO' - assert sent_payloads[1]['logger_name'] == 'awx.analytics.system_tracking' - assert sent_payloads[1]['name'] == 'ansible-tower' - assert sent_payloads[1]['version'] == '3.1.0' @pytest.mark.parametrize('host, port, normalized, hostname_only', [ diff --git a/awx/main/utils/formatters.py b/awx/main/utils/formatters.py index a046f02b37..8e34e77818 100644 --- a/awx/main/utils/formatters.py +++ b/awx/main/utils/formatters.py @@ -44,7 +44,7 @@ class LogstashFormatter(LogstashFormatterVersion1): 'processName', 'relativeCreated', 'thread', 'threadName', 'extra', 'auth_token', 'tags', 'host', 'host_id', 'level', 'port', 'uuid')) if kind == 'system_tracking': - data = copy(raw_data['facts_data']) + data = copy(raw_data['ansible_facts']) elif kind == 'job_events': data = copy(raw_data['event_model_data']) else: @@ -99,17 +99,14 @@ class LogstashFormatter(LogstashFormatterVersion1): val = self.format_timestamp(time_float) data_for_log[key] = val elif kind == 'system_tracking': - module_name = raw_data['module_name'] - if module_name in ['services', 'packages', 'files']: - data_for_log[module_name] = index_by_name(data) - elif module_name == 'ansible': - data_for_log['ansible'] = data - # Remove sub-keys with data type conflicts in elastic search - data_for_log['ansible'].pop('ansible_python_version', None) - data_for_log['ansible']['ansible_python'].pop('version_info', None) - else: - data_for_log['facts'] = data - data_for_log['module_name'] = module_name + data.pop('ansible_python_version', None) + if 'ansible_python' in data: + data['ansible_python'].pop('version_info', None) + + data_for_log['ansible_facts'] = data + data_for_log['ansible_facts_modified'] = raw_data['ansible_facts_modified'] + data_for_log['inventory_id'] = raw_data['inventory_id'] + data_for_log['host_name'] = raw_data['host_name'] elif kind == 'performance': request = raw_data['python_objects']['request'] response = raw_data['python_objects']['response'] diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py index 9d8a3572ea..3cdbbb261a 100644 --- a/awx/main/utils/handlers.py +++ b/awx/main/utils/handlers.py @@ -112,28 +112,10 @@ class BaseHandler(logging.Handler): """ return payload - def _send_and_queue_system_tracking(self, payload_data): - # Special action for System Tracking, queue up multiple log messages - ret = [] - module_name = payload_data['module_name'] - if module_name in ['services', 'packages', 'files']: - facts_dict = payload_data.pop(module_name) - for key in facts_dict: - fact_payload = copy(payload_data) - fact_payload.update(facts_dict[key]) - ret.append(self._send(fact_payload)) - return ret - def _format_and_send_record(self, record): - ret = [] - payload = self.format(record) if self.indv_facts: - payload_data = json.loads(payload) - if record.name.startswith('awx.analytics.system_tracking'): - ret.extend(self._send_and_queue_system_tracking(payload_data)) - if len(ret) 
== 0: - ret.append(self._send(payload)) - return ret + return [self._send(json.loads(self.format(record)))] + return [self._send(self.format(record))] def _skip_log(self, logger_name): if self.host == '' or (not self.enabled_flag): From ec2e537f630a3224c0370ba753a527e68aafe297 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 19 Jun 2017 15:33:24 -0400 Subject: [PATCH 05/10] remove fache cache receiver --- Makefile | 7 - .../commands/run_fact_cache_receiver.py | 151 ------------------ .../commands/test_run_fact_cache_receiver.py | 129 --------------- awx/main/tests/unit/utils/test_reload.py | 2 +- awx/settings/defaults.py | 3 - awx/settings/development.py | 1 - awx/settings/production.py | 3 +- tools/docker-compose/Procfile | 1 - tools/docker-compose/supervisor.conf | 10 +- tools/docker-isolated/Procfile | 1 - 10 files changed, 3 insertions(+), 305 deletions(-) delete mode 100644 awx/main/management/commands/run_fact_cache_receiver.py delete mode 100644 awx/main/tests/functional/commands/test_run_fact_cache_receiver.py diff --git a/Makefile b/Makefile index c346268767..d4e9cb7d41 100644 --- a/Makefile +++ b/Makefile @@ -390,7 +390,6 @@ server_noattach: tmux new-window 'exec make receiver' tmux select-window -t tower:2 tmux rename-window 'Extra Services' - tmux split-window -h 'exec make factcacher' tmux select-window -t tower:0 server: server_noattach @@ -472,12 +471,6 @@ socketservice: fi; \ $(PYTHON) manage.py run_socketio_service -factcacher: - @if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/tower/bin/activate; \ - fi; \ - $(PYTHON) manage.py run_fact_cache_receiver - nginx: nginx -g "daemon off;" diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py deleted file mode 100644 index 4d00f5c157..0000000000 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. 
-# All Rights Reserved - -# Python -import logging -from datetime import datetime - -from kombu import Connection, Exchange, Queue -from kombu.mixins import ConsumerMixin - -# Django -from django.core.management.base import NoArgsCommand -from django.conf import settings -from django.utils import timezone -from django.db import IntegrityError - -# AWX -from awx.main.models.jobs import Job -from awx.main.models.fact import Fact -from awx.main.models.inventory import Host -from awx.main.models.base import PERM_INVENTORY_SCAN - -logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') -analytics_logger = logging.getLogger('awx.analytics.system_tracking') - - -class FactBrokerWorker(ConsumerMixin): - - def __init__(self, connection): - self.connection = connection - self.timestamp = None - - def get_consumers(self, Consumer, channel): - return [Consumer(queues=[Queue(settings.FACT_QUEUE, - Exchange(settings.FACT_QUEUE, type='direct'), - routing_key=settings.FACT_QUEUE)], - accept=['json'], - callbacks=[self.process_fact_message])] - - def _determine_module(self, facts): - # Symantically determine the module type - if len(facts) == 1: - return facts.iterkeys().next() - return 'ansible' - - def _extract_module_facts(self, module, facts): - if module in facts: - f = facts[module] - return f - return facts - - def process_facts(self, facts): - module = self._determine_module(facts) - facts = self._extract_module_facts(module, facts) - return (module, facts) - - def _do_fact_scan_create_update(self, host_obj, module_name, facts, timestamp): - try: - fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=timestamp) - fact_obj.facts = facts - fact_obj.save() - logger.info('Updated existing fact <%s>' % (fact_obj.id)) - except Fact.DoesNotExist: - # Create new Fact entry - fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts) - logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name)) - analytics_logger.info('Received message with fact data', extra=dict( - module_name=module_name, facts_data=facts)) - return fact_obj - - def _do_gather_facts_update(self, host_obj, module_name, facts, timestamp): - host_obj.update_ansible_facts(module=module_name, facts=facts, timestamp=self.timestamp) - return host_obj - - def process_fact_message(self, body, message): - hostname = body['host'] - inventory_id = body['inventory_id'] - job_id = body.get('job_id', -1) - facts_data = body['facts'] - date_key = body['date_key'] - - is_fact_scan = False - job = None - - ''' - In Tower < 3.2 we neglected to ack the incoming message. - In Tower 3.2 we add the job_id parameter. - To account for this, we need to fail gracefully when the job is not - found. - ''' - - try: - job = Job.objects.get(id=job_id) - is_fact_scan = True if job.job_type == PERM_INVENTORY_SCAN else False - except Job.DoesNotExist: - logger.warn('Failed to find job %s while processing facts' % job_id) - message.ack() - return None - - try: - host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id) - except Fact.DoesNotExist: - logger.warn('Failed to intake fact. Host does not exist <%s, %s>' % (hostname, inventory_id)) - message.ack() - return None - except Fact.MultipleObjectsReturned: - logger.warn('Database inconsistent. Multiple Hosts found for <%s, %s>.' 
% (hostname, inventory_id)) - message.ack() - return None - except Exception as e: - logger.error("Exception communicating with Fact Cache Database: %s" % str(e)) - message.ack() - return None - - (module_name, facts) = self.process_facts(facts_data) - self.timestamp = datetime.fromtimestamp(date_key, timezone.utc) - - ret = None - # Update existing Fact entry - if is_fact_scan is True: - ret = self._do_fact_scan_create_update(host_obj, module_name, facts, self.timestamp) - - if job.store_facts is True: - if module_name == 'insights': - system_id = facts.get('system_id', None) - host_obj.insights_system_id = system_id - try: - host_obj.save() - except IntegrityError: - host_obj.insights_system_id = None - logger.warn('Inisghts system_id %s not assigned to host %s because it already exists.' % (system_id, host_obj.pk)) - self._do_gather_facts_update(host_obj, module_name, facts, self.timestamp) - - message.ack() - return ret - - -class Command(NoArgsCommand): - ''' - Save Fact Event packets to the database as emitted from a Tower Scan Job - ''' - help = 'Launch the Fact Cache Receiver' - - def handle_noargs(self, **options): - with Connection(settings.BROKER_URL) as conn: - try: - worker = FactBrokerWorker(conn) - worker.run() - except KeyboardInterrupt: - pass - diff --git a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py b/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py deleted file mode 100644 index 1d52c56cdb..0000000000 --- a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved - -# Python -import pytest -from datetime import datetime -import json - -# Django -from django.utils import timezone - -# AWX -from awx.main.management.commands.run_fact_cache_receiver import FactBrokerWorker -from awx.main.models.fact import Fact -from awx.main.models.inventory import Host -from awx.main.models.base import PERM_INVENTORY_SCAN - - -@pytest.fixture -def mock_message(mocker): - class Message(): - def ack(): - pass - msg = Message() - mocker.patch.object(msg, 'ack') - return msg - - -@pytest.fixture -def mock_job_generator(mocker): - def fn(store_facts=True, job_type=PERM_INVENTORY_SCAN): - class Job(): - def __init__(self): - self.store_facts = store_facts - self.job_type = job_type - job = Job() - mocker.patch('awx.main.models.Job.objects.get', return_value=job) - return job - return fn - - -# TODO: Check that timestamp and other attributes are as expected -def check_process_fact_message_module(fact_returned, data, module_name, message): - date_key = data['date_key'] - - message.ack.assert_called_with() - - # Ensure 1, and only 1, fact created - timestamp = datetime.fromtimestamp(date_key, timezone.utc) - assert 1 == Fact.objects.all().count() - - host_obj = Host.objects.get(name=data['host'], inventory__id=data['inventory_id']) - assert host_obj is not None - fact_known = Fact.get_host_fact(host_obj.id, module_name, timestamp) - assert fact_known is not None - assert fact_known == fact_returned - - assert host_obj == fact_returned.host - if module_name == 'ansible': - assert data['facts'] == fact_returned.facts - else: - assert data['facts'][module_name] == fact_returned.facts - assert timestamp == fact_returned.timestamp - assert module_name == fact_returned.module - - -@pytest.mark.django_db -def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = 
FactBrokerWorker(None) - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message) - check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible', mock_message) - - -@pytest.mark.django_db -def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = FactBrokerWorker(None) - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - fact_returned = receiver.process_fact_message(fact_msg_packages, mock_message) - check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages', mock_message) - - -@pytest.mark.django_db -def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = FactBrokerWorker(None) - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - fact_returned = receiver.process_fact_message(fact_msg_services, mock_message) - check_process_fact_message_module(fact_returned, fact_msg_services, 'services', mock_message) - - -@pytest.mark.django_db -def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - ''' - We pickypack our fact sending onto the Ansible fact interface. - The interface is . Where facts is a json blob of all the facts. - This makes it hard to decipher what facts are new/changed. - Because of this, we handle the same fact module data being sent multiple times - and just keep the newest version. - ''' - #epoch = timezone.now() - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - epoch = datetime.fromtimestamp(fact_msg_ansible['date_key']) - fact_scans(fact_scans=1, timestamp_epoch=epoch) - key = 'ansible.overwrite' - value = 'hello world' - - receiver = FactBrokerWorker(None) - receiver.process_fact_message(fact_msg_ansible, mock_message) - - fact_msg_ansible['facts'][key] = value - fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message) - - fact_obj = Fact.objects.get(id=fact_returned.id) - assert key in fact_obj.facts - assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug - - -@pytest.mark.django_db -def test_process_fact_store_facts(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = FactBrokerWorker(None) - mock_job_generator(store_facts=True, job_type='run') - receiver.process_fact_message(fact_msg_services, mock_message) - - host_obj = Host.objects.get(name=fact_msg_services['host'], inventory__id=fact_msg_services['inventory_id']) - assert host_obj is not None - - assert host_obj.ansible_facts == fact_msg_services['facts'] - - diff --git a/awx/main/tests/unit/utils/test_reload.py b/awx/main/tests/unit/utils/test_reload.py index 26f2d42ece..6d09d6105b 100644 --- a/awx/main/tests/unit/utils/test_reload.py +++ b/awx/main/tests/unit/utils/test_reload.py @@ -10,7 +10,7 @@ def test_produce_supervisor_command(mocker): with mocker.patch.object(reload.subprocess, 'Popen', Popen_mock): reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart") reload.subprocess.Popen.assert_called_once_with( - ['supervisorctl', 'restart', 'tower-processes:receiver', 
'tower-processes:factcacher'], + ['supervisorctl', 'restart', 'tower-processes:receiver',], stderr=-1, stdin=-1, stdout=-1) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 537f5f07b6..22d768836c 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1053,9 +1053,6 @@ LOGGING = { 'awx.main.consumers': { 'handlers': ['null'] }, - 'awx.main.commands.run_fact_cache_receiver': { - 'handlers': ['fact_receiver'], - }, 'awx.main.access': { 'handlers': ['null'], 'propagate': False, diff --git a/awx/settings/development.py b/awx/settings/development.py index 0edf353f6a..3bb087e0c9 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -129,7 +129,6 @@ SERVICE_NAME_DICT = { "runworker": "channels", "uwsgi": "uwsgi", "daphne": "daphne", - "fact": "factcacher", "nginx": "nginx"} # Used for sending commands in automatic restart UWSGI_FIFO_LOCATION = '/awxfifo' diff --git a/awx/settings/production.py b/awx/settings/production.py index ad668784d2..cd0e09885c 100644 --- a/awx/settings/production.py +++ b/awx/settings/production.py @@ -66,8 +66,7 @@ SERVICE_NAME_DICT = { "callback": "awx-callback-receiver", "channels": "awx-channels-worker", "uwsgi": "awx-uwsgi", - "daphne": "awx-daphne", - "fact": "awx-fact-cache-receiver"} + "daphne": "awx-daphne"} # Used for sending commands in automatic restart UWSGI_FIFO_LOCATION = '/var/lib/awx/awxfifo' diff --git a/tools/docker-compose/Procfile b/tools/docker-compose/Procfile index b30dfcad2b..83488b891d 100644 --- a/tools/docker-compose/Procfile +++ b/tools/docker-compose/Procfile @@ -3,6 +3,5 @@ runworker: make runworker daphne: make daphne celeryd: make celeryd receiver: make receiver -factcacher: make factcacher flower: make flower uwsgi: make uwsgi diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index b46d05bf4c..f3ea26f95f 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -43,14 +43,6 @@ redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 -[program:factcacher] -command = python manage.py run_fact_cache_receiver -autostart = true -autorestart = true -redirect_stderr=true -stdout_logfile=/dev/fd/1 -stdout_logfile_maxbytes=0 - [program:nginx] command = nginx -g "daemon off;" autostart = true @@ -68,7 +60,7 @@ stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 [group:tower-processes] -programs=celeryd,receiver,runworker,uwsgi,daphne,factcacher,nginx,flower +programs=celeryd,receiver,runworker,uwsgi,daphne,nginx,flower priority=5 [unix_http_server] diff --git a/tools/docker-isolated/Procfile b/tools/docker-isolated/Procfile index 35d7db873c..42cf77b7d5 100644 --- a/tools/docker-isolated/Procfile +++ b/tools/docker-isolated/Procfile @@ -3,6 +3,5 @@ runworker: make runworker daphne: make daphne celeryd: make celeryd EXTRA_GROUP_QUEUES=thepentagon receiver: make receiver -factcacher: make factcacher flower: make flower uwsgi: make uwsgi From 965159a1fbc1e18dbaba589d5919752f662b0d50 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 19 Jun 2017 15:45:32 -0400 Subject: [PATCH 06/10] rename store_facts to use_fact_cache --- awx/api/serializers.py | 2 +- awx/main/migrations/0038_v320_release.py | 8 ++++---- awx/main/models/jobs.py | 6 +++--- awx/main/tasks.py | 6 +++--- awx/main/utils/handlers.py | 1 - docs/inventory_refresh.md | 2 +- 6 files changed, 12 insertions(+), 13 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 3c8ae05c75..38dd74702d 100644 --- 
a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2227,7 +2227,7 @@ class JobOptionsSerializer(LabelsListMixin, BaseSerializer): fields = ('*', 'job_type', 'inventory', 'project', 'playbook', 'credential', 'vault_credential', 'forks', 'limit', 'verbosity', 'extra_vars', 'job_tags', 'force_handlers', - 'skip_tags', 'start_at_task', 'timeout', 'store_facts',) + 'skip_tags', 'start_at_task', 'timeout', 'use_fact_cache',) def get_fields(self): fields = super(JobOptionsSerializer, self).get_fields() diff --git a/awx/main/migrations/0038_v320_release.py b/awx/main/migrations/0038_v320_release.py index f04d280727..16f9ce1688 100644 --- a/awx/main/migrations/0038_v320_release.py +++ b/awx/main/migrations/0038_v320_release.py @@ -88,13 +88,13 @@ class Migration(migrations.Migration): ), migrations.AddField( model_name='job', - name='store_facts', - field=models.BooleanField(default=False, help_text='During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'), + name='use_fact_cache', + field=models.BooleanField(default=False, help_text='If enabled, Tower will act as an Ansible Fact Cache Plugin; persisting facts at the end of a playbook run to the database and caching facts for use by Ansible.'), ), migrations.AddField( model_name='jobtemplate', - name='store_facts', - field=models.BooleanField(default=False, help_text='During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'), + name='use_fact_cache', + field=models.BooleanField(default=False, help_text='If enabled, Tower will act as an Ansible Fact Cache Plugin; persisting facts at the end of a playbook run to the database and caching facts for use by Ansible.'), ), migrations.RunSQL([("CREATE INDEX host_ansible_facts_default_gin ON %s USING gin" "(ansible_facts jsonb_path_ops);", [AsIs(Host._meta.db_table)])], diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 31952ae952..6ac43cc98a 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -153,9 +153,9 @@ class JobOptions(BaseModel): default=0, help_text=_("The amount of time (in seconds) to run before the task is canceled."), ) - store_facts = models.BooleanField( + use_fact_cache = models.BooleanField( default=False, - help_text=_('During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'), + help_text=_("If enabled, Tower will act as an Ansible Fact Cache Plugin; persisting facts at the end of a playbook run to the database and caching facts for use by Ansible."), ) extra_vars_dict = VarsDictProperty('extra_vars', True) @@ -288,7 +288,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour 'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', 'labels', 'survey_passwords', - 'allow_simultaneous', 'timeout', 'store_facts',] + 'allow_simultaneous', 'timeout', 'use_fact_cache',] def resource_validation_data(self): ''' diff --git a/awx/main/tasks.py b/awx/main/tasks.py index dd344ea587..c2b4af4b77 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -878,7 +878,7 @@ class RunJob(BaseTask): # callbacks to work. 
        env['JOB_ID'] = str(job.pk)
         env['INVENTORY_ID'] = str(job.inventory.pk)
-        if job.store_facts:
+        if job.use_fact_cache:
             env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
             env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
             env['ANSIBLE_CACHE_PLUGIN'] = "tower"
@@ -1140,13 +1140,13 @@ class RunJob(BaseTask):
                                               ('project_update', local_project_sync.name, local_project_sync.id)))
                     raise
 
-        if job.store_facts:
+        if job.use_fact_cache:
             job.start_job_fact_cache()
 
 
     def final_run_hook(self, job, status, **kwargs):
         super(RunJob, self).final_run_hook(job, status, **kwargs)
-        if job.store_facts:
+        if job.use_fact_cache:
             job.finish_job_fact_cache()
         try:
             inventory = job.inventory
diff --git a/awx/main/utils/handlers.py b/awx/main/utils/handlers.py
index 3cdbbb261a..8103f910b4 100644
--- a/awx/main/utils/handlers.py
+++ b/awx/main/utils/handlers.py
@@ -11,7 +11,6 @@ import socket
 import select
 import six
 from concurrent.futures import ThreadPoolExecutor
-from copy import copy
 from requests.exceptions import RequestException
 
 # loggly
diff --git a/docs/inventory_refresh.md b/docs/inventory_refresh.md
index e5840c8bbe..ae58cf9c7b 100644
--- a/docs/inventory_refresh.md
+++ b/docs/inventory_refresh.md
@@ -13,7 +13,7 @@ from `InventorySource` completely in Tower 3.3. As a result the related field on
 
 ## Fact Searching
 Facts generated by an Ansible playbook during a Job Template run are stored by Tower into the database
-whenever `store_facts=True` is set per-Job-Template. New facts are merged with existing
+whenever `use_fact_cache=True` is set per-Job-Template. New facts are merged with existing
 facts and are per-host. These stored facts can be used to filter hosts via the `/api/v2/hosts` endpoint,
 using the GET query parameter `host_filter` i.e. `/api/v2/hosts?host_filter=ansible_facts__ansible_processor_vcpus=8`

From 45421c84a68f4b0963f385932dfae1e55f421339 Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Tue, 20 Jun 2017 15:03:19 -0400
Subject: [PATCH 07/10] fact cache docs

---
 docs/fact_cache.md | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)
 create mode 100644 docs/fact_cache.md

diff --git a/docs/fact_cache.md b/docs/fact_cache.md
new file mode 100644
index 0000000000..8bea9b6189
--- /dev/null
+++ b/docs/fact_cache.md
@@ -0,0 +1,32 @@
+# Tower as an Ansible Fact Cache
+Tower can store and retrieve per-host facts via an Ansible Fact Cache Plugin. This behavior is configurable on a per-job-template basis. When enabled, Tower will serve fact requests for all Hosts in the Inventory related to the running Job. This allows users to run Job Templates with `--limit` while still having access to the entire Inventory of Host facts. The Tower Ansible Fact Cache supports a global timeout setting that it enforces per-host. The setting is available in the CTiT interface under the Jobs category with the name `ANSIBLE_FACT_CACHE_TIMEOUT` and is in seconds.
+
+## Tower Fact Cache Implementation Details
+### Tower Injection
+To understand the behavior of Tower as a fact cache, you first need to understand how fact caching is achieved in Tower. Upon a Job invocation with `use_fact_cache=True`, Tower will inject into memcached all `ansible_facts` associated with each Host in the Inventory associated with the Job. The cache key is of the form `inventory_id-host_name` so that hosts with the same name in different inventories do not clash. A list of all hosts in the inventory is also injected into memcached with key `inventory_id` and value `[host_name1, host_name2, ..., host_nameN]`.
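+
+As a hypothetical illustration (the inventory id, host names, and fact values below are made up), the cache layout for inventory `5` with hosts `web1` and `db1` would look like:
+
+```python
+# What Tower writes into memcached before the job starts:
+cache_contents = {
+    '5': ['web1', 'db1'],                            # inventory id -> list of host names
+    '5-web1': '{"ansible_distribution": "Fedora"}',  # per-host facts, JSON-serialized
+    '5-web1-modified': '2017-06-20T15:03:19+00:00',  # per-host modified time, ISO8601
+    '5-db1': '{"ansible_distribution": "CentOS"}',
+    '5-db1-modified': '2017-06-20T15:03:19+00:00',
+}
+```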
From 6c678346941c7c3c2256bb4650d78c61f609fea2 Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Wed, 21 Jun 2017 11:46:17 -0400
Subject: [PATCH 08/10] beautify code

---
 awx/main/tasks.py                       | 2 --
 awx/main/tests/unit/models/test_jobs.py | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index c2b4af4b77..a64c06dae9 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -773,7 +773,6 @@ class BaseTask(Task):
         self.final_run_hook(instance, status, **kwargs)
         instance.websocket_emit_status(status)
         if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
-            print("Status is not successful!")
             # Raising an exception will mark the job as 'failed' in celery
             # and will stop a task chain from continuing to execute
             if status == 'canceled':
@@ -1858,7 +1857,6 @@ class RunInventoryUpdate(BaseTask):
             raise

     def final_run_hook(self, instance, status, **kwargs):
-        print("In final run hook")
         if self.custom_dir_path:
             for p in self.custom_dir_path:
                 try:
diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py
index 69c5f262a2..5592407013 100644
--- a/awx/main/tests/unit/models/test_jobs.py
+++ b/awx/main/tests/unit/models/test_jobs.py
@@ -87,7 +87,7 @@ def test_start_job_fact_cache(hosts, job, inventory, mocker):

     job.start_job_fact_cache()

-    job._get_memcache_connection().set.assert_any_call('{}'.format(5), [h.name for h in hosts])
+    job._get_memcache_connection().set.assert_any_call('5', [h.name for h in hosts])
     for host in hosts:
         job._get_memcache_connection().set.assert_any_call('{}-{}'.format(5, host.name), json.dumps(host.ansible_facts))
         job._get_memcache_connection().set.assert_any_call('{}-{}-modified'.format(5, host.name), host.ansible_facts_modified.isoformat())
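The per-host freshness check described under "Caching Behavior" in `docs/fact_cache.md` can be pictured as follows. This is a sketch assuming ISO-8601 `-modified` stamps as asserted in the tests above, and a hypothetical `is_fresh` helper; the shipped plugin expresses this logic differently.

```python
import datetime

from dateutil import parser
from dateutil.tz import tzutc


def is_fresh(cache, inventory_id, host_name, timeout_seconds):
    # Facts older than the global ANSIBLE_FACT_CACHE_TIMEOUT are treated
    # as missing, forcing Ansible to re-gather them for this host.
    modified = cache.get('{}-{}-modified'.format(inventory_id, host_name))
    if modified is None:
        return False
    modified = parser.parse(modified)
    if modified.tzinfo is None:
        # Stamps are written naive; interpret them as UTC
        modified = modified.replace(tzinfo=tzutc())
    age = datetime.datetime.now(tzutc()) - modified
    return age.total_seconds() <= timeout_seconds
```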
From d679dc7407e65d16a2ace36b4fb54721d1a94daa Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Wed, 21 Jun 2017 11:45:46 -0400
Subject: [PATCH 09/10] do not inject facts for isolated node runs

---
 awx/main/tasks.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index a64c06dae9..4a7901a065 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -877,7 +877,7 @@ class RunJob(BaseTask):
         # callbacks to work.
         env['JOB_ID'] = str(job.pk)
         env['INVENTORY_ID'] = str(job.inventory.pk)
-        if job.use_fact_cache:
+        if job.use_fact_cache and not kwargs.get('isolated'):
             env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
             env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
             env['ANSIBLE_CACHE_PLUGIN'] = "tower"
@@ -1139,13 +1139,13 @@ class RunJob(BaseTask):
                                              ('project_update', local_project_sync.name, local_project_sync.id)))
                     raise

-        if job.use_fact_cache:
+        if job.use_fact_cache and not kwargs.get('isolated'):
             job.start_job_fact_cache()

     def final_run_hook(self, job, status, **kwargs):
         super(RunJob, self).final_run_hook(job, status, **kwargs)
-        if job.use_fact_cache:
+        if job.use_fact_cache and not kwargs.get('isolated'):
             job.finish_job_fact_cache()
         try:
             inventory = job.inventory

From d6082a976ba0a674b201b1a7de0bcdd99dd8de9e Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Thu, 22 Jun 2017 09:43:10 -0400
Subject: [PATCH 10/10] retain support for insights system_id discovery

---
 awx/main/models/jobs.py                 | 7 +++++++
 awx/main/tests/unit/models/test_jobs.py | 6 ++++--
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index 6ac43cc98a..a071cb48c3 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -769,11 +769,18 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
                 modified = parser.parse(modified, tzinfos=[tzutc()])
                 if not host.ansible_facts_modified or modified > host.ansible_facts_modified:
                     ansible_facts = cache.get(host_key)
+                    try:
+                        ansible_facts = json.loads(ansible_facts)
+                    except Exception:
+                        ansible_facts = None
+
                     if ansible_facts is None:
                         cache.delete(host_key)
                         continue
                     host.ansible_facts = ansible_facts
                     host.ansible_facts_modified = modified
+                    if 'insights' in ansible_facts and 'system_id' in ansible_facts['insights']:
+                        host.insights_system_id = ansible_facts['insights']['system_id']
                     host.save()
                     system_tracking_logger.info('New fact for inventory {} host {}'.format(host.inventory.name, host.name),
                                                 extra=dict(inventory_id=host.inventory.id, host_name=host.name,
diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py
index 5592407013..9b2e3a60d3 100644
--- a/awx/main/tests/unit/models/test_jobs.py
+++ b/awx/main/tests/unit/models/test_jobs.py
@@ -119,13 +119,15 @@ def test_finish_job_fact_cache(job, hosts, inventory, mocker, new_time):
     host_key = job.memcached_fact_host_key(hosts[1].name)
     modified_key = job.memcached_fact_modified_key(hosts[1].name)

-    job._get_memcache_connection().set(host_key, 'blah')
+    ansible_facts_new = {"foo": "bar", "insights": {"system_id": "updated_by_scan"}}
+    job._get_memcache_connection().set(host_key, json.dumps(ansible_facts_new))
     job._get_memcache_connection().set(modified_key, new_time.isoformat())

    job.finish_job_fact_cache()

     hosts[0].save.assert_not_called()
     hosts[2].save.assert_not_called()
-    assert hosts[1].ansible_facts == 'blah'
+    assert hosts[1].ansible_facts == ansible_facts_new
+    assert hosts[1].insights_system_id == "updated_by_scan"
     hosts[1].save.assert_called_once_with()
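Read together, the last two patches make the end-of-job write-back path look roughly like the following. This is a consolidated sketch of `Job.finish_job_fact_cache` (which isolated runs skip entirely), not the verbatim method; the `cache`, `hosts`, and `inventory_id` parameters stand in for the model's own attributes.

```python
import json

from dateutil import parser
from dateutil.tz import tzutc


def finish_job_fact_cache(cache, hosts, inventory_id):
    for host in hosts:
        host_key = '{}-{}'.format(inventory_id, host.name)
        modified = cache.get('{}-modified'.format(host_key))
        if modified is None:
            # The plugin never touched this host during the run
            continue
        modified = parser.parse(modified)
        if modified.tzinfo is None:
            modified = modified.replace(tzinfo=tzutc())
        # Only records newer than the database copy are written back
        if host.ansible_facts_modified and modified <= host.ansible_facts_modified:
            continue
        try:
            ansible_facts = json.loads(cache.get(host_key))
        except (TypeError, ValueError):
            # Missing or unparseable record: drop it rather than persist garbage
            cache.delete(host_key)
            continue
        host.ansible_facts = ansible_facts
        host.ansible_facts_modified = modified
        # PATCH 10/10: rediscover the insights system_id from scanned facts
        if 'insights' in ansible_facts and 'system_id' in ansible_facts['insights']:
            host.insights_system_id = ansible_facts['insights']['system_id']
        host.save()
```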