add fact modified time

This commit is contained in:
Chris Meyers 2017-06-16 08:36:32 -04:00
parent 626e2d1c9b
commit 817dbe8d33
6 changed files with 58 additions and 28 deletions

View File

@ -228,6 +228,19 @@ register(
category_slug='jobs',
)
register(
'ANSIBLE_FACT_CACHE_TIMEOUT',
field_class=fields.IntegerField,
min_value=0,
default=0,
label=_('Per-Host Ansible Fact Cache Timeout'),
help_text=_('Maximum time, in seconds, that Tower stored Ansible facts are considered valid since '
'the last time they were modified. Only valid, non-stale, facts will be accessible by '
'a playbook. Note, this does not influence the deletion of ansible_facts from the database.'),
category=_('Jobs'),
category_slug='jobs',
)
register(
'LOG_AGGREGATOR_HOST',
field_class=fields.CharField,

View File

@ -81,6 +81,11 @@ class Migration(migrations.Migration):
name='ansible_facts',
field=awx.main.fields.JSONBField(default={}, help_text='Arbitrary JSON structure of most recent ansible_facts, per-host.', blank=True),
),
migrations.AddField(
model_name='host',
name='ansible_facts_modified',
field=models.DateTimeField(default=None, help_text='The date and time ansible_facts was last modified.', null=True, editable=False),
),
migrations.AddField(
model_name='job',
name='store_facts',

View File

@ -448,6 +448,12 @@ class Host(CommonModelNameNotUnique):
default={},
help_text=_('Arbitrary JSON structure of most recent ansible_facts, per-host.'),
)
ansible_facts_modified = models.DateTimeField(
default=None,
editable=False,
null=True,
help_text=_('The date and time ansible_facts was last modified.'),
)
insights_system_id = models.TextField(
blank=True,
default=None,

View File

@ -7,6 +7,7 @@ import hashlib
import hmac
import logging
import time
import json
from urlparse import urljoin
# Django
@ -707,12 +708,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
self.project_update.cancel(job_explanation=job_explanation)
return res
@property
def store_facts_enabled(self):
if not self.job_template or self.job_template is False:
return False
return True
@property
def memcached_fact_key(self):
return '{}'.format(self.inventory.id)
@ -742,7 +737,7 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin):
modified_key = self.memcached_fact_modified_key(host.name)
# Only add host/facts if host doesn't already exist in the cache
if cache.get(modified_key) is None:
cache.set(host_key, host.ansible_facts)
cache.set(host_key, json.dumps(host.ansible_facts))
cache.set(modified_key, False)
host_names.append(host.name)

View File

@ -773,6 +773,7 @@ class BaseTask(Task):
self.final_run_hook(instance, status, **kwargs)
instance.websocket_emit_status(status)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
print("Status is not successful!")
# Raising an exception will mark the job as 'failed' in celery
# and will stop a task chain from continuing to execute
if status == 'canceled':
@ -877,9 +878,12 @@ class RunJob(BaseTask):
# callbacks to work.
env['JOB_ID'] = str(job.pk)
env['INVENTORY_ID'] = str(job.inventory.pk)
if job.store_facts_enabled:
env['MEMCACHED_PREPEND_KEY'] = job.memcached_fact_key
env['MEMCACHED_LOCATION'] = settings.CACHES['default']['LOCATION']
if job.store_facts:
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
env['ANSIBLE_FACT_CACHE_TIMEOUT'] = str(settings.ANSIBLE_FACT_CACHE_TIMEOUT)
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
if job.project:
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
@ -954,13 +958,6 @@ class RunJob(BaseTask):
if authorize:
env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password')
# Set environment variables related to gathering facts from the cache
if (job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True) and not kwargs.get('isolated'):
env['FACT_QUEUE'] = settings.FACT_QUEUE
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT)
return env
def build_args(self, job, **kwargs):
@ -1143,13 +1140,13 @@ class RunJob(BaseTask):
('project_update', local_project_sync.name, local_project_sync.id)))
raise
if job.store_facts_enabled:
if job.store_facts:
job.start_job_fact_cache()
def final_run_hook(self, job, status, **kwargs):
super(RunJob, self).final_run_hook(job, status, **kwargs)
if job.store_facts_enabled:
if job.store_facts:
job.finish_job_fact_cache()
try:
inventory = job.inventory

View File

@ -33,6 +33,8 @@ import os
import memcache
import json
from ansible import constants as C
try:
from ansible.cache.base import BaseCacheModule
except:
@ -42,8 +44,8 @@ except:
class CacheModule(BaseCacheModule):
def __init__(self, *args, **kwargs):
# Basic in-memory caching for typical runs
self.mc = memcache.Client([os.environ['MEMCACHED_LOCATION']], debug=0)
self.mc = memcache.Client([C.CACHE_PLUGIN_CONNECTION], debug=0)
self.timeout = int(C.CACHE_PLUGIN_TIMEOUT)
self.inventory_id = os.environ['INVENTORY_ID']
@property
@ -59,9 +61,14 @@ class CacheModule(BaseCacheModule):
def get(self, key):
host_key = self.translate_host_key(key)
value_json = self.mc.get(host_key)
if not value_json:
if value_json is None:
raise KeyError
try:
return json.loads(value_json)
# If cache entry is corrupt or bad, fail gracefully.
except (TypeError, ValueError):
self.delete(key)
raise KeyError
return json.loads(value_json)
def set(self, key, value):
host_key = self.translate_host_key(key)
@ -74,7 +81,8 @@ class CacheModule(BaseCacheModule):
return self.mc.get(self.host_names_key)
def contains(self, key):
val = self.mc.get(key)
host_key = self.translate_host_key(key)
val = self.mc.get(host_key)
if val is None:
return False
return True
@ -84,13 +92,19 @@ class CacheModule(BaseCacheModule):
self.mc.delete(self.translate_modified_key(key))
def flush(self):
for k in self.mc.get(self.host_names_key):
host_names = self.mc.get(self.host_names_key)
if not host_names:
return
for k in host_names:
self.mc.delete(self.translate_host_key(k))
self.mc.delete(self.translate_modified_key(k))
def copy(self):
ret = dict()
for k in self.mc.get(self.host_names_key):
ret[k] = self.mc.get(self.translate_host_key(k))
return ret
host_names = self.mc.get(self.host_names_key)
if not host_names:
return
return [self.mc.get(self.translate_host_key(k)) for k in host_names]