diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 5cc40b4e65..1086dd8784 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -2088,7 +2088,7 @@ class JobOptionsSerializer(LabelsListMixin, BaseSerializer): fields = ('*', 'job_type', 'inventory', 'project', 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'limit', 'verbosity', 'extra_vars', 'job_tags', 'force_handlers', - 'skip_tags', 'start_at_task', 'timeout') + 'skip_tags', 'start_at_task', 'timeout', 'store_facts',) def get_related(self, obj): res = super(JobOptionsSerializer, self).get_related(obj) diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py index f7d843d5fa..1d03757eb2 100644 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -14,8 +14,10 @@ from django.conf import settings from django.utils import timezone # AWX +from awx.main.models.jobs import Job from awx.main.models.fact import Fact from awx.main.models.inventory import Host +from awx.main.models.base import PERM_INVENTORY_SCAN logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') analytics_logger = logging.getLogger('awx.analytics.system_tracking') @@ -51,18 +53,55 @@ class FactBrokerWorker(ConsumerMixin): facts = self._extract_module_facts(module, facts) return (module, facts) + def _do_fact_scan_create_update(self, host_obj, module_name, facts, timestamp): + try: + fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=timestamp) + fact_obj.facts = facts + fact_obj.save() + logger.info('Updated existing fact <%s>' % (fact_obj.id)) + except Fact.DoesNotExist: + # Create new Fact entry + fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts) + logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name)) + analytics_logger.info('Received message with fact data', extra=dict( + module_name=module_name, facts_data=facts)) + return fact_obj + + def _do_gather_facts_update(self, host_obj, module_name, facts, timestamp): + host_obj.update_ansible_facts(module=module_name, facts=facts, timestamp=self.timestamp) + return host_obj + def process_fact_message(self, body, message): hostname = body['host'] inventory_id = body['inventory_id'] + job_id = body.get('job_id', -1) facts_data = body['facts'] date_key = body['date_key'] + is_fact_scan = False + job = None + + ''' + In Tower < 3.2 we neglected to ack the incoming message. + In Tower 3.2 we add the job_id parameter. + To account for this, we need to fail gracefully when the job is not + found. + ''' + + try: + job = Job.objects.get(id=job_id) + is_fact_scan = True if job.job_type == PERM_INVENTORY_SCAN else False + except Job.DoesNotExist: + logger.warn('Failed to find job %s while processing facts' % job_id) + message.ack() + return None + try: host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id) except Fact.DoesNotExist: logger.warn('Failed to intake fact. Host does not exist <%s, %s>' % (hostname, inventory_id)) message.ack() - return + return None except Fact.MultipleObjectsReturned: logger.warn('Database inconsistent. Multiple Hosts found for <%s, %s>.' % (hostname, inventory_id)) message.ack() @@ -75,20 +114,16 @@ class FactBrokerWorker(ConsumerMixin): (module_name, facts) = self.process_facts(facts_data) self.timestamp = datetime.fromtimestamp(date_key, timezone.utc) + ret = None # Update existing Fact entry - try: - fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=self.timestamp) - fact_obj.facts = facts - fact_obj.save() - logger.info('Updated existing fact <%s>' % (fact_obj.id)) - except Fact.DoesNotExist: - # Create new Fact entry - fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts) - logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name)) - analytics_logger.info('Received message with fact data', extra=dict( - module_name=module_name, facts_data=facts)) + if is_fact_scan is True: + ret = self._do_fact_scan_create_update(host_obj, module_name, facts, self.timestamp) + + if job.store_facts is True: + self._do_gather_facts_update(host_obj, module_name, facts, self.timestamp) + message.ack() - return fact_obj + return ret class Command(NoArgsCommand): diff --git a/awx/main/migrations/0038_v320_release.py b/awx/main/migrations/0038_v320_release.py index 60c38009e2..7debb690f5 100644 --- a/awx/main/migrations/0038_v320_release.py +++ b/awx/main/migrations/0038_v320_release.py @@ -48,10 +48,21 @@ class Migration(migrations.Migration): name='ansible_facts', field=awx.main.fields.JSONBField(default={}, help_text='Arbitrary JSON structure of most recent ansible_facts, per-host.', blank=True), ), + migrations.AddField( + model_name='job', + name='store_facts', + field=models.BooleanField(default=False, help_text='During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'), + ), + migrations.AddField( + model_name='jobtemplate', + name='store_facts', + field=models.BooleanField(default=False, help_text='During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'), + ), migrations.RunSQL([("CREATE INDEX host_ansible_facts_default_gin ON %s USING gin" "(ansible_facts jsonb_path_ops);", [AsIs(Host._meta.db_table)])], [('DROP INDEX host_ansible_facts_default_gin;', None)]), + # SCM file-based inventories migrations.AddField( model_name='inventorysource', diff --git a/awx/main/models/fact.py b/awx/main/models/fact.py index 28ba767e3e..984cd9e985 100644 --- a/awx/main/models/fact.py +++ b/awx/main/models/fact.py @@ -7,7 +7,6 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from awx.main.fields import JSONBField -from awx.main.models import Host __all__ = ('Fact',) @@ -65,14 +64,6 @@ class Fact(models.Model): @staticmethod def add_fact(host_id, module, timestamp, facts): - try: - host = Host.objects.get(id=host_id) - except Host.DoesNotExist as e: - logger.warn("Host with id %s not found while trying to update latest fact set." % host_id) - raise e - - host.update_ansible_facts(module=module, facts=facts, timestamp=timestamp) - fact_obj = Fact.objects.create(host_id=host_id, module=module, timestamp=timestamp, facts=facts) fact_obj.save() return fact_obj diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index e9464d5ac2..c1ebb69309 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -159,6 +159,10 @@ class JobOptions(BaseModel): blank=True, default=0, ) + store_facts = models.BooleanField( + default=False, + help_text=_('During a Job run, collect, associate, and persist the most recent per-Host Ansible facts in the ansible_facts namespace.'), + ) extra_vars_dict = VarsDictProperty('extra_vars', True) @@ -261,7 +265,8 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, Resour 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', - 'labels', 'survey_passwords', 'allow_simultaneous', 'timeout'] + 'labels', 'survey_passwords', 'allow_simultaneous', 'timeout', + 'store_facts',] def resource_validation_data(self): ''' diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 58bf153316..e45baef0cb 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -943,8 +943,8 @@ class RunJob(BaseTask): if authorize: env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password') - # Set environment variables related to scan jobs - if job.job_type == PERM_INVENTORY_SCAN: + # Set environment variables related to gathering facts from the cache + if job.job_type == PERM_INVENTORY_SCAN or job.store_facts is True: env['FACT_QUEUE'] = settings.FACT_QUEUE env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') diff --git a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py b/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py index 0a7e4f97c6..1d52c56cdb 100644 --- a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py +++ b/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py @@ -6,9 +6,6 @@ import pytest from datetime import datetime import json -# Mock -import mock - # Django from django.utils import timezone @@ -16,12 +13,38 @@ from django.utils import timezone from awx.main.management.commands.run_fact_cache_receiver import FactBrokerWorker from awx.main.models.fact import Fact from awx.main.models.inventory import Host +from awx.main.models.base import PERM_INVENTORY_SCAN + + +@pytest.fixture +def mock_message(mocker): + class Message(): + def ack(): + pass + msg = Message() + mocker.patch.object(msg, 'ack') + return msg + + +@pytest.fixture +def mock_job_generator(mocker): + def fn(store_facts=True, job_type=PERM_INVENTORY_SCAN): + class Job(): + def __init__(self): + self.store_facts = store_facts + self.job_type = job_type + job = Job() + mocker.patch('awx.main.models.Job.objects.get', return_value=job) + return job + return fn # TODO: Check that timestamp and other attributes are as expected -def check_process_fact_message_module(fact_returned, data, module_name): +def check_process_fact_message_module(fact_returned, data, module_name, message): date_key = data['date_key'] + message.ack.assert_called_with() + # Ensure 1, and only 1, fact created timestamp = datetime.fromtimestamp(date_key, timezone.utc) assert 1 == Fact.objects.all().count() @@ -42,28 +65,31 @@ def check_process_fact_message_module(fact_returned, data, module_name): @pytest.mark.django_db -def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save): +def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): receiver = FactBrokerWorker(None) - fact_returned = receiver.process_fact_message(fact_msg_ansible, mock.MagicMock()) - check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible') + mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) + fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message) + check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible', mock_message) @pytest.mark.django_db -def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save): +def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): receiver = FactBrokerWorker(None) - fact_returned = receiver.process_fact_message(fact_msg_packages, mock.MagicMock()) - check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages') + mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) + fact_returned = receiver.process_fact_message(fact_msg_packages, mock_message) + check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages', mock_message) @pytest.mark.django_db -def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save): +def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): receiver = FactBrokerWorker(None) - fact_returned = receiver.process_fact_message(fact_msg_services, mock.MagicMock()) - check_process_fact_message_module(fact_returned, fact_msg_services, 'services') + mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) + fact_returned = receiver.process_fact_message(fact_msg_services, mock_message) + check_process_fact_message_module(fact_returned, fact_msg_services, 'services', mock_message) @pytest.mark.django_db -def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save): +def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): ''' We pickypack our fact sending onto the Ansible fact interface. The interface is . Where facts is a json blob of all the facts. @@ -72,17 +98,32 @@ def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, m and just keep the newest version. ''' #epoch = timezone.now() + mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) epoch = datetime.fromtimestamp(fact_msg_ansible['date_key']) fact_scans(fact_scans=1, timestamp_epoch=epoch) key = 'ansible.overwrite' value = 'hello world' receiver = FactBrokerWorker(None) - receiver.process_fact_message(fact_msg_ansible, mock.MagicMock()) + receiver.process_fact_message(fact_msg_ansible, mock_message) fact_msg_ansible['facts'][key] = value - fact_returned = receiver.process_fact_message(fact_msg_ansible, mock.MagicMock()) + fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message) fact_obj = Fact.objects.get(id=fact_returned.id) assert key in fact_obj.facts assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug + + +@pytest.mark.django_db +def test_process_fact_store_facts(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): + receiver = FactBrokerWorker(None) + mock_job_generator(store_facts=True, job_type='run') + receiver.process_fact_message(fact_msg_services, mock_message) + + host_obj = Host.objects.get(name=fact_msg_services['host'], inventory__id=fact_msg_services['inventory_id']) + assert host_obj is not None + + assert host_obj.ansible_facts == fact_msg_services['facts'] + + diff --git a/awx/main/tests/unit/test_network_credential.py b/awx/main/tests/unit/test_network_credential.py index ec9c85c816..7435f86617 100644 --- a/awx/main/tests/unit/test_network_credential.py +++ b/awx/main/tests/unit/test_network_credential.py @@ -83,7 +83,8 @@ def mock_job(mocker): 'inventory': mocker.MagicMock(spec=Inventory, id=2), 'force_handlers': False, 'limit': None, 'verbosity': None, 'job_tags': None, 'skip_tags': None, 'start_at_task': None, 'pk': 1, 'launch_type': 'normal', 'job_template':None, - 'created_by': None, 'extra_vars_dict': None, 'project':None, 'playbook': 'test.yml'} + 'created_by': None, 'extra_vars_dict': None, 'project':None, 'playbook': 'test.yml', + 'store_facts': False,} mock_job = mocker.MagicMock(spec=Job, **mock_job_attrs) return mock_job diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index 57d413f018..427cce8501 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -100,6 +100,7 @@ class CacheModule(BaseCacheModule): packet = { 'host': key, 'inventory_id': os.environ['INVENTORY_ID'], + 'job_id': os.getenv('JOB_ID', ''), 'facts': facts, 'date_key': self.date_key, }