[constructed-inventory] Allow filtering based on facts (#13678)

* initial functional filter-on-facts functionality

* Move facts to its own module to make interface more coherent

* Update test
This commit is contained in:
Alan Rominger
2023-03-15 10:35:12 -04:00
committed by Rick Elrod
parent 771b831da8
commit aa631a1ba7
4 changed files with 176 additions and 134 deletions

View File

@@ -2,12 +2,8 @@
# All Rights Reserved.
# Python
import codecs
import datetime
import logging
import os
import time
import json
from urllib.parse import urljoin
@@ -15,11 +11,8 @@ from urllib.parse import urljoin
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models.query import QuerySet
# from django.core.cache import cache
from django.utils.encoding import smart_str
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import FieldDoesNotExist
@@ -44,7 +37,7 @@ from awx.main.models.notifications import (
NotificationTemplate,
JobNotificationMixin,
)
from awx.main.utils import parse_yaml_or_json, getattr_dne, NullablePromptPseudoField, polymorphic, log_excess_runtime
from awx.main.utils import parse_yaml_or_json, getattr_dne, NullablePromptPseudoField, polymorphic
from awx.main.fields import ImplicitRoleField, AskForField, JSONBlob, OrderedManyToManyField
from awx.main.models.mixins import (
ResourceMixin,
@@ -60,8 +53,6 @@ from awx.main.constants import JOB_VARIABLE_PREFIXES
logger = logging.getLogger('awx.main.models.jobs')
analytics_logger = logging.getLogger('awx.analytics.job_events')
system_tracking_logger = logging.getLogger('awx.analytics.system_tracking')
__all__ = ['JobTemplate', 'JobLaunchConfig', 'Job', 'JobHostSummary', 'SystemJobTemplate', 'SystemJob']
@@ -848,110 +839,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
def get_notification_friendly_name(self):
return "Job"
def _get_inventory_hosts(self, only=('name', 'ansible_facts', 'ansible_facts_modified', 'modified', 'inventory_id'), **filters):
"""Return value is an iterable for the relevant hosts for this job"""
if not self.inventory:
return []
host_queryset = self.inventory.hosts.only(*only)
if filters:
host_queryset = host_queryset.filter(**filters)
host_queryset = self.inventory.get_sliced_hosts(host_queryset, self.job_slice_number, self.job_slice_count)
if isinstance(host_queryset, QuerySet):
return host_queryset.iterator()
return host_queryset
@log_excess_runtime(logger, debug_cutoff=0.01, msg='Job {job_id} host facts prepared for {written_ct} hosts, took {delta:.3f} s', add_log_data=True)
def start_job_fact_cache(self, destination, log_data, timeout=None):
self.log_lifecycle("start_job_fact_cache")
log_data['job_id'] = self.id
log_data['written_ct'] = 0
os.makedirs(destination, mode=0o700)
if timeout is None:
timeout = settings.ANSIBLE_FACT_CACHE_TIMEOUT
if timeout > 0:
# exclude hosts with fact data older than `settings.ANSIBLE_FACT_CACHE_TIMEOUT seconds`
timeout = now() - datetime.timedelta(seconds=timeout)
hosts = self._get_inventory_hosts(ansible_facts_modified__gte=timeout)
else:
hosts = self._get_inventory_hosts()
last_filepath_written = None
for host in hosts:
filepath = os.sep.join(map(str, [destination, host.name]))
if not os.path.realpath(filepath).startswith(destination):
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
continue
try:
with codecs.open(filepath, 'w', encoding='utf-8') as f:
os.chmod(f.name, 0o600)
json.dump(host.ansible_facts, f)
log_data['written_ct'] += 1
last_filepath_written = filepath
except IOError:
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
continue
# make note of the time we wrote the last file so we can check if any file changed later
if last_filepath_written:
return os.path.getmtime(last_filepath_written)
return None
@log_excess_runtime(
logger,
debug_cutoff=0.01,
msg='Job {job_id} host facts: updated {updated_ct}, cleared {cleared_ct}, unchanged {unmodified_ct}, took {delta:.3f} s',
add_log_data=True,
)
def finish_job_fact_cache(self, destination, facts_write_time, log_data):
self.log_lifecycle("finish_job_fact_cache")
log_data['job_id'] = self.id
log_data['updated_ct'] = 0
log_data['unmodified_ct'] = 0
log_data['cleared_ct'] = 0
hosts_to_update = []
for host in self._get_inventory_hosts():
filepath = os.sep.join(map(str, [destination, host.name]))
if not os.path.realpath(filepath).startswith(destination):
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
continue
if os.path.exists(filepath):
# If the file changed since we wrote the last facts file, pre-playbook run...
modified = os.path.getmtime(filepath)
if (not facts_write_time) or modified > facts_write_time:
with codecs.open(filepath, 'r', encoding='utf-8') as f:
try:
ansible_facts = json.load(f)
except ValueError:
continue
host.ansible_facts = ansible_facts
host.ansible_facts_modified = now()
hosts_to_update.append(host)
system_tracking_logger.info(
'New fact for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)),
extra=dict(
inventory_id=host.inventory.id,
host_name=host.name,
ansible_facts=host.ansible_facts,
ansible_facts_modified=host.ansible_facts_modified.isoformat(),
job_id=self.id,
),
)
log_data['updated_ct'] += 1
else:
log_data['unmodified_ct'] += 1
else:
# if the file goes missing, ansible removed it (likely via clear_facts)
host.ansible_facts = {}
host.ansible_facts_modified = now()
hosts_to_update.append(host)
system_tracking_logger.info('Facts cleared for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)))
log_data['cleared_ct'] += 1
if len(hosts_to_update) > 100:
self.inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified'])
hosts_to_update = []
if hosts_to_update:
self.inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified'])
class LaunchTimeConfigBase(BaseModel):
"""

126
awx/main/tasks/facts.py Normal file
View File

@@ -0,0 +1,126 @@
import codecs
import datetime
import os
import json
import logging
# Django
from django.conf import settings
from django.db.models.query import QuerySet
from django.utils.encoding import smart_str
from django.utils.timezone import now
# AWX
from awx.main.utils.common import log_excess_runtime
logger = logging.getLogger('awx.main.tasks.facts')
system_tracking_logger = logging.getLogger('awx.analytics.system_tracking')
def _get_inventory_hosts(inventory, slice_number, slice_count, only=('name', 'ansible_facts', 'ansible_facts_modified', 'modified', 'inventory_id'), **filters):
"""Return value is an iterable for the relevant hosts for this job"""
if not inventory:
return []
host_queryset = inventory.hosts.only(*only)
if filters:
host_queryset = host_queryset.filter(**filters)
host_queryset = inventory.get_sliced_hosts(host_queryset, slice_number, slice_count)
if isinstance(host_queryset, QuerySet):
return host_queryset.iterator()
return host_queryset
@log_excess_runtime(logger, debug_cutoff=0.01, msg='Inventory {inventory_id} host facts prepared for {written_ct} hosts, took {delta:.3f} s', add_log_data=True)
def start_fact_cache(inventory, destination, log_data, timeout=None, slice_number=0, slice_count=1):
log_data['inventory_id'] = inventory.id
log_data['written_ct'] = 0
try:
os.makedirs(destination, mode=0o700)
except FileExistsError:
pass
if timeout is None:
timeout = settings.ANSIBLE_FACT_CACHE_TIMEOUT
if timeout > 0:
# exclude hosts with fact data older than `settings.ANSIBLE_FACT_CACHE_TIMEOUT seconds`
timeout = now() - datetime.timedelta(seconds=timeout)
hosts = _get_inventory_hosts(inventory, slice_number, slice_count, ansible_facts_modified__gte=timeout)
else:
hosts = _get_inventory_hosts(inventory, slice_number, slice_count)
last_filepath_written = None
for host in hosts:
filepath = os.sep.join(map(str, [destination, host.name]))
if not os.path.realpath(filepath).startswith(destination):
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
continue
try:
with codecs.open(filepath, 'w', encoding='utf-8') as f:
os.chmod(f.name, 0o600)
json.dump(host.ansible_facts, f)
log_data['written_ct'] += 1
last_filepath_written = filepath
except IOError:
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
continue
# make note of the time we wrote the last file so we can check if any file changed later
if last_filepath_written:
return os.path.getmtime(last_filepath_written)
return None
@log_excess_runtime(
logger,
debug_cutoff=0.01,
msg='Inventory {inventory_id} host facts: updated {updated_ct}, cleared {cleared_ct}, unchanged {unmodified_ct}, took {delta:.3f} s',
add_log_data=True,
)
def finish_fact_cache(inventory, destination, facts_write_time, log_data, slice_number=0, slice_count=1, job_id=None):
log_data['inventory_id'] = inventory.id
log_data['updated_ct'] = 0
log_data['unmodified_ct'] = 0
log_data['cleared_ct'] = 0
hosts_to_update = []
for host in _get_inventory_hosts(inventory, slice_number, slice_count):
filepath = os.sep.join(map(str, [destination, host.name]))
if not os.path.realpath(filepath).startswith(destination):
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
continue
if os.path.exists(filepath):
# If the file changed since we wrote the last facts file, pre-playbook run...
modified = os.path.getmtime(filepath)
if (not facts_write_time) or modified > facts_write_time:
with codecs.open(filepath, 'r', encoding='utf-8') as f:
try:
ansible_facts = json.load(f)
except ValueError:
continue
host.ansible_facts = ansible_facts
host.ansible_facts_modified = now()
hosts_to_update.append(host)
system_tracking_logger.info(
'New fact for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)),
extra=dict(
inventory_id=host.inventory.id,
host_name=host.name,
ansible_facts=host.ansible_facts,
ansible_facts_modified=host.ansible_facts_modified.isoformat(),
job_id=job_id,
),
)
log_data['updated_ct'] += 1
else:
log_data['unmodified_ct'] += 1
else:
# if the file goes missing, ansible removed it (likely via clear_facts)
host.ansible_facts = {}
host.ansible_facts_modified = now()
hosts_to_update.append(host)
system_tracking_logger.info('Facts cleared for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)))
log_data['cleared_ct'] += 1
if len(hosts_to_update) > 100:
inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified'])
hosts_to_update = []
if hosts_to_update:
inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified'])

View File

@@ -63,6 +63,7 @@ from awx.main.tasks.callback import (
)
from awx.main.tasks.signals import with_signal_handling, signal_callback
from awx.main.tasks.receptor import AWXReceptorJob
from awx.main.tasks.facts import start_fact_cache, finish_fact_cache
from awx.main.exceptions import AwxTaskError, PostRunError, ReceptorNodeNotFound
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.execution_environments import CONTAINER_ROOT, to_container_path
@@ -455,6 +456,9 @@ class BaseTask(object):
instance.ansible_version = ansible_version_info
instance.save(update_fields=['ansible_version'])
def should_use_fact_cache(self):
return False
@with_path_cleanup
@with_signal_handling
def run(self, pk, **kwargs):
@@ -553,7 +557,8 @@ class BaseTask(object):
params['module'] = self.build_module_name(self.instance)
params['module_args'] = self.build_module_args(self.instance)
if getattr(self.instance, 'use_fact_cache', False):
# TODO: refactor into a better BasTask method
if self.should_use_fact_cache():
# Enable Ansible fact cache.
params['fact_cache_type'] = 'jsonfile'
else:
@@ -1008,6 +1013,9 @@ class RunJob(SourceControlMixin, BaseTask):
return args
def should_use_fact_cache(self):
return self.instance.use_fact_cache
def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
return job.playbook
@@ -1073,8 +1081,14 @@ class RunJob(SourceControlMixin, BaseTask):
# Fetch "cached" fact data from prior runs and put on the disk
# where ansible expects to find it
if job.use_fact_cache:
self.facts_write_time = self.instance.start_job_fact_cache(os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'))
if self.should_use_fact_cache():
job.log_lifecycle("start_job_fact_cache")
self.facts_write_time = start_fact_cache(
job.inventory,
os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'),
slice_number=job.job_slice_number,
slice_count=job.job_slice_count,
)
def build_project_dir(self, job, private_data_dir):
self.sync_and_copy(job.project, private_data_dir, scm_branch=job.scm_branch)
@@ -1088,10 +1102,15 @@ class RunJob(SourceControlMixin, BaseTask):
# actual `run()` call; this _usually_ means something failed in
# the pre_run_hook method
return
if job.use_fact_cache:
job.finish_job_fact_cache(
if self.should_use_fact_cache():
job.log_lifecycle("finish_job_fact_cache")
finish_fact_cache(
job.inventory,
os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'),
self.facts_write_time,
facts_write_time=self.facts_write_time,
slice_number=job.job_slice_number,
slice_count=job.job_slice_count,
job_id=job.id,
)
def final_run_hook(self, job, status, private_data_dir):
@@ -1529,11 +1548,14 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
# special case for constructed inventories, we pass source inventories from database
# these must come in order, and in order _before_ the constructed inventory itself
if inventory_update.inventory.kind == 'constructed':
inventory_update.log_lifecycle("start_job_fact_cache")
for input_inventory in inventory_update.inventory.input_inventories.all():
args.append('-i')
script_params = dict(hostvars=True, towervars=True)
source_inv_path = self.write_inventory_file(input_inventory, private_data_dir, f'hosts_{input_inventory.id}', script_params)
args.append(to_container_path(source_inv_path, private_data_dir))
# Include any facts from input inventories so they can be used in filters
start_fact_cache(input_inventory, os.path.join(private_data_dir, 'artifacts', str(inventory_update.id), 'fact_cache'))
# Add arguments for the source inventory file/script/thing
rel_path = self.pseudo_build_inventory(inventory_update, private_data_dir)
@@ -1562,6 +1584,9 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
return args
def should_use_fact_cache(self):
return bool(self.instance.source == 'constructed')
def build_inventory(self, inventory_update, private_data_dir):
return None # what runner expects in order to not deal with inventory

View File

@@ -10,10 +10,12 @@ from awx.main.models import (
Inventory,
Host,
)
from awx.main.tasks.facts import start_fact_cache, finish_fact_cache
@pytest.fixture
def hosts(inventory):
def hosts():
inventory = Inventory(id=5)
return [
Host(name='host1', ansible_facts={"a": 1, "b": 2}, inventory=inventory),
Host(name='host2', ansible_facts={"a": 1, "b": 2}, inventory=inventory),
@@ -23,20 +25,21 @@ def hosts(inventory):
@pytest.fixture
def inventory():
def inventory(mocker, hosts):
mocker.patch('awx.main.tasks.facts._get_inventory_hosts', return_value=hosts)
return Inventory(id=5)
@pytest.fixture
def job(mocker, hosts, inventory):
def job(mocker, inventory):
j = Job(inventory=inventory, id=2)
j._get_inventory_hosts = mocker.Mock(return_value=hosts)
# j._get_inventory_hosts = mocker.Mock(return_value=hosts)
return j
def test_start_job_fact_cache(hosts, job, inventory, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
last_modified = job.start_job_fact_cache(fact_cache, timeout=0)
last_modified = start_fact_cache(inventory, fact_cache, timeout=0)
for host in hosts:
filepath = os.path.join(fact_cache, host.name)
@@ -47,24 +50,25 @@ def test_start_job_fact_cache(hosts, job, inventory, tmpdir):
def test_fact_cache_with_invalid_path_traversal(job, inventory, tmpdir, mocker):
job._get_inventory_hosts = mocker.Mock(
mocker.patch(
'awx.main.tasks.facts._get_inventory_hosts',
return_value=[
Host(
name='../foo',
ansible_facts={"a": 1, "b": 2},
),
]
],
)
fact_cache = os.path.join(tmpdir, 'facts')
job.start_job_fact_cache(fact_cache, timeout=0)
start_fact_cache(inventory, fact_cache, timeout=0)
# a file called "foo" should _not_ be written outside the facts dir
assert os.listdir(os.path.join(fact_cache, '..')) == ['facts']
def test_finish_job_fact_cache_with_existing_data(job, hosts, inventory, mocker, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
last_modified = job.start_job_fact_cache(fact_cache, timeout=0)
last_modified = start_fact_cache(inventory, fact_cache, timeout=0)
bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update')
@@ -80,7 +84,7 @@ def test_finish_job_fact_cache_with_existing_data(job, hosts, inventory, mocker,
new_modification_time = time.time() + 3600
os.utime(filepath, (new_modification_time, new_modification_time))
job.finish_job_fact_cache(fact_cache, last_modified)
finish_fact_cache(inventory, fact_cache, last_modified)
for host in (hosts[0], hosts[2], hosts[3]):
assert host.ansible_facts == {"a": 1, "b": 2}
@@ -91,7 +95,7 @@ def test_finish_job_fact_cache_with_existing_data(job, hosts, inventory, mocker,
def test_finish_job_fact_cache_with_bad_data(job, hosts, inventory, mocker, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
last_modified = job.start_job_fact_cache(fact_cache, timeout=0)
last_modified = start_fact_cache(inventory, fact_cache, timeout=0)
bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update')
@@ -103,19 +107,19 @@ def test_finish_job_fact_cache_with_bad_data(job, hosts, inventory, mocker, tmpd
new_modification_time = time.time() + 3600
os.utime(filepath, (new_modification_time, new_modification_time))
job.finish_job_fact_cache(fact_cache, last_modified)
finish_fact_cache(inventory, fact_cache, last_modified)
bulk_update.assert_not_called()
def test_finish_job_fact_cache_clear(job, hosts, inventory, mocker, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
last_modified = job.start_job_fact_cache(fact_cache, timeout=0)
last_modified = start_fact_cache(inventory, fact_cache, timeout=0)
bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update')
os.remove(os.path.join(fact_cache, hosts[1].name))
job.finish_job_fact_cache(fact_cache, last_modified)
finish_fact_cache(inventory, fact_cache, last_modified)
for host in (hosts[0], hosts[2], hosts[3]):
assert host.ansible_facts == {"a": 1, "b": 2}