mirror of
https://github.com/ansible/awx.git
synced 2026-05-19 14:57:39 -02:30
AAP-72269 Change fact processing loop to use file listing (#16403)
* Change fact processing loop to use file listing
* Fix some tests
* Address CodeRabbit comments
* Handle saving facts in batches to keep memory low
* Improve log about mismatch in response to review comment
This commit is contained in:
@@ -99,64 +99,99 @@ def finish_fact_cache(host_qs, artifacts_dir, job_id=None, inventory_id=None, jo
|
|||||||
try:
|
try:
|
||||||
with open(summary_path, 'r', encoding='utf-8') as f:
|
with open(summary_path, 'r', encoding='utf-8') as f:
|
||||||
summary = json.load(f)
|
summary = json.load(f)
|
||||||
facts_write_time = os.path.getmtime(summary_path) # After successful read
|
facts_write_time = os.path.getmtime(summary_path)
|
||||||
except (json.JSONDecodeError, OSError) as e:
|
except (json.JSONDecodeError, OSError) as e:
|
||||||
logger.error(f'Error reading summary file at {summary_path}: {e}')
|
logger.error(f'Error reading summary file at {summary_path}: {e}')
|
||||||
return
|
return
|
||||||
|
|
||||||
hosts_cached_map = summary.get('hosts_cached', {})
|
hosts_cached_map = summary.get('hosts_cached', {})
|
||||||
host_names = list(hosts_cached_map.keys())
|
|
||||||
hosts_cached = host_qs.filter(name__in=host_names).order_by('id').iterator()
|
|
||||||
# Path where individual fact files were written
|
|
||||||
fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache')
|
fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache')
|
||||||
hosts_to_update = []
|
|
||||||
|
|
||||||
for host in hosts_cached:
|
# Phase 1: Scan files on disk to discover which hosts have updated or missing facts
|
||||||
filepath = os.path.join(fact_cache_dir, host.name)
|
hosts_with_updates = set() # hostnames whose fact file was modified by Ansible
|
||||||
if not os.path.realpath(filepath).startswith(fact_cache_dir):
|
hosts_to_clear = [] # hostnames where Ansible removed the fact file
|
||||||
logger.error(f'Invalid path for facts file: {filepath}')
|
seen_in_dir = set() # hostnames we found as files on disk
|
||||||
continue
|
|
||||||
|
|
||||||
if os.path.exists(filepath):
|
if os.path.isdir(fact_cache_dir):
|
||||||
# If the file changed since we wrote the last facts file, pre-playbook run...
|
for filename in os.listdir(fact_cache_dir):
|
||||||
modified = os.path.getmtime(filepath)
|
if filename not in hosts_cached_map:
|
||||||
if not facts_write_time or modified >= facts_write_time:
|
continue # not an expected host for this job
|
||||||
try:
|
|
||||||
with codecs.open(filepath, 'r', encoding='utf-8') as f:
|
|
||||||
ansible_facts = json.load(f)
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if ansible_facts != host.ansible_facts:
|
filepath = os.path.join(fact_cache_dir, filename)
|
||||||
host.ansible_facts = ansible_facts
|
if os.path.islink(filepath):
|
||||||
host.ansible_facts_modified = now()
|
logger.error(f'Invalid path for facts file: {filepath}')
|
||||||
hosts_to_update.append(host)
|
continue
|
||||||
logger.info(
|
if not os.path.isfile(filepath):
|
||||||
f'New fact for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}',
|
|
||||||
extra=dict(
|
|
||||||
inventory_id=host.inventory.id,
|
|
||||||
host_name=host.name,
|
|
||||||
ansible_facts=host.ansible_facts,
|
|
||||||
ansible_facts_modified=host.ansible_facts_modified.isoformat(),
|
|
||||||
job_id=job_id,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
log_data['updated_ct'] += 1
|
|
||||||
else:
|
|
||||||
log_data['unmodified_ct'] += 1
|
|
||||||
else:
|
|
||||||
log_data['unmodified_ct'] += 1
|
|
||||||
else:
|
|
||||||
# File is missing. Only interpret this as "ansible cleared facts" if
|
|
||||||
# start_fact_cache actually wrote a file for this host (i.e. the host
|
|
||||||
# had valid, non-expired facts before the job ran). If no file was
|
|
||||||
# ever written, the missing file is expected and not a clear signal.
|
|
||||||
if not hosts_cached_map.get(host.name):
|
|
||||||
log_data['unmodified_ct'] += 1
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# if the file goes missing, ansible removed it (likely via clear_facts)
|
seen_in_dir.add(filename)
|
||||||
# if the file goes missing, but the host has not started facts, then we should not clear the facts
|
try:
|
||||||
|
modified = os.path.getmtime(filepath)
|
||||||
|
except OSError as e:
|
||||||
|
logger.warning(f'Could not stat facts file {filepath}: {e}')
|
||||||
|
continue
|
||||||
|
if modified >= facts_write_time:
|
||||||
|
hosts_with_updates.add(filename)
|
||||||
|
else:
|
||||||
|
log_data['unmodified_ct'] += 1
|
||||||
|
|
||||||
|
# Check for files we wrote pre-job that are now missing (Ansible cleared facts)
|
||||||
|
for hostname, was_written in hosts_cached_map.items():
|
||||||
|
if hostname in seen_in_dir:
|
||||||
|
continue # already handled above
|
||||||
|
if was_written:
|
||||||
|
hosts_to_clear.append(hostname)
|
||||||
|
else:
|
||||||
|
log_data['unmodified_ct'] += 1
|
||||||
|
|
||||||
|
# Phase 2: Stream updated facts to database in batches
|
||||||
|
if hosts_with_updates:
|
||||||
|
hosts_to_save = []
|
||||||
|
total_rows_updated = 0
|
||||||
|
for host in host_qs.filter(name__in=list(hosts_with_updates)).select_related('inventory').iterator():
|
||||||
|
filepath = os.path.join(fact_cache_dir, host.name)
|
||||||
|
try:
|
||||||
|
with codecs.open(filepath, 'r', encoding='utf-8') as f:
|
||||||
|
new_facts = json.load(f)
|
||||||
|
except (ValueError, OSError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if new_facts != host.ansible_facts:
|
||||||
|
host.ansible_facts = new_facts
|
||||||
|
host.ansible_facts_modified = now()
|
||||||
|
hosts_to_save.append(host)
|
||||||
|
logger.info(
|
||||||
|
f'New fact for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}',
|
||||||
|
extra=dict(
|
||||||
|
inventory_id=host.inventory.id,
|
||||||
|
host_name=host.name,
|
||||||
|
ansible_facts=host.ansible_facts,
|
||||||
|
ansible_facts_modified=host.ansible_facts_modified.isoformat(),
|
||||||
|
job_id=job_id,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
log_data['updated_ct'] += 1
|
||||||
|
else:
|
||||||
|
log_data['unmodified_ct'] += 1
|
||||||
|
|
||||||
|
if len(hosts_to_save) >= 100:
|
||||||
|
total_rows_updated += bulk_update_sorted_by_id(Host, hosts_to_save, fields=['ansible_facts', 'ansible_facts_modified'])
|
||||||
|
hosts_to_save = []
|
||||||
|
|
||||||
|
if hosts_to_save:
|
||||||
|
total_rows_updated += bulk_update_sorted_by_id(Host, hosts_to_save, fields=['ansible_facts', 'ansible_facts_modified'])
|
||||||
|
|
||||||
|
# Mismatch means a concurrent process changed or deleted hosts between our read and bulk update
|
||||||
|
if total_rows_updated != log_data['updated_ct']:
|
||||||
|
logger.warning(
|
||||||
|
f'Fact update for inventory {inventory_id} job {job_id}: expected to update {log_data["updated_ct"]} hosts but {total_rows_updated} rows were changed'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Phase 3: Clear facts for hosts whose files were removed by Ansible
|
||||||
|
if hosts_to_clear:
|
||||||
|
hosts = list(host_qs.filter(name__in=hosts_to_clear).select_related('inventory'))
|
||||||
|
clear_hosts = []
|
||||||
|
for host in hosts:
|
||||||
if job_created and host.ansible_facts_modified and host.ansible_facts_modified > job_created:
|
if job_created and host.ansible_facts_modified and host.ansible_facts_modified > job_created:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f'Skipping fact clear for host {smart_str(host.name)} in job {job_id} '
|
f'Skipping fact clear for host {smart_str(host.name)} in job {job_id} '
|
||||||
@@ -169,13 +204,13 @@ def finish_fact_cache(host_qs, artifacts_dir, job_id=None, inventory_id=None, jo
|
|||||||
else:
|
else:
|
||||||
host.ansible_facts = {}
|
host.ansible_facts = {}
|
||||||
host.ansible_facts_modified = now()
|
host.ansible_facts_modified = now()
|
||||||
hosts_to_update.append(host)
|
clear_hosts.append(host)
|
||||||
logger.info(f'Facts cleared for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}')
|
logger.info(f'Facts cleared for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}')
|
||||||
log_data['cleared_ct'] += 1
|
log_data['cleared_ct'] += 1
|
||||||
|
|
||||||
if len(hosts_to_update) >= 100:
|
if clear_hosts:
|
||||||
bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified'])
|
rows = bulk_update_sorted_by_id(Host, clear_hosts, fields=['ansible_facts', 'ansible_facts_modified'])
|
||||||
hosts_to_update = []
|
if rows != len(clear_hosts):
|
||||||
|
logger.warning(f'Fact clear for inventory {inventory_id} job {job_id}: expected to clear {len(clear_hosts)} hosts but {rows} rows were changed')
|
||||||
|
|
||||||
bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified'])
|
|
||||||
logger.debug(f'Updated {log_data["updated_ct"]} host facts for inventory {inventory_id} in job {job_id}')
|
logger.debug(f'Updated {log_data["updated_ct"]} host facts for inventory {inventory_id} in job {job_id}')
|
||||||
|
|||||||
@@ -112,7 +112,9 @@ def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
|
|||||||
os.remove(os.path.join(fact_cache_dir, hosts[1].name))
|
os.remove(os.path.join(fact_cache_dir, hosts[1].name))
|
||||||
|
|
||||||
hosts_qs = mock.MagicMock()
|
hosts_qs = mock.MagicMock()
|
||||||
hosts_qs.filter.return_value.order_by.return_value.iterator.return_value = iter(hosts)
|
# The new code calls host_qs.filter(name__in=...).select_related('inventory')
|
||||||
|
# Only hosts[1] needs clearing (its file was removed), so return just that host
|
||||||
|
hosts_qs.filter.return_value.select_related.return_value = [hosts[1]]
|
||||||
|
|
||||||
finish_fact_cache(hosts_qs, artifacts_dir=artifacts_dir, inventory_id=inventory_id)
|
finish_fact_cache(hosts_qs, artifacts_dir=artifacts_dir, inventory_id=inventory_id)
|
||||||
|
|
||||||
@@ -145,10 +147,8 @@ def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
|
|||||||
os.utime(filepath, (new_modification_time, new_modification_time))
|
os.utime(filepath, (new_modification_time, new_modification_time))
|
||||||
|
|
||||||
hosts_qs = mock.MagicMock()
|
hosts_qs = mock.MagicMock()
|
||||||
hosts_qs.filter.return_value.order_by.return_value.iterator.return_value = iter(hosts)
|
|
||||||
|
|
||||||
finish_fact_cache(hosts_qs, artifacts_dir=artifacts_dir, inventory_id=inventory_id)
|
finish_fact_cache(hosts_qs, artifacts_dir=artifacts_dir, inventory_id=inventory_id)
|
||||||
|
|
||||||
# Invalid JSON should be skipped — no hosts updated
|
# Invalid JSON should be skipped — no hosts updated, bulk_update never called
|
||||||
updated_hosts = bulk_update.call_args[0][1]
|
bulk_update.assert_not_called()
|
||||||
assert updated_hosts == []
|
|
||||||
|
|||||||
Reference in New Issue
Block a user