diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 61b87f4807..ae9e66ae5f 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -2,12 +2,8 @@ # All Rights Reserved. # Python -import codecs -import datetime import logging -import os import time -import json from urllib.parse import urljoin @@ -15,11 +11,8 @@ from urllib.parse import urljoin from django.conf import settings from django.core.exceptions import ValidationError from django.db import models -from django.db.models.query import QuerySet # from django.core.cache import cache -from django.utils.encoding import smart_str -from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from django.core.exceptions import FieldDoesNotExist @@ -44,7 +37,7 @@ from awx.main.models.notifications import ( NotificationTemplate, JobNotificationMixin, ) -from awx.main.utils import parse_yaml_or_json, getattr_dne, NullablePromptPseudoField, polymorphic, log_excess_runtime +from awx.main.utils import parse_yaml_or_json, getattr_dne, NullablePromptPseudoField, polymorphic from awx.main.fields import ImplicitRoleField, AskForField, JSONBlob, OrderedManyToManyField from awx.main.models.mixins import ( ResourceMixin, @@ -60,8 +53,6 @@ from awx.main.constants import JOB_VARIABLE_PREFIXES logger = logging.getLogger('awx.main.models.jobs') -analytics_logger = logging.getLogger('awx.analytics.job_events') -system_tracking_logger = logging.getLogger('awx.analytics.system_tracking') __all__ = ['JobTemplate', 'JobLaunchConfig', 'Job', 'JobHostSummary', 'SystemJobTemplate', 'SystemJob'] @@ -848,110 +839,6 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana def get_notification_friendly_name(self): return "Job" - def _get_inventory_hosts(self, only=('name', 'ansible_facts', 'ansible_facts_modified', 'modified', 'inventory_id'), **filters): - """Return value is an iterable for the relevant hosts for this job""" - if not self.inventory: - 
return [] - host_queryset = self.inventory.hosts.only(*only) - if filters: - host_queryset = host_queryset.filter(**filters) - host_queryset = self.inventory.get_sliced_hosts(host_queryset, self.job_slice_number, self.job_slice_count) - if isinstance(host_queryset, QuerySet): - return host_queryset.iterator() - return host_queryset - - @log_excess_runtime(logger, debug_cutoff=0.01, msg='Job {job_id} host facts prepared for {written_ct} hosts, took {delta:.3f} s', add_log_data=True) - def start_job_fact_cache(self, destination, log_data, timeout=None): - self.log_lifecycle("start_job_fact_cache") - log_data['job_id'] = self.id - log_data['written_ct'] = 0 - os.makedirs(destination, mode=0o700) - - if timeout is None: - timeout = settings.ANSIBLE_FACT_CACHE_TIMEOUT - if timeout > 0: - # exclude hosts with fact data older than `settings.ANSIBLE_FACT_CACHE_TIMEOUT seconds` - timeout = now() - datetime.timedelta(seconds=timeout) - hosts = self._get_inventory_hosts(ansible_facts_modified__gte=timeout) - else: - hosts = self._get_inventory_hosts() - - last_filepath_written = None - for host in hosts: - filepath = os.sep.join(map(str, [destination, host.name])) - if not os.path.realpath(filepath).startswith(destination): - system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name))) - continue - try: - with codecs.open(filepath, 'w', encoding='utf-8') as f: - os.chmod(f.name, 0o600) - json.dump(host.ansible_facts, f) - log_data['written_ct'] += 1 - last_filepath_written = filepath - except IOError: - system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name))) - continue - # make note of the time we wrote the last file so we can check if any file changed later - if last_filepath_written: - return os.path.getmtime(last_filepath_written) - return None - - @log_excess_runtime( - logger, - debug_cutoff=0.01, - msg='Job {job_id} host facts: updated {updated_ct}, cleared {cleared_ct}, unchanged 
{unmodified_ct}, took {delta:.3f} s', - add_log_data=True, - ) - def finish_job_fact_cache(self, destination, facts_write_time, log_data): - self.log_lifecycle("finish_job_fact_cache") - log_data['job_id'] = self.id - log_data['updated_ct'] = 0 - log_data['unmodified_ct'] = 0 - log_data['cleared_ct'] = 0 - hosts_to_update = [] - for host in self._get_inventory_hosts(): - filepath = os.sep.join(map(str, [destination, host.name])) - if not os.path.realpath(filepath).startswith(destination): - system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name))) - continue - if os.path.exists(filepath): - # If the file changed since we wrote the last facts file, pre-playbook run... - modified = os.path.getmtime(filepath) - if (not facts_write_time) or modified > facts_write_time: - with codecs.open(filepath, 'r', encoding='utf-8') as f: - try: - ansible_facts = json.load(f) - except ValueError: - continue - host.ansible_facts = ansible_facts - host.ansible_facts_modified = now() - hosts_to_update.append(host) - system_tracking_logger.info( - 'New fact for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)), - extra=dict( - inventory_id=host.inventory.id, - host_name=host.name, - ansible_facts=host.ansible_facts, - ansible_facts_modified=host.ansible_facts_modified.isoformat(), - job_id=self.id, - ), - ) - log_data['updated_ct'] += 1 - else: - log_data['unmodified_ct'] += 1 - else: - # if the file goes missing, ansible removed it (likely via clear_facts) - host.ansible_facts = {} - host.ansible_facts_modified = now() - hosts_to_update.append(host) - system_tracking_logger.info('Facts cleared for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name))) - log_data['cleared_ct'] += 1 - if len(hosts_to_update) > 100: - self.inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified']) - hosts_to_update = [] - if hosts_to_update: - 
self.inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified']) - class LaunchTimeConfigBase(BaseModel): """ diff --git a/awx/main/tasks/facts.py b/awx/main/tasks/facts.py new file mode 100644 index 0000000000..ba48bc2249 --- /dev/null +++ b/awx/main/tasks/facts.py @@ -0,0 +1,126 @@ +import codecs +import datetime +import os +import json +import logging + +# Django +from django.conf import settings +from django.db.models.query import QuerySet +from django.utils.encoding import smart_str +from django.utils.timezone import now + +# AWX +from awx.main.utils.common import log_excess_runtime + + +logger = logging.getLogger('awx.main.tasks.facts') +system_tracking_logger = logging.getLogger('awx.analytics.system_tracking') + + +def _get_inventory_hosts(inventory, slice_number, slice_count, only=('name', 'ansible_facts', 'ansible_facts_modified', 'modified', 'inventory_id'), **filters): + """Return value is an iterable for the relevant hosts for this job""" + if not inventory: + return [] + host_queryset = inventory.hosts.only(*only) + if filters: + host_queryset = host_queryset.filter(**filters) + host_queryset = inventory.get_sliced_hosts(host_queryset, slice_number, slice_count) + if isinstance(host_queryset, QuerySet): + return host_queryset.iterator() + return host_queryset + + +@log_excess_runtime(logger, debug_cutoff=0.01, msg='Inventory {inventory_id} host facts prepared for {written_ct} hosts, took {delta:.3f} s', add_log_data=True) +def start_fact_cache(inventory, destination, log_data, timeout=None, slice_number=0, slice_count=1): + log_data['inventory_id'] = inventory.id + log_data['written_ct'] = 0 + try: + os.makedirs(destination, mode=0o700) + except FileExistsError: + pass + + if timeout is None: + timeout = settings.ANSIBLE_FACT_CACHE_TIMEOUT + if timeout > 0: + # exclude hosts with fact data older than `settings.ANSIBLE_FACT_CACHE_TIMEOUT seconds` + timeout = now() - datetime.timedelta(seconds=timeout) + hosts = 
_get_inventory_hosts(inventory, slice_number, slice_count, ansible_facts_modified__gte=timeout) + else: + hosts = _get_inventory_hosts(inventory, slice_number, slice_count) + + last_filepath_written = None + for host in hosts: + filepath = os.sep.join(map(str, [destination, host.name])) + if not os.path.realpath(filepath).startswith(destination): + system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name))) + continue + try: + with codecs.open(filepath, 'w', encoding='utf-8') as f: + os.chmod(f.name, 0o600) + json.dump(host.ansible_facts, f) + log_data['written_ct'] += 1 + last_filepath_written = filepath + except IOError: + system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name))) + continue + # make note of the time we wrote the last file so we can check if any file changed later + if last_filepath_written: + return os.path.getmtime(last_filepath_written) + return None + + +@log_excess_runtime( + logger, + debug_cutoff=0.01, + msg='Inventory {inventory_id} host facts: updated {updated_ct}, cleared {cleared_ct}, unchanged {unmodified_ct}, took {delta:.3f} s', + add_log_data=True, +) +def finish_fact_cache(inventory, destination, facts_write_time, log_data, slice_number=0, slice_count=1, job_id=None): + log_data['inventory_id'] = inventory.id + log_data['updated_ct'] = 0 + log_data['unmodified_ct'] = 0 + log_data['cleared_ct'] = 0 + hosts_to_update = [] + for host in _get_inventory_hosts(inventory, slice_number, slice_count): + filepath = os.sep.join(map(str, [destination, host.name])) + if not os.path.realpath(filepath).startswith(destination): + system_tracking_logger.error('facts for host {} could not be cached'.format(smart_str(host.name))) + continue + if os.path.exists(filepath): + # If the file changed since we wrote the last facts file, pre-playbook run... 
+ modified = os.path.getmtime(filepath) + if (not facts_write_time) or modified > facts_write_time: + with codecs.open(filepath, 'r', encoding='utf-8') as f: + try: + ansible_facts = json.load(f) + except ValueError: + continue + host.ansible_facts = ansible_facts + host.ansible_facts_modified = now() + hosts_to_update.append(host) + system_tracking_logger.info( + 'New fact for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)), + extra=dict( + inventory_id=host.inventory.id, + host_name=host.name, + ansible_facts=host.ansible_facts, + ansible_facts_modified=host.ansible_facts_modified.isoformat(), + job_id=job_id, + ), + ) + log_data['updated_ct'] += 1 + else: + log_data['unmodified_ct'] += 1 + else: + # if the file goes missing, ansible removed it (likely via clear_facts) + host.ansible_facts = {} + host.ansible_facts_modified = now() + hosts_to_update.append(host) + system_tracking_logger.info('Facts cleared for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name))) + log_data['cleared_ct'] += 1 + if len(hosts_to_update) > 100: + inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified']) + hosts_to_update = [] + if hosts_to_update: + inventory.hosts.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified']) diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index ef73caacf5..1bb886a557 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -63,6 +63,7 @@ from awx.main.tasks.callback import ( ) from awx.main.tasks.signals import with_signal_handling, signal_callback from awx.main.tasks.receptor import AWXReceptorJob +from awx.main.tasks.facts import start_fact_cache, finish_fact_cache from awx.main.exceptions import AwxTaskError, PostRunError, ReceptorNodeNotFound from awx.main.utils.ansible import read_ansible_config from awx.main.utils.execution_environments import CONTAINER_ROOT, to_container_path @@ -455,6 +456,9 @@ class 
BaseTask(object): instance.ansible_version = ansible_version_info instance.save(update_fields=['ansible_version']) + def should_use_fact_cache(self): + return False + @with_path_cleanup @with_signal_handling def run(self, pk, **kwargs): @@ -553,7 +557,8 @@ class BaseTask(object): params['module'] = self.build_module_name(self.instance) params['module_args'] = self.build_module_args(self.instance) - if getattr(self.instance, 'use_fact_cache', False): + # TODO: refactor into a better BaseTask method + if self.should_use_fact_cache(): # Enable Ansible fact cache. params['fact_cache_type'] = 'jsonfile' else: @@ -1008,6 +1013,9 @@ class RunJob(SourceControlMixin, BaseTask): return args + def should_use_fact_cache(self): + return self.instance.use_fact_cache + def build_playbook_path_relative_to_cwd(self, job, private_data_dir): return job.playbook @@ -1073,8 +1081,14 @@ class RunJob(SourceControlMixin, BaseTask): # Fetch "cached" fact data from prior runs and put on the disk # where ansible expects to find it - if job.use_fact_cache: - self.facts_write_time = self.instance.start_job_fact_cache(os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache')) + if self.should_use_fact_cache(): + job.log_lifecycle("start_job_fact_cache") + self.facts_write_time = start_fact_cache( + job.inventory, + os.path.join(private_data_dir, 'artifacts', str(job.id), 'fact_cache'), + slice_number=job.job_slice_number, + slice_count=job.job_slice_count, + ) def build_project_dir(self, job, private_data_dir): self.sync_and_copy(job.project, private_data_dir, scm_branch=job.scm_branch) @@ -1088,10 +1102,15 @@ class RunJob(SourceControlMixin, BaseTask): # actual `run()` call; this _usually_ means something failed in # the pre_run_hook method return - if job.use_fact_cache: - job.finish_job_fact_cache( + if self.should_use_fact_cache(): + job.log_lifecycle("finish_job_fact_cache") + finish_fact_cache( + job.inventory, os.path.join(private_data_dir, 'artifacts', str(job.id), 
'fact_cache'), - self.facts_write_time, + facts_write_time=self.facts_write_time, + slice_number=job.job_slice_number, + slice_count=job.job_slice_count, + job_id=job.id, ) def final_run_hook(self, job, status, private_data_dir): @@ -1529,11 +1548,14 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask): # special case for constructed inventories, we pass source inventories from database # these must come in order, and in order _before_ the constructed inventory itself if inventory_update.inventory.kind == 'constructed': + inventory_update.log_lifecycle("start_job_fact_cache") for input_inventory in inventory_update.inventory.input_inventories.all(): args.append('-i') script_params = dict(hostvars=True, towervars=True) source_inv_path = self.write_inventory_file(input_inventory, private_data_dir, f'hosts_{input_inventory.id}', script_params) args.append(to_container_path(source_inv_path, private_data_dir)) + # Include any facts from input inventories so they can be used in filters + start_fact_cache(input_inventory, os.path.join(private_data_dir, 'artifacts', str(inventory_update.id), 'fact_cache')) # Add arguments for the source inventory file/script/thing rel_path = self.pseudo_build_inventory(inventory_update, private_data_dir) @@ -1562,6 +1584,9 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask): return args + def should_use_fact_cache(self): + return bool(self.instance.source == 'constructed') + def build_inventory(self, inventory_update, private_data_dir): return None # what runner expects in order to not deal with inventory diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py index 2f030a57c3..d17a434fb1 100644 --- a/awx/main/tests/unit/models/test_jobs.py +++ b/awx/main/tests/unit/models/test_jobs.py @@ -10,10 +10,12 @@ from awx.main.models import ( Inventory, Host, ) +from awx.main.tasks.facts import start_fact_cache, finish_fact_cache @pytest.fixture -def hosts(inventory): +def hosts(): + inventory = 
Inventory(id=5) return [ Host(name='host1', ansible_facts={"a": 1, "b": 2}, inventory=inventory), Host(name='host2', ansible_facts={"a": 1, "b": 2}, inventory=inventory), @@ -23,20 +25,21 @@ def hosts(inventory): @pytest.fixture -def inventory(): +def inventory(mocker, hosts): + mocker.patch('awx.main.tasks.facts._get_inventory_hosts', return_value=hosts) return Inventory(id=5) @pytest.fixture -def job(mocker, hosts, inventory): +def job(mocker, inventory): j = Job(inventory=inventory, id=2) - j._get_inventory_hosts = mocker.Mock(return_value=hosts) + # j._get_inventory_hosts = mocker.Mock(return_value=hosts) return j def test_start_job_fact_cache(hosts, job, inventory, tmpdir): fact_cache = os.path.join(tmpdir, 'facts') - last_modified = job.start_job_fact_cache(fact_cache, timeout=0) + last_modified = start_fact_cache(inventory, fact_cache, timeout=0) for host in hosts: filepath = os.path.join(fact_cache, host.name) @@ -47,24 +50,25 @@ def test_start_job_fact_cache(hosts, job, inventory, tmpdir): def test_fact_cache_with_invalid_path_traversal(job, inventory, tmpdir, mocker): - job._get_inventory_hosts = mocker.Mock( + mocker.patch( + 'awx.main.tasks.facts._get_inventory_hosts', return_value=[ Host( name='../foo', ansible_facts={"a": 1, "b": 2}, ), - ] + ], ) fact_cache = os.path.join(tmpdir, 'facts') - job.start_job_fact_cache(fact_cache, timeout=0) + start_fact_cache(inventory, fact_cache, timeout=0) # a file called "foo" should _not_ be written outside the facts dir assert os.listdir(os.path.join(fact_cache, '..')) == ['facts'] def test_finish_job_fact_cache_with_existing_data(job, hosts, inventory, mocker, tmpdir): fact_cache = os.path.join(tmpdir, 'facts') - last_modified = job.start_job_fact_cache(fact_cache, timeout=0) + last_modified = start_fact_cache(inventory, fact_cache, timeout=0) bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update') @@ -80,7 +84,7 @@ def test_finish_job_fact_cache_with_existing_data(job, hosts, inventory, mocker, 
new_modification_time = time.time() + 3600 os.utime(filepath, (new_modification_time, new_modification_time)) - job.finish_job_fact_cache(fact_cache, last_modified) + finish_fact_cache(inventory, fact_cache, last_modified) for host in (hosts[0], hosts[2], hosts[3]): assert host.ansible_facts == {"a": 1, "b": 2} @@ -91,7 +95,7 @@ def test_finish_job_fact_cache_with_existing_data(job, hosts, inventory, mocker, def test_finish_job_fact_cache_with_bad_data(job, hosts, inventory, mocker, tmpdir): fact_cache = os.path.join(tmpdir, 'facts') - last_modified = job.start_job_fact_cache(fact_cache, timeout=0) + last_modified = start_fact_cache(inventory, fact_cache, timeout=0) bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update') @@ -103,19 +107,19 @@ def test_finish_job_fact_cache_with_bad_data(job, hosts, inventory, mocker, tmpd new_modification_time = time.time() + 3600 os.utime(filepath, (new_modification_time, new_modification_time)) - job.finish_job_fact_cache(fact_cache, last_modified) + finish_fact_cache(inventory, fact_cache, last_modified) bulk_update.assert_not_called() def test_finish_job_fact_cache_clear(job, hosts, inventory, mocker, tmpdir): fact_cache = os.path.join(tmpdir, 'facts') - last_modified = job.start_job_fact_cache(fact_cache, timeout=0) + last_modified = start_fact_cache(inventory, fact_cache, timeout=0) bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update') os.remove(os.path.join(fact_cache, hosts[1].name)) - job.finish_job_fact_cache(fact_cache, last_modified) + finish_fact_cache(inventory, fact_cache, last_modified) for host in (hosts[0], hosts[2], hosts[3]): assert host.ansible_facts == {"a": 1, "b": 2}