diff --git a/awx/main/tasks/facts.py b/awx/main/tasks/facts.py index 48772e9c2c..3bfd5c7950 100644 --- a/awx/main/tasks/facts.py +++ b/awx/main/tasks/facts.py @@ -99,64 +99,99 @@ def finish_fact_cache(host_qs, artifacts_dir, job_id=None, inventory_id=None, jo try: with open(summary_path, 'r', encoding='utf-8') as f: summary = json.load(f) - facts_write_time = os.path.getmtime(summary_path) # After successful read + facts_write_time = os.path.getmtime(summary_path) except (json.JSONDecodeError, OSError) as e: logger.error(f'Error reading summary file at {summary_path}: {e}') return hosts_cached_map = summary.get('hosts_cached', {}) - host_names = list(hosts_cached_map.keys()) - hosts_cached = host_qs.filter(name__in=host_names).order_by('id').iterator() - # Path where individual fact files were written fact_cache_dir = os.path.join(artifacts_dir, 'fact_cache') - hosts_to_update = [] - for host in hosts_cached: - filepath = os.path.join(fact_cache_dir, host.name) - if not os.path.realpath(filepath).startswith(fact_cache_dir): - logger.error(f'Invalid path for facts file: {filepath}') - continue + # Phase 1: Scan files on disk to discover which hosts have updated or missing facts + hosts_with_updates = set() # hostnames whose fact file was modified by Ansible + hosts_to_clear = [] # hostnames where Ansible removed the fact file + seen_in_dir = set() # hostnames we found as files on disk - if os.path.exists(filepath): - # If the file changed since we wrote the last facts file, pre-playbook run... - modified = os.path.getmtime(filepath) - if not facts_write_time or modified >= facts_write_time: - try: - with codecs.open(filepath, 'r', encoding='utf-8') as f: - ansible_facts = json.load(f) - except ValueError: - continue + if os.path.isdir(fact_cache_dir): + for filename in os.listdir(fact_cache_dir): + if filename not in hosts_cached_map: + continue # not an expected host for this job - if ansible_facts != host.ansible_facts: - host.ansible_facts = ansible_facts - host.ansible_facts_modified = now() - hosts_to_update.append(host) - logger.info( - f'New fact for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}', - extra=dict( - inventory_id=host.inventory.id, - host_name=host.name, - ansible_facts=host.ansible_facts, - ansible_facts_modified=host.ansible_facts_modified.isoformat(), - job_id=job_id, - ), - ) - log_data['updated_ct'] += 1 - else: - log_data['unmodified_ct'] += 1 - else: - log_data['unmodified_ct'] += 1 - else: - # File is missing. Only interpret this as "ansible cleared facts" if - # start_fact_cache actually wrote a file for this host (i.e. the host - # had valid, non-expired facts before the job ran). If no file was - # ever written, the missing file is expected and not a clear signal. - if not hosts_cached_map.get(host.name): - log_data['unmodified_ct'] += 1 + filepath = os.path.join(fact_cache_dir, filename) + if os.path.islink(filepath): + logger.error(f'Invalid path for facts file: {filepath}') + continue + if not os.path.isfile(filepath): continue - # if the file goes missing, ansible removed it (likely via clear_facts) - # if the file goes missing, but the host has not started facts, then we should not clear the facts + seen_in_dir.add(filename) + try: + modified = os.path.getmtime(filepath) + except OSError as e: + logger.warning(f'Could not stat facts file {filepath}: {e}') + continue + if modified >= facts_write_time: + hosts_with_updates.add(filename) + else: + log_data['unmodified_ct'] += 1 + + # Check for files we wrote pre-job that are now missing (Ansible cleared facts) + for hostname, was_written in hosts_cached_map.items(): + if hostname in seen_in_dir: + continue # already handled above + if was_written: + hosts_to_clear.append(hostname) + else: + log_data['unmodified_ct'] += 1 + + # Phase 2: Stream updated facts to database in batches + if hosts_with_updates: + hosts_to_save = [] + total_rows_updated = 0 + for host in host_qs.filter(name__in=list(hosts_with_updates)).select_related('inventory').iterator(): + filepath = os.path.join(fact_cache_dir, host.name) + try: + with codecs.open(filepath, 'r', encoding='utf-8') as f: + new_facts = json.load(f) + except (ValueError, OSError): + continue + + if new_facts != host.ansible_facts: + host.ansible_facts = new_facts + host.ansible_facts_modified = now() + hosts_to_save.append(host) + logger.info( + f'New fact for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}', + extra=dict( + inventory_id=host.inventory.id, + host_name=host.name, + ansible_facts=host.ansible_facts, + ansible_facts_modified=host.ansible_facts_modified.isoformat(), + job_id=job_id, + ), + ) + log_data['updated_ct'] += 1 + else: + log_data['unmodified_ct'] += 1 + + if len(hosts_to_save) >= 100: + total_rows_updated += bulk_update_sorted_by_id(Host, hosts_to_save, fields=['ansible_facts', 'ansible_facts_modified']) + hosts_to_save = [] + + if hosts_to_save: + total_rows_updated += bulk_update_sorted_by_id(Host, hosts_to_save, fields=['ansible_facts', 'ansible_facts_modified']) + + # Mismatch means a concurrent process changed or deleted hosts between our read and bulk update + if total_rows_updated != log_data['updated_ct']: + logger.warning( + f'Fact update for inventory {inventory_id} job {job_id}: expected to update {log_data["updated_ct"]} hosts but {total_rows_updated} rows were changed' + ) + + # Phase 3: Clear facts for hosts whose files were removed by Ansible + if hosts_to_clear: + hosts = list(host_qs.filter(name__in=hosts_to_clear).select_related('inventory')) + clear_hosts = [] + for host in hosts: if job_created and host.ansible_facts_modified and host.ansible_facts_modified > job_created: logger.warning( f'Skipping fact clear for host {smart_str(host.name)} in job {job_id} ' @@ -169,13 +204,13 @@ def finish_fact_cache(host_qs, artifacts_dir, job_id=None, inventory_id=None, jo else: host.ansible_facts = {} host.ansible_facts_modified = now() - hosts_to_update.append(host) + clear_hosts.append(host) logger.info(f'Facts cleared for inventory {smart_str(host.inventory.name)} host {smart_str(host.name)}') log_data['cleared_ct'] += 1 - if len(hosts_to_update) >= 100: - bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified']) - hosts_to_update = [] + if clear_hosts: + rows = bulk_update_sorted_by_id(Host, clear_hosts, fields=['ansible_facts', 'ansible_facts_modified']) + if rows != len(clear_hosts): + logger.warning(f'Fact clear for inventory {inventory_id} job {job_id}: expected to clear {len(clear_hosts)} hosts but {rows} rows were changed') - bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified']) logger.debug(f'Updated {log_data["updated_ct"]} host facts for inventory {inventory_id} in job {job_id}') diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py index bd3422d7b3..763dc0b4e0 100644 --- a/awx/main/tests/unit/models/test_jobs.py +++ b/awx/main/tests/unit/models/test_jobs.py @@ -112,7 +112,9 @@ def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir): os.remove(os.path.join(fact_cache_dir, hosts[1].name)) hosts_qs = mock.MagicMock() - hosts_qs.filter.return_value.order_by.return_value.iterator.return_value = iter(hosts) + # The new code calls host_qs.filter(name__in=...).select_related('inventory') + # Only hosts[1] needs clearing (its file was removed), so return just that host + hosts_qs.filter.return_value.select_related.return_value = [hosts[1]] finish_fact_cache(hosts_qs, artifacts_dir=artifacts_dir, inventory_id=inventory_id) @@ -145,10 +147,8 @@ def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir): os.utime(filepath, (new_modification_time, new_modification_time)) hosts_qs = mock.MagicMock() - hosts_qs.filter.return_value.order_by.return_value.iterator.return_value = iter(hosts) finish_fact_cache(hosts_qs, artifacts_dir=artifacts_dir, inventory_id=inventory_id) - # Invalid JSON should be skipped — no hosts updated - updated_hosts = bulk_update.call_args[0][1] - assert updated_hosts == [] + # Invalid JSON should be skipped — no hosts updated, bulk_update never called + bulk_update.assert_not_called()