diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py index f7d843d5fa..d4dd479090 100644 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -14,8 +14,10 @@ from django.conf import settings from django.utils import timezone # AWX +from awx.main.models.jobs import Job from awx.main.models.fact import Fact from awx.main.models.inventory import Host +from awx.main.models.base import PERM_INVENTORY_SCAN logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') analytics_logger = logging.getLogger('awx.analytics.system_tracking') @@ -54,15 +56,34 @@ class FactBrokerWorker(ConsumerMixin): def process_fact_message(self, body, message): hostname = body['host'] inventory_id = body['inventory_id'] + job_id = body.get('job_id', -1) facts_data = body['facts'] date_key = body['date_key'] + is_fact_scan = False + job = None + + ''' + In Tower < 3.2 we neglected to ack the incoming message. + In Tower 3.2 we add the job_id parameter. + To account for this, we need to fail gracefully when the job is not + found. + ''' + + try: + job = Job.objects.get(id=job_id) + is_fact_scan = True if job.job_type == PERM_INVENTORY_SCAN else False + except Job.DoesNotExist: + logger.warn('Failed to find job %s while processing facts' % job_id) + message.ack() + return None + try: host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id) except Fact.DoesNotExist: logger.warn('Failed to intake fact. Host does not exist <%s, %s>' % (hostname, inventory_id)) message.ack() - return + return None except Fact.MultipleObjectsReturned: logger.warn('Database inconsistent. Multiple Hosts found for <%s, %s>.' % (hostname, inventory_id)) message.ack() @@ -75,20 +96,26 @@ class FactBrokerWorker(ConsumerMixin): (module_name, facts) = self.process_facts(facts_data) self.timestamp = datetime.fromtimestamp(date_key, timezone.utc) + ret = None # Update existing Fact entry - try: - fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=self.timestamp) - fact_obj.facts = facts - fact_obj.save() - logger.info('Updated existing fact <%s>' % (fact_obj.id)) - except Fact.DoesNotExist: - # Create new Fact entry - fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts) - logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name)) - analytics_logger.info('Received message with fact data', extra=dict( - module_name=module_name, facts_data=facts)) + if is_fact_scan is True: + try: + fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=self.timestamp) + fact_obj.facts = facts + fact_obj.save() + logger.info('Updated existing fact <%s>' % (fact_obj.id)) + except Fact.DoesNotExist: + # Create new Fact entry + fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts) + logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name)) + analytics_logger.info('Received message with fact data', extra=dict( + module_name=module_name, facts_data=facts)) + ret = fact_obj + else: + host_obj.update_ansible_facts(module=module_name, facts=facts, timestamp=self.timestamp) + message.ack() - return fact_obj + return ret class Command(NoArgsCommand): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 58bf153316..4173b0134b 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -943,13 +943,12 @@ class RunJob(BaseTask): if authorize: env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password') - # Set environment variables related to scan jobs - if job.job_type == PERM_INVENTORY_SCAN: - env['FACT_QUEUE'] = settings.FACT_QUEUE - env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') - env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') - env['ANSIBLE_CACHE_PLUGIN'] = "tower" - env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT) + # Set environment variables related to gathering facts from the cache + env['FACT_QUEUE'] = settings.FACT_QUEUE + env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library') + env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching') + env['ANSIBLE_CACHE_PLUGIN'] = "tower" + env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT) return env def build_args(self, job, **kwargs): diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index 57d413f018..427cce8501 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -100,6 +100,7 @@ class CacheModule(BaseCacheModule): packet = { 'host': key, 'inventory_id': os.environ['INVENTORY_ID'], + 'job_id': os.getenv('JOB_ID', ''), 'facts': facts, 'date_key': self.date_key, }