AAP-45933 [2.5 Backport] AAP-4865 bug fact storage (#6945)
* Revise start_fact_cache and finish_fact_cache to use JSON file (#15970)
* Revise start_fact_cache and finish_fact_cache to use a JSON file with the host list inside it
* Revise artifacts path to be relative to the job private_data_dir
* Update calls to start_fact_cache and finish_fact_cache to agree with the new reference to artifacts_dir
* Prevent unnecessary updates to ansible_facts_modified, fixing timestamp-related test failures
* Import bulk_update_sorted_by_id
* Remove assert that calls ansible_facts_new, which was removed in the backported PR
* Add import of Host back
parent 8fe4223eac
commit 4e332ac2c7
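Summary of the new flow, before the hunks: start_fact_cache now writes per-host fact files under artifacts_dir/fact_cache plus a host_cache_summary.json handoff file, and finish_fact_cache later reads that summary to recover the cached host list and the reference write time instead of receiving them as arguments. Below is a minimal sketch of the revised call pattern, using names taken from the RunJob hunks further down; the surrounding job plumbing is simplified and illustrative, not the actual task code.

    import os

    # 'job' and 'private_data_dir' come from the surrounding RunJob hooks (illustrative here).
    artifacts_dir = os.path.join(private_data_dir, 'artifacts', str(job.id))

    # pre_run_hook: writes <artifacts_dir>/fact_cache/<host name> for each host
    # with unexpired facts, plus <artifacts_dir>/host_cache_summary.json.
    hosts_with_facts_cached = start_fact_cache(
        job.get_hosts_for_fact_cache(),
        artifacts_dir=artifacts_dir,
        inventory_id=job.inventory_id,
    )

    # post_run_hook: reads host_cache_summary.json to learn which hosts were
    # cached and when, then syncs changed or deleted fact files back to the DB.
    finish_fact_cache(
        artifacts_dir=artifacts_dir,
        job_id=job.id,
        inventory_id=job.inventory_id,
    )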
@@ -20,27 +20,29 @@ system_tracking_logger = logging.getLogger('awx.analytics.system_tracking')


@log_excess_runtime(logger, debug_cutoff=0.01, msg='Inventory {inventory_id} host facts prepared for {written_ct} hosts, took {delta:.3f} s', add_log_data=True)
def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=None):
def start_fact_cache(hosts, artifacts_dir, timeout=None, inventory_id=None, log_data=None):
log_data = log_data or {}
log_data['inventory_id'] = inventory_id
log_data['written_ct'] = 0
hosts_cached = list()
try:
os.makedirs(destination, mode=0o700)
except FileExistsError:
pass
hosts_cached = []

# Create the fact_cache directory inside artifacts_dir
fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache')
os.makedirs(fact_cache_dir, mode=0o700, exist_ok=True)

if timeout is None:
timeout = settings.ANSIBLE_FACT_CACHE_TIMEOUT

last_filepath_written = None
last_write_time = None

for host in hosts:
hosts_cached.append(host)
hosts_cached.append(host.name)
if not host.ansible_facts_modified or (timeout and host.ansible_facts_modified < now() - datetime.timedelta(seconds=timeout)):
continue # facts are expired - do not write them

filepath = os.sep.join(map(str, [destination, host.name]))
if not os.path.realpath(filepath).startswith(destination):
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
filepath = os.path.join(fact_cache_dir, host.name)
if not os.path.realpath(filepath).startswith(fact_cache_dir):
logger.error(f'facts for host {smart_str(host.name)} could not be cached')
continue

try:
@@ -48,15 +50,21 @@ def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=None):
os.chmod(f.name, 0o600)
json.dump(host.ansible_facts, f)
log_data['written_ct'] += 1
last_filepath_written = filepath
last_write_time = os.path.getmtime(filepath)
except IOError:
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
logger.error(f'facts for host {smart_str(host.name)} could not be cached')
continue

if last_filepath_written:
return os.path.getmtime(last_filepath_written), hosts_cached

return None, hosts_cached
# Write summary file directly to the artifacts_dir
if inventory_id is not None:
summary_file = os.path.join(artifacts_dir, 'host_cache_summary.json')
summary_data = {
'last_write_time': last_write_time,
'hosts_cached': hosts_cached,
'written_ct': log_data['written_ct'],
}
with open(summary_file, 'w', encoding='utf-8') as f:
json.dump(summary_data, f, indent=2)


@log_excess_runtime(
@@ -65,34 +73,54 @@ def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=None):
msg='Inventory {inventory_id} host facts: updated {updated_ct}, cleared {cleared_ct}, unchanged {unmodified_ct}, took {delta:.3f} s',
add_log_data=True,
)
def finish_fact_cache(hosts_cached, destination, facts_write_time, log_data, job_id=None, inventory_id=None):
def finish_fact_cache(artifacts_dir, job_id=None, inventory_id=None, log_data=None):
log_data = log_data or {}
log_data['inventory_id'] = inventory_id
log_data['updated_ct'] = 0
log_data['unmodified_ct'] = 0
log_data['cleared_ct'] = 0
# The summary file is directly inside the artifacts dir
summary_path = os.path.join(artifacts_dir, 'host_cache_summary.json')
if not os.path.exists(summary_path):
logger.error(f'Missing summary file at {summary_path}')
return

hosts_cached = sorted((h for h in hosts_cached if h.id is not None), key=lambda h: h.id)
try:
with open(summary_path, 'r', encoding='utf-8') as f:
summary = json.load(f)
facts_write_time = os.path.getmtime(summary_path) # After successful read
except (json.JSONDecodeError, OSError) as e:
logger.error(f'Error reading summary file at {summary_path}: {e}')
return

host_names = summary.get('hosts_cached', [])
hosts_cached = Host.objects.filter(name__in=host_names).order_by('id').iterator()
# Path where individual fact files were written
fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache')
hosts_to_update = []

for host in hosts_cached:
filepath = os.sep.join(map(str, [destination, host.name]))
if not os.path.realpath(filepath).startswith(destination):
system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name)))
filepath = os.path.join(fact_cache_dir, host.name)
if not os.path.realpath(filepath).startswith(fact_cache_dir):
logger.error(f'Invalid path for facts file: {filepath}')
continue

if os.path.exists(filepath):
# If the file changed since we wrote the last facts file, pre-playbook run...
modified = os.path.getmtime(filepath)
if (not facts_write_time) or modified > facts_write_time:
with codecs.open(filepath, 'r', encoding='utf-8') as f:
try:
if not facts_write_time or modified >= facts_write_time:
try:
with codecs.open(filepath, 'r', encoding='utf-8') as f:
ansible_facts = json.load(f)
except ValueError:
continue
except ValueError:
continue

if ansible_facts != host.ansible_facts:
host.ansible_facts = ansible_facts
host.ansible_facts_modified = now()
hosts_to_update.append(host)
system_tracking_logger.info(
'New fact for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)),
logger.info(
f'New fact for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}',
extra=dict(
inventory_id=host.inventory.id,
host_name=host.name,
@@ -102,6 +130,8 @@ def finish_fact_cache(hosts_cached, destination, facts_write_time, log_data, job_id=None, inventory_id=None):
),
)
log_data['updated_ct'] += 1
else:
log_data['unmodified_ct'] += 1
else:
log_data['unmodified_ct'] += 1
else:
@@ -110,9 +140,11 @@ def finish_fact_cache(hosts_cached, destination, facts_write_time, log_data, job_id=None, inventory_id=None):
host.ansible_facts = {}
host.ansible_facts_modified = now()
hosts_to_update.append(host)
system_tracking_logger.info('Facts cleared for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)))
logger.info(f'Facts cleared for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}')
log_data['cleared_ct'] += 1
if len(hosts_to_update) > 100:

if len(hosts_to_update) >= 100:
bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified'])
hosts_to_update = []

bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified'])

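The handoff between the two functions lives entirely in host_cache_summary.json. Based on the summary_data dict written in the hunk above, its contents look roughly like this (field names come from the diff; the values shown are invented for illustration):

    {
      "last_write_time": 1712345678.91,
      "hosts_cached": ["host-a", "host-b", "host-c"],
      "written_ct": 3
    }

last_write_time is the mtime of the last per-host fact file written (null when every host's facts were expired), hosts_cached lists the host names passed to start_fact_cache, and written_ct counts the fact files actually written.
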
@@ -1091,8 +1091,8 @@ class RunJob(SourceControlMixin, BaseTask):
# where ansible expects to find it
if self.should_use_fact_cache():
job.log_lifecycle("start_job_fact_cache")
self.facts_write_time, self.hosts_with_facts_cached = start_fact_cache(
job.get_hosts_for_fact_cache(), os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'), inventory_id=job.inventory_id
self.hosts_with_facts_cached = start_fact_cache(
job.get_hosts_for_fact_cache(), artifacts_dir=os.path.join(private_data_dir, 'artifacts', str(job.id)), inventory_id=job.inventory_id
)

def build_project_dir(self, job, private_data_dir):
@@ -1102,7 +1102,7 @@ class RunJob(SourceControlMixin, BaseTask):
super(RunJob, self).post_run_hook(job, status)
job.refresh_from_db(fields=['job_env'])
private_data_dir = job.job_env.get('AWX_PRIVATE_DATA_DIR')
if (not private_data_dir) or (not hasattr(self, 'facts_write_time')):
if not private_data_dir:
# If there's no private data dir, that means we didn't get into the
# actual `run()` call; this _usually_ means something failed in
# the pre_run_hook method
@@ -1110,9 +1110,7 @@ class RunJob(SourceControlMixin, BaseTask):
if self.should_use_fact_cache() and self.runner_callback.artifacts_processed:
job.log_lifecycle("finish_job_fact_cache")
finish_fact_cache(
self.hosts_with_facts_cached,
os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'),
facts_write_time=self.facts_write_time,
artifacts_dir=os.path.join(private_data_dir, 'artifacts', str(job.id)),
job_id=job.id,
inventory_id=job.inventory_id,
)
@@ -1578,7 +1576,7 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
# Include any facts from input inventories so they can be used in filters
start_fact_cache(
input_inventory.hosts.only(*HOST_FACTS_FIELDS),
os.path.join(private_data_dir, 'artifacts', str(inventory_update.id), 'fact_cache'),
artifacts_dir=os.path.join(private_data_dir, 'artifacts', str(inventory_update.id)),
inventory_id=input_inventory.id,
)

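One consequence visible in the RunJob hunks above: post_run_hook no longer needs a facts_write_time attribute carried over from pre_run_hook, because finish_fact_cache re-derives the reference time from the summary file itself. A minimal sketch of that recovery, pulled out of the facts.py hunk into a hypothetical helper purely for illustration (the real code inlines this logic, with logging and early returns):

    import json
    import os

    def load_fact_cache_summary(artifacts_dir):
        # Hypothetical helper; finish_fact_cache inlines this logic.
        summary_path = os.path.join(artifacts_dir, 'host_cache_summary.json')
        with open(summary_path, 'r', encoding='utf-8') as f:
            summary = json.load(f)
        # The summary file's own mtime doubles as the facts-written reference time.
        facts_write_time = os.path.getmtime(summary_path)
        return summary, facts_write_time
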
@@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
import json
import os
import time

import pytest

from awx.main.models import (
@@ -15,6 +13,8 @@ from django.utils.timezone import now

from datetime import timedelta

import time


@pytest.fixture
def ref_time():
@@ -33,15 +33,23 @@ def hosts(ref_time):


def test_start_job_fact_cache(hosts, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
last_modified, _ = start_fact_cache(hosts, fact_cache, timeout=0)
# Create artifacts dir inside tmpdir
artifacts_dir = tmpdir.mkdir("artifacts")

# Assign a mock inventory ID
inventory_id = 42

# Call the function WITHOUT log_data — the decorator handles it
start_fact_cache(hosts, artifacts_dir=str(artifacts_dir), timeout=0, inventory_id=inventory_id)

# Fact files are written into artifacts_dir/fact_cache/
fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache')

for host in hosts:
filepath = os.path.join(fact_cache, host.name)
filepath = os.path.join(fact_cache_dir, host.name)
assert os.path.exists(filepath)
with open(filepath, 'r') as f:
assert f.read() == json.dumps(host.ansible_facts)
assert os.path.getmtime(filepath) <= last_modified
with open(filepath, 'r', encoding='utf-8') as f:
assert json.load(f) == host.ansible_facts


def test_fact_cache_with_invalid_path_traversal(tmpdir):
@@ -51,43 +59,63 @@ def test_fact_cache_with_invalid_path_traversal(tmpdir):
ansible_facts={"a": 1, "b": 2},
),
]
artifacts_dir = tmpdir.mkdir("artifacts")
inventory_id = 42

fact_cache = os.path.join(tmpdir, 'facts')
start_fact_cache(hosts, fact_cache, timeout=0)
# a file called "foo" should _not_ be written outside the facts dir
assert os.listdir(os.path.join(fact_cache, '..')) == ['facts']
start_fact_cache(hosts, artifacts_dir=str(artifacts_dir), timeout=0, inventory_id=inventory_id)

# Fact cache directory (safe location)
fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache')

# The bad host name should not produce a file
assert not os.path.exists(os.path.join(fact_cache_dir, '../foo'))

# Make sure the fact_cache dir exists and is still empty
assert os.listdir(fact_cache_dir) == []


def test_start_job_fact_cache_past_timeout(hosts, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
# the hosts fixture was modified 5s ago, which is more than 2s
last_modified, _ = start_fact_cache(hosts, fact_cache, timeout=2)
assert last_modified is None
start_fact_cache(hosts, fact_cache, timeout=2)

for host in hosts:
assert not os.path.exists(os.path.join(fact_cache, host.name))
ret = start_fact_cache(hosts, fact_cache, timeout=2)
assert ret is None


def test_start_job_fact_cache_within_timeout(hosts, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
# the hosts fixture was modified 5s ago, which is less than 7s
last_modified, _ = start_fact_cache(hosts, fact_cache, timeout=7)
assert last_modified
artifacts_dir = tmpdir.mkdir("artifacts")

# The hosts fixture was modified 5s ago, which is less than 7s
start_fact_cache(hosts, str(artifacts_dir), timeout=7)

fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache')
for host in hosts:
assert os.path.exists(os.path.join(fact_cache, host.name))
filepath = os.path.join(fact_cache_dir, host.name)
assert os.path.exists(filepath)
with open(filepath, 'r') as f:
assert json.load(f) == host.ansible_facts


def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
last_modified, _ = start_fact_cache(hosts, fact_cache, timeout=0)
start_fact_cache(hosts, fact_cache, timeout=0)

bulk_update = mocker.patch('awx.main.tasks.facts.bulk_update_sorted_by_id')

# Mock the os.path.exists behavior for host deletion
# Let's assume the fact file for hosts[1] is missing.
mocker.patch('os.path.exists', side_effect=lambda path: hosts[1].name not in path)

# Simulate one host's fact file getting deleted
os.remove(os.path.join(fact_cache, hosts[1].name))
finish_fact_cache(hosts, fact_cache, last_modified)
# Simulate one host's fact file getting deleted manually
host_to_delete_filepath = os.path.join(fact_cache, hosts[1].name)

# Simulate the file being removed by checking existence first, to avoid FileNotFoundError
if os.path.exists(host_to_delete_filepath):
os.remove(host_to_delete_filepath)

finish_fact_cache(fact_cache)

# Simulate side effects that would normally be applied during bulk update
hosts[1].ansible_facts = {}
@@ -97,17 +125,15 @@ def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
for host in (hosts[0], hosts[2], hosts[3]):
assert host.ansible_facts == {"a": 1, "b": 2}
assert host.ansible_facts_modified == ref_time

# Verify facts were cleared for host with deleted cache file
assert hosts[1].ansible_facts == {}
assert hosts[1].ansible_facts_modified > ref_time

bulk_update.assert_called_once_with(Host, [], fields=['ansible_facts', 'ansible_facts_modified'])
# Current implementation skips the call entirely if hosts_to_update == []
bulk_update.assert_not_called()


def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
fact_cache = os.path.join(tmpdir, 'facts')
last_modified, _ = start_fact_cache(hosts, fact_cache, timeout=0)
start_fact_cache(hosts, fact_cache, timeout=0)

bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update')

@@ -119,6 +145,6 @@ def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
new_modification_time = time.time() + 3600
os.utime(filepath, (new_modification_time, new_modification_time))

finish_fact_cache(hosts, fact_cache, last_modified)
finish_fact_cache(fact_cache)

bulk_update.assert_not_called()
