mirror of
https://github.com/ansible/awx.git
synced 2026-05-20 07:17:40 -02:30
remove fache cache receiver
This commit is contained in:
7
Makefile
7
Makefile
@@ -390,7 +390,6 @@ server_noattach:
|
|||||||
tmux new-window 'exec make receiver'
|
tmux new-window 'exec make receiver'
|
||||||
tmux select-window -t tower:2
|
tmux select-window -t tower:2
|
||||||
tmux rename-window 'Extra Services'
|
tmux rename-window 'Extra Services'
|
||||||
tmux split-window -h 'exec make factcacher'
|
|
||||||
tmux select-window -t tower:0
|
tmux select-window -t tower:0
|
||||||
|
|
||||||
server: server_noattach
|
server: server_noattach
|
||||||
@@ -472,12 +471,6 @@ socketservice:
|
|||||||
fi; \
|
fi; \
|
||||||
$(PYTHON) manage.py run_socketio_service
|
$(PYTHON) manage.py run_socketio_service
|
||||||
|
|
||||||
factcacher:
|
|
||||||
@if [ "$(VENV_BASE)" ]; then \
|
|
||||||
. $(VENV_BASE)/tower/bin/activate; \
|
|
||||||
fi; \
|
|
||||||
$(PYTHON) manage.py run_fact_cache_receiver
|
|
||||||
|
|
||||||
nginx:
|
nginx:
|
||||||
nginx -g "daemon off;"
|
nginx -g "daemon off;"
|
||||||
|
|
||||||
|
|||||||
@@ -1,151 +0,0 @@
|
|||||||
# Copyright (c) 2015 Ansible, Inc.
|
|
||||||
# All Rights Reserved
|
|
||||||
|
|
||||||
# Python
|
|
||||||
import logging
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from kombu import Connection, Exchange, Queue
|
|
||||||
from kombu.mixins import ConsumerMixin
|
|
||||||
|
|
||||||
# Django
|
|
||||||
from django.core.management.base import NoArgsCommand
|
|
||||||
from django.conf import settings
|
|
||||||
from django.utils import timezone
|
|
||||||
from django.db import IntegrityError
|
|
||||||
|
|
||||||
# AWX
|
|
||||||
from awx.main.models.jobs import Job
|
|
||||||
from awx.main.models.fact import Fact
|
|
||||||
from awx.main.models.inventory import Host
|
|
||||||
from awx.main.models.base import PERM_INVENTORY_SCAN
|
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
|
|
||||||
analytics_logger = logging.getLogger('awx.analytics.system_tracking')
|
|
||||||
|
|
||||||
|
|
||||||
class FactBrokerWorker(ConsumerMixin):
|
|
||||||
|
|
||||||
def __init__(self, connection):
|
|
||||||
self.connection = connection
|
|
||||||
self.timestamp = None
|
|
||||||
|
|
||||||
def get_consumers(self, Consumer, channel):
|
|
||||||
return [Consumer(queues=[Queue(settings.FACT_QUEUE,
|
|
||||||
Exchange(settings.FACT_QUEUE, type='direct'),
|
|
||||||
routing_key=settings.FACT_QUEUE)],
|
|
||||||
accept=['json'],
|
|
||||||
callbacks=[self.process_fact_message])]
|
|
||||||
|
|
||||||
def _determine_module(self, facts):
|
|
||||||
# Symantically determine the module type
|
|
||||||
if len(facts) == 1:
|
|
||||||
return facts.iterkeys().next()
|
|
||||||
return 'ansible'
|
|
||||||
|
|
||||||
def _extract_module_facts(self, module, facts):
|
|
||||||
if module in facts:
|
|
||||||
f = facts[module]
|
|
||||||
return f
|
|
||||||
return facts
|
|
||||||
|
|
||||||
def process_facts(self, facts):
|
|
||||||
module = self._determine_module(facts)
|
|
||||||
facts = self._extract_module_facts(module, facts)
|
|
||||||
return (module, facts)
|
|
||||||
|
|
||||||
def _do_fact_scan_create_update(self, host_obj, module_name, facts, timestamp):
|
|
||||||
try:
|
|
||||||
fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=timestamp)
|
|
||||||
fact_obj.facts = facts
|
|
||||||
fact_obj.save()
|
|
||||||
logger.info('Updated existing fact <%s>' % (fact_obj.id))
|
|
||||||
except Fact.DoesNotExist:
|
|
||||||
# Create new Fact entry
|
|
||||||
fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts)
|
|
||||||
logger.info('Created new fact <fact_id, module> <%s, %s>' % (fact_obj.id, module_name))
|
|
||||||
analytics_logger.info('Received message with fact data', extra=dict(
|
|
||||||
module_name=module_name, facts_data=facts))
|
|
||||||
return fact_obj
|
|
||||||
|
|
||||||
def _do_gather_facts_update(self, host_obj, module_name, facts, timestamp):
|
|
||||||
host_obj.update_ansible_facts(module=module_name, facts=facts, timestamp=self.timestamp)
|
|
||||||
return host_obj
|
|
||||||
|
|
||||||
def process_fact_message(self, body, message):
|
|
||||||
hostname = body['host']
|
|
||||||
inventory_id = body['inventory_id']
|
|
||||||
job_id = body.get('job_id', -1)
|
|
||||||
facts_data = body['facts']
|
|
||||||
date_key = body['date_key']
|
|
||||||
|
|
||||||
is_fact_scan = False
|
|
||||||
job = None
|
|
||||||
|
|
||||||
'''
|
|
||||||
In Tower < 3.2 we neglected to ack the incoming message.
|
|
||||||
In Tower 3.2 we add the job_id parameter.
|
|
||||||
To account for this, we need to fail gracefully when the job is not
|
|
||||||
found.
|
|
||||||
'''
|
|
||||||
|
|
||||||
try:
|
|
||||||
job = Job.objects.get(id=job_id)
|
|
||||||
is_fact_scan = True if job.job_type == PERM_INVENTORY_SCAN else False
|
|
||||||
except Job.DoesNotExist:
|
|
||||||
logger.warn('Failed to find job %s while processing facts' % job_id)
|
|
||||||
message.ack()
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id)
|
|
||||||
except Fact.DoesNotExist:
|
|
||||||
logger.warn('Failed to intake fact. Host does not exist <hostname, inventory_id> <%s, %s>' % (hostname, inventory_id))
|
|
||||||
message.ack()
|
|
||||||
return None
|
|
||||||
except Fact.MultipleObjectsReturned:
|
|
||||||
logger.warn('Database inconsistent. Multiple Hosts found for <hostname, inventory_id> <%s, %s>.' % (hostname, inventory_id))
|
|
||||||
message.ack()
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("Exception communicating with Fact Cache Database: %s" % str(e))
|
|
||||||
message.ack()
|
|
||||||
return None
|
|
||||||
|
|
||||||
(module_name, facts) = self.process_facts(facts_data)
|
|
||||||
self.timestamp = datetime.fromtimestamp(date_key, timezone.utc)
|
|
||||||
|
|
||||||
ret = None
|
|
||||||
# Update existing Fact entry
|
|
||||||
if is_fact_scan is True:
|
|
||||||
ret = self._do_fact_scan_create_update(host_obj, module_name, facts, self.timestamp)
|
|
||||||
|
|
||||||
if job.store_facts is True:
|
|
||||||
if module_name == 'insights':
|
|
||||||
system_id = facts.get('system_id', None)
|
|
||||||
host_obj.insights_system_id = system_id
|
|
||||||
try:
|
|
||||||
host_obj.save()
|
|
||||||
except IntegrityError:
|
|
||||||
host_obj.insights_system_id = None
|
|
||||||
logger.warn('Inisghts system_id %s not assigned to host %s because it already exists.' % (system_id, host_obj.pk))
|
|
||||||
self._do_gather_facts_update(host_obj, module_name, facts, self.timestamp)
|
|
||||||
|
|
||||||
message.ack()
|
|
||||||
return ret
|
|
||||||
|
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
|
||||||
'''
|
|
||||||
Save Fact Event packets to the database as emitted from a Tower Scan Job
|
|
||||||
'''
|
|
||||||
help = 'Launch the Fact Cache Receiver'
|
|
||||||
|
|
||||||
def handle_noargs(self, **options):
|
|
||||||
with Connection(settings.BROKER_URL) as conn:
|
|
||||||
try:
|
|
||||||
worker = FactBrokerWorker(conn)
|
|
||||||
worker.run()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@@ -1,129 +0,0 @@
|
|||||||
# Copyright (c) 2015 Ansible, Inc.
|
|
||||||
# All Rights Reserved
|
|
||||||
|
|
||||||
# Python
|
|
||||||
import pytest
|
|
||||||
from datetime import datetime
|
|
||||||
import json
|
|
||||||
|
|
||||||
# Django
|
|
||||||
from django.utils import timezone
|
|
||||||
|
|
||||||
# AWX
|
|
||||||
from awx.main.management.commands.run_fact_cache_receiver import FactBrokerWorker
|
|
||||||
from awx.main.models.fact import Fact
|
|
||||||
from awx.main.models.inventory import Host
|
|
||||||
from awx.main.models.base import PERM_INVENTORY_SCAN
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_message(mocker):
|
|
||||||
class Message():
|
|
||||||
def ack():
|
|
||||||
pass
|
|
||||||
msg = Message()
|
|
||||||
mocker.patch.object(msg, 'ack')
|
|
||||||
return msg
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_job_generator(mocker):
|
|
||||||
def fn(store_facts=True, job_type=PERM_INVENTORY_SCAN):
|
|
||||||
class Job():
|
|
||||||
def __init__(self):
|
|
||||||
self.store_facts = store_facts
|
|
||||||
self.job_type = job_type
|
|
||||||
job = Job()
|
|
||||||
mocker.patch('awx.main.models.Job.objects.get', return_value=job)
|
|
||||||
return job
|
|
||||||
return fn
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: Check that timestamp and other attributes are as expected
|
|
||||||
def check_process_fact_message_module(fact_returned, data, module_name, message):
|
|
||||||
date_key = data['date_key']
|
|
||||||
|
|
||||||
message.ack.assert_called_with()
|
|
||||||
|
|
||||||
# Ensure 1, and only 1, fact created
|
|
||||||
timestamp = datetime.fromtimestamp(date_key, timezone.utc)
|
|
||||||
assert 1 == Fact.objects.all().count()
|
|
||||||
|
|
||||||
host_obj = Host.objects.get(name=data['host'], inventory__id=data['inventory_id'])
|
|
||||||
assert host_obj is not None
|
|
||||||
fact_known = Fact.get_host_fact(host_obj.id, module_name, timestamp)
|
|
||||||
assert fact_known is not None
|
|
||||||
assert fact_known == fact_returned
|
|
||||||
|
|
||||||
assert host_obj == fact_returned.host
|
|
||||||
if module_name == 'ansible':
|
|
||||||
assert data['facts'] == fact_returned.facts
|
|
||||||
else:
|
|
||||||
assert data['facts'][module_name] == fact_returned.facts
|
|
||||||
assert timestamp == fact_returned.timestamp
|
|
||||||
assert module_name == fact_returned.module
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
|
|
||||||
receiver = FactBrokerWorker(None)
|
|
||||||
mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)
|
|
||||||
fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message)
|
|
||||||
check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible', mock_message)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
|
|
||||||
receiver = FactBrokerWorker(None)
|
|
||||||
mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)
|
|
||||||
fact_returned = receiver.process_fact_message(fact_msg_packages, mock_message)
|
|
||||||
check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages', mock_message)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
|
|
||||||
receiver = FactBrokerWorker(None)
|
|
||||||
mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)
|
|
||||||
fact_returned = receiver.process_fact_message(fact_msg_services, mock_message)
|
|
||||||
check_process_fact_message_module(fact_returned, fact_msg_services, 'services', mock_message)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
|
|
||||||
'''
|
|
||||||
We pickypack our fact sending onto the Ansible fact interface.
|
|
||||||
The interface is <hostname, facts>. Where facts is a json blob of all the facts.
|
|
||||||
This makes it hard to decipher what facts are new/changed.
|
|
||||||
Because of this, we handle the same fact module data being sent multiple times
|
|
||||||
and just keep the newest version.
|
|
||||||
'''
|
|
||||||
#epoch = timezone.now()
|
|
||||||
mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN)
|
|
||||||
epoch = datetime.fromtimestamp(fact_msg_ansible['date_key'])
|
|
||||||
fact_scans(fact_scans=1, timestamp_epoch=epoch)
|
|
||||||
key = 'ansible.overwrite'
|
|
||||||
value = 'hello world'
|
|
||||||
|
|
||||||
receiver = FactBrokerWorker(None)
|
|
||||||
receiver.process_fact_message(fact_msg_ansible, mock_message)
|
|
||||||
|
|
||||||
fact_msg_ansible['facts'][key] = value
|
|
||||||
fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message)
|
|
||||||
|
|
||||||
fact_obj = Fact.objects.get(id=fact_returned.id)
|
|
||||||
assert key in fact_obj.facts
|
|
||||||
assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_process_fact_store_facts(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator):
|
|
||||||
receiver = FactBrokerWorker(None)
|
|
||||||
mock_job_generator(store_facts=True, job_type='run')
|
|
||||||
receiver.process_fact_message(fact_msg_services, mock_message)
|
|
||||||
|
|
||||||
host_obj = Host.objects.get(name=fact_msg_services['host'], inventory__id=fact_msg_services['inventory_id'])
|
|
||||||
assert host_obj is not None
|
|
||||||
|
|
||||||
assert host_obj.ansible_facts == fact_msg_services['facts']
|
|
||||||
|
|
||||||
|
|
||||||
@@ -10,7 +10,7 @@ def test_produce_supervisor_command(mocker):
|
|||||||
with mocker.patch.object(reload.subprocess, 'Popen', Popen_mock):
|
with mocker.patch.object(reload.subprocess, 'Popen', Popen_mock):
|
||||||
reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart")
|
reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart")
|
||||||
reload.subprocess.Popen.assert_called_once_with(
|
reload.subprocess.Popen.assert_called_once_with(
|
||||||
['supervisorctl', 'restart', 'tower-processes:receiver', 'tower-processes:factcacher'],
|
['supervisorctl', 'restart', 'tower-processes:receiver',],
|
||||||
stderr=-1, stdin=-1, stdout=-1)
|
stderr=-1, stdin=-1, stdout=-1)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1053,9 +1053,6 @@ LOGGING = {
|
|||||||
'awx.main.consumers': {
|
'awx.main.consumers': {
|
||||||
'handlers': ['null']
|
'handlers': ['null']
|
||||||
},
|
},
|
||||||
'awx.main.commands.run_fact_cache_receiver': {
|
|
||||||
'handlers': ['fact_receiver'],
|
|
||||||
},
|
|
||||||
'awx.main.access': {
|
'awx.main.access': {
|
||||||
'handlers': ['null'],
|
'handlers': ['null'],
|
||||||
'propagate': False,
|
'propagate': False,
|
||||||
|
|||||||
@@ -129,7 +129,6 @@ SERVICE_NAME_DICT = {
|
|||||||
"runworker": "channels",
|
"runworker": "channels",
|
||||||
"uwsgi": "uwsgi",
|
"uwsgi": "uwsgi",
|
||||||
"daphne": "daphne",
|
"daphne": "daphne",
|
||||||
"fact": "factcacher",
|
|
||||||
"nginx": "nginx"}
|
"nginx": "nginx"}
|
||||||
# Used for sending commands in automatic restart
|
# Used for sending commands in automatic restart
|
||||||
UWSGI_FIFO_LOCATION = '/awxfifo'
|
UWSGI_FIFO_LOCATION = '/awxfifo'
|
||||||
|
|||||||
@@ -66,8 +66,7 @@ SERVICE_NAME_DICT = {
|
|||||||
"callback": "awx-callback-receiver",
|
"callback": "awx-callback-receiver",
|
||||||
"channels": "awx-channels-worker",
|
"channels": "awx-channels-worker",
|
||||||
"uwsgi": "awx-uwsgi",
|
"uwsgi": "awx-uwsgi",
|
||||||
"daphne": "awx-daphne",
|
"daphne": "awx-daphne"}
|
||||||
"fact": "awx-fact-cache-receiver"}
|
|
||||||
# Used for sending commands in automatic restart
|
# Used for sending commands in automatic restart
|
||||||
UWSGI_FIFO_LOCATION = '/var/lib/awx/awxfifo'
|
UWSGI_FIFO_LOCATION = '/var/lib/awx/awxfifo'
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,5 @@ runworker: make runworker
|
|||||||
daphne: make daphne
|
daphne: make daphne
|
||||||
celeryd: make celeryd
|
celeryd: make celeryd
|
||||||
receiver: make receiver
|
receiver: make receiver
|
||||||
factcacher: make factcacher
|
|
||||||
flower: make flower
|
flower: make flower
|
||||||
uwsgi: make uwsgi
|
uwsgi: make uwsgi
|
||||||
|
|||||||
@@ -43,14 +43,6 @@ redirect_stderr=true
|
|||||||
stdout_logfile=/dev/fd/1
|
stdout_logfile=/dev/fd/1
|
||||||
stdout_logfile_maxbytes=0
|
stdout_logfile_maxbytes=0
|
||||||
|
|
||||||
[program:factcacher]
|
|
||||||
command = python manage.py run_fact_cache_receiver
|
|
||||||
autostart = true
|
|
||||||
autorestart = true
|
|
||||||
redirect_stderr=true
|
|
||||||
stdout_logfile=/dev/fd/1
|
|
||||||
stdout_logfile_maxbytes=0
|
|
||||||
|
|
||||||
[program:nginx]
|
[program:nginx]
|
||||||
command = nginx -g "daemon off;"
|
command = nginx -g "daemon off;"
|
||||||
autostart = true
|
autostart = true
|
||||||
@@ -68,7 +60,7 @@ stdout_logfile=/dev/fd/1
|
|||||||
stdout_logfile_maxbytes=0
|
stdout_logfile_maxbytes=0
|
||||||
|
|
||||||
[group:tower-processes]
|
[group:tower-processes]
|
||||||
programs=celeryd,receiver,runworker,uwsgi,daphne,factcacher,nginx,flower
|
programs=celeryd,receiver,runworker,uwsgi,daphne,nginx,flower
|
||||||
priority=5
|
priority=5
|
||||||
|
|
||||||
[unix_http_server]
|
[unix_http_server]
|
||||||
|
|||||||
@@ -3,6 +3,5 @@ runworker: make runworker
|
|||||||
daphne: make daphne
|
daphne: make daphne
|
||||||
celeryd: make celeryd EXTRA_GROUP_QUEUES=thepentagon
|
celeryd: make celeryd EXTRA_GROUP_QUEUES=thepentagon
|
||||||
receiver: make receiver
|
receiver: make receiver
|
||||||
factcacher: make factcacher
|
|
||||||
flower: make flower
|
flower: make flower
|
||||||
uwsgi: make uwsgi
|
uwsgi: make uwsgi
|
||||||
|
|||||||
Reference in New Issue
Block a user