mirror of
https://github.com/ansible/awx.git
synced 2026-05-11 11:27:36 -02:30
always gather facts
* bolt on the fact scan gather to all jobs
This commit is contained in:
@@ -14,8 +14,10 @@ from django.conf import settings
|
|||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
|
from awx.main.models.jobs import Job
|
||||||
from awx.main.models.fact import Fact
|
from awx.main.models.fact import Fact
|
||||||
from awx.main.models.inventory import Host
|
from awx.main.models.inventory import Host
|
||||||
|
from awx.main.models.base import PERM_INVENTORY_SCAN
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
|
logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
|
||||||
analytics_logger = logging.getLogger('awx.analytics.system_tracking')
|
analytics_logger = logging.getLogger('awx.analytics.system_tracking')
|
||||||
@@ -54,15 +56,34 @@ class FactBrokerWorker(ConsumerMixin):
|
|||||||
def process_fact_message(self, body, message):
|
def process_fact_message(self, body, message):
|
||||||
hostname = body['host']
|
hostname = body['host']
|
||||||
inventory_id = body['inventory_id']
|
inventory_id = body['inventory_id']
|
||||||
|
job_id = body.get('job_id', -1)
|
||||||
facts_data = body['facts']
|
facts_data = body['facts']
|
||||||
date_key = body['date_key']
|
date_key = body['date_key']
|
||||||
|
|
||||||
|
is_fact_scan = False
|
||||||
|
job = None
|
||||||
|
|
||||||
|
'''
|
||||||
|
In Tower < 3.2 we neglected to ack the incoming message.
|
||||||
|
In Tower 3.2 we add the job_id parameter.
|
||||||
|
To account for this, we need to fail gracefully when the job is not
|
||||||
|
found.
|
||||||
|
'''
|
||||||
|
|
||||||
|
try:
|
||||||
|
job = Job.objects.get(id=job_id)
|
||||||
|
is_fact_scan = True if job.job_type == PERM_INVENTORY_SCAN else False
|
||||||
|
except Job.DoesNotExist:
|
||||||
|
logger.warn('Failed to find job %s while processing facts' % job_id)
|
||||||
|
message.ack()
|
||||||
|
return None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
|
host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
|
||||||
except Fact.DoesNotExist:
|
except Fact.DoesNotExist:
|
||||||
logger.warn('Failed to intake fact. Host does not exist <hostname, inventory_id> <%s, %s>' % (hostname, inventory_id))
|
logger.warn('Failed to intake fact. Host does not exist <hostname, inventory_id> <%s, %s>' % (hostname, inventory_id))
|
||||||
message.ack()
|
message.ack()
|
||||||
return
|
return None
|
||||||
except Fact.MultipleObjectsReturned:
|
except Fact.MultipleObjectsReturned:
|
||||||
logger.warn('Database inconsistent. Multiple Hosts found for <hostname, inventory_id> <%s, %s>.' % (hostname, inventory_id))
|
logger.warn('Database inconsistent. Multiple Hosts found for <hostname, inventory_id> <%s, %s>.' % (hostname, inventory_id))
|
||||||
message.ack()
|
message.ack()
|
||||||
@@ -75,20 +96,26 @@ class FactBrokerWorker(ConsumerMixin):
|
|||||||
(module_name, facts) = self.process_facts(facts_data)
|
(module_name, facts) = self.process_facts(facts_data)
|
||||||
self.timestamp = datetime.fromtimestamp(date_key, timezone.utc)
|
self.timestamp = datetime.fromtimestamp(date_key, timezone.utc)
|
||||||
|
|
||||||
|
ret = None
|
||||||
# Update existing Fact entry
|
# Update existing Fact entry
|
||||||
try:
|
if is_fact_scan is True:
|
||||||
fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=self.timestamp)
|
try:
|
||||||
fact_obj.facts = facts
|
fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=self.timestamp)
|
||||||
fact_obj.save()
|
fact_obj.facts = facts
|
||||||
logger.info('Updated existing fact <%s>' % (fact_obj.id))
|
fact_obj.save()
|
||||||
except Fact.DoesNotExist:
|
logger.info('Updated existing fact <%s>' % (fact_obj.id))
|
||||||
# Create new Fact entry
|
except Fact.DoesNotExist:
|
||||||
fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts)
|
# Create new Fact entry
|
||||||
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
|
fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts)
|
||||||
analytics_logger.info('Received message with fact data', extra=dict(
|
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
|
||||||
module_name=module_name, facts_data=facts))
|
analytics_logger.info('Received message with fact data', extra=dict(
|
||||||
|
module_name=module_name, facts_data=facts))
|
||||||
|
ret = fact_obj
|
||||||
|
else:
|
||||||
|
host_obj.update_ansible_facts(module=module_name, facts=facts, timestamp=self.timestamp)
|
||||||
|
|
||||||
message.ack()
|
message.ack()
|
||||||
return fact_obj
|
return ret
|
||||||
|
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
class Command(NoArgsCommand):
|
||||||
|
|||||||
@@ -943,13 +943,12 @@ class RunJob(BaseTask):
|
|||||||
if authorize:
|
if authorize:
|
||||||
env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password')
|
env['ANSIBLE_NET_AUTH_PASS'] = decrypt_field(network_cred, 'authorize_password')
|
||||||
|
|
||||||
# Set environment variables related to scan jobs
|
# Set environment variables related to gathering facts from the cache
|
||||||
if job.job_type == PERM_INVENTORY_SCAN:
|
env['FACT_QUEUE'] = settings.FACT_QUEUE
|
||||||
env['FACT_QUEUE'] = settings.FACT_QUEUE
|
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
|
||||||
env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
|
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
|
||||||
env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
|
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
|
||||||
env['ANSIBLE_CACHE_PLUGIN'] = "tower"
|
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT)
|
||||||
env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = "tcp://127.0.0.1:%s" % str(settings.FACT_CACHE_PORT)
|
|
||||||
return env
|
return env
|
||||||
|
|
||||||
def build_args(self, job, **kwargs):
|
def build_args(self, job, **kwargs):
|
||||||
|
|||||||
@@ -100,6 +100,7 @@ class CacheModule(BaseCacheModule):
|
|||||||
packet = {
|
packet = {
|
||||||
'host': key,
|
'host': key,
|
||||||
'inventory_id': os.environ['INVENTORY_ID'],
|
'inventory_id': os.environ['INVENTORY_ID'],
|
||||||
|
'job_id': os.getenv('JOB_ID', ''),
|
||||||
'facts': facts,
|
'facts': facts,
|
||||||
'date_key': self.date_key,
|
'date_key': self.date_key,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user