From ec2e537f630a3224c0370ba753a527e68aafe297 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 19 Jun 2017 15:33:24 -0400 Subject: [PATCH] remove fache cache receiver --- Makefile | 7 - .../commands/run_fact_cache_receiver.py | 151 ------------------ .../commands/test_run_fact_cache_receiver.py | 129 --------------- awx/main/tests/unit/utils/test_reload.py | 2 +- awx/settings/defaults.py | 3 - awx/settings/development.py | 1 - awx/settings/production.py | 3 +- tools/docker-compose/Procfile | 1 - tools/docker-compose/supervisor.conf | 10 +- tools/docker-isolated/Procfile | 1 - 10 files changed, 3 insertions(+), 305 deletions(-) delete mode 100644 awx/main/management/commands/run_fact_cache_receiver.py delete mode 100644 awx/main/tests/functional/commands/test_run_fact_cache_receiver.py diff --git a/Makefile b/Makefile index c346268767..d4e9cb7d41 100644 --- a/Makefile +++ b/Makefile @@ -390,7 +390,6 @@ server_noattach: tmux new-window 'exec make receiver' tmux select-window -t tower:2 tmux rename-window 'Extra Services' - tmux split-window -h 'exec make factcacher' tmux select-window -t tower:0 server: server_noattach @@ -472,12 +471,6 @@ socketservice: fi; \ $(PYTHON) manage.py run_socketio_service -factcacher: - @if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/tower/bin/activate; \ - fi; \ - $(PYTHON) manage.py run_fact_cache_receiver - nginx: nginx -g "daemon off;" diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py deleted file mode 100644 index 4d00f5c157..0000000000 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved - -# Python -import logging -from datetime import datetime - -from kombu import Connection, Exchange, Queue -from kombu.mixins import ConsumerMixin - -# Django -from django.core.management.base import NoArgsCommand -from django.conf import settings -from django.utils import timezone -from django.db import IntegrityError - -# AWX -from awx.main.models.jobs import Job -from awx.main.models.fact import Fact -from awx.main.models.inventory import Host -from awx.main.models.base import PERM_INVENTORY_SCAN - -logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') -analytics_logger = logging.getLogger('awx.analytics.system_tracking') - - -class FactBrokerWorker(ConsumerMixin): - - def __init__(self, connection): - self.connection = connection - self.timestamp = None - - def get_consumers(self, Consumer, channel): - return [Consumer(queues=[Queue(settings.FACT_QUEUE, - Exchange(settings.FACT_QUEUE, type='direct'), - routing_key=settings.FACT_QUEUE)], - accept=['json'], - callbacks=[self.process_fact_message])] - - def _determine_module(self, facts): - # Symantically determine the module type - if len(facts) == 1: - return facts.iterkeys().next() - return 'ansible' - - def _extract_module_facts(self, module, facts): - if module in facts: - f = facts[module] - return f - return facts - - def process_facts(self, facts): - module = self._determine_module(facts) - facts = self._extract_module_facts(module, facts) - return (module, facts) - - def _do_fact_scan_create_update(self, host_obj, module_name, facts, timestamp): - try: - fact_obj = Fact.objects.get(host__id=host_obj.id, module=module_name, timestamp=timestamp) - fact_obj.facts = facts - fact_obj.save() - logger.info('Updated existing fact <%s>' % (fact_obj.id)) - except Fact.DoesNotExist: - # Create new Fact entry - fact_obj = Fact.add_fact(host_obj.id, module_name, self.timestamp, facts) - logger.info('Created new fact <%s, %s>' % (fact_obj.id, module_name)) - analytics_logger.info('Received message with fact data', extra=dict( - module_name=module_name, facts_data=facts)) - return fact_obj - - def _do_gather_facts_update(self, host_obj, module_name, facts, timestamp): - host_obj.update_ansible_facts(module=module_name, facts=facts, timestamp=self.timestamp) - return host_obj - - def process_fact_message(self, body, message): - hostname = body['host'] - inventory_id = body['inventory_id'] - job_id = body.get('job_id', -1) - facts_data = body['facts'] - date_key = body['date_key'] - - is_fact_scan = False - job = None - - ''' - In Tower < 3.2 we neglected to ack the incoming message. - In Tower 3.2 we add the job_id parameter. - To account for this, we need to fail gracefully when the job is not - found. - ''' - - try: - job = Job.objects.get(id=job_id) - is_fact_scan = True if job.job_type == PERM_INVENTORY_SCAN else False - except Job.DoesNotExist: - logger.warn('Failed to find job %s while processing facts' % job_id) - message.ack() - return None - - try: - host_obj = Host.objects.get(name=hostname, inventory__id=inventory_id) - except Fact.DoesNotExist: - logger.warn('Failed to intake fact. Host does not exist <%s, %s>' % (hostname, inventory_id)) - message.ack() - return None - except Fact.MultipleObjectsReturned: - logger.warn('Database inconsistent. Multiple Hosts found for <%s, %s>.' % (hostname, inventory_id)) - message.ack() - return None - except Exception as e: - logger.error("Exception communicating with Fact Cache Database: %s" % str(e)) - message.ack() - return None - - (module_name, facts) = self.process_facts(facts_data) - self.timestamp = datetime.fromtimestamp(date_key, timezone.utc) - - ret = None - # Update existing Fact entry - if is_fact_scan is True: - ret = self._do_fact_scan_create_update(host_obj, module_name, facts, self.timestamp) - - if job.store_facts is True: - if module_name == 'insights': - system_id = facts.get('system_id', None) - host_obj.insights_system_id = system_id - try: - host_obj.save() - except IntegrityError: - host_obj.insights_system_id = None - logger.warn('Inisghts system_id %s not assigned to host %s because it already exists.' % (system_id, host_obj.pk)) - self._do_gather_facts_update(host_obj, module_name, facts, self.timestamp) - - message.ack() - return ret - - -class Command(NoArgsCommand): - ''' - Save Fact Event packets to the database as emitted from a Tower Scan Job - ''' - help = 'Launch the Fact Cache Receiver' - - def handle_noargs(self, **options): - with Connection(settings.BROKER_URL) as conn: - try: - worker = FactBrokerWorker(conn) - worker.run() - except KeyboardInterrupt: - pass - diff --git a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py b/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py deleted file mode 100644 index 1d52c56cdb..0000000000 --- a/awx/main/tests/functional/commands/test_run_fact_cache_receiver.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved - -# Python -import pytest -from datetime import datetime -import json - -# Django -from django.utils import timezone - -# AWX -from awx.main.management.commands.run_fact_cache_receiver import FactBrokerWorker -from awx.main.models.fact import Fact -from awx.main.models.inventory import Host -from awx.main.models.base import PERM_INVENTORY_SCAN - - -@pytest.fixture -def mock_message(mocker): - class Message(): - def ack(): - pass - msg = Message() - mocker.patch.object(msg, 'ack') - return msg - - -@pytest.fixture -def mock_job_generator(mocker): - def fn(store_facts=True, job_type=PERM_INVENTORY_SCAN): - class Job(): - def __init__(self): - self.store_facts = store_facts - self.job_type = job_type - job = Job() - mocker.patch('awx.main.models.Job.objects.get', return_value=job) - return job - return fn - - -# TODO: Check that timestamp and other attributes are as expected -def check_process_fact_message_module(fact_returned, data, module_name, message): - date_key = data['date_key'] - - message.ack.assert_called_with() - - # Ensure 1, and only 1, fact created - timestamp = datetime.fromtimestamp(date_key, timezone.utc) - assert 1 == Fact.objects.all().count() - - host_obj = Host.objects.get(name=data['host'], inventory__id=data['inventory_id']) - assert host_obj is not None - fact_known = Fact.get_host_fact(host_obj.id, module_name, timestamp) - assert fact_known is not None - assert fact_known == fact_returned - - assert host_obj == fact_returned.host - if module_name == 'ansible': - assert data['facts'] == fact_returned.facts - else: - assert data['facts'][module_name] == fact_returned.facts - assert timestamp == fact_returned.timestamp - assert module_name == fact_returned.module - - -@pytest.mark.django_db -def test_process_fact_message_ansible(fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = FactBrokerWorker(None) - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message) - check_process_fact_message_module(fact_returned, fact_msg_ansible, 'ansible', mock_message) - - -@pytest.mark.django_db -def test_process_fact_message_packages(fact_msg_packages, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = FactBrokerWorker(None) - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - fact_returned = receiver.process_fact_message(fact_msg_packages, mock_message) - check_process_fact_message_module(fact_returned, fact_msg_packages, 'packages', mock_message) - - -@pytest.mark.django_db -def test_process_fact_message_services(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = FactBrokerWorker(None) - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - fact_returned = receiver.process_fact_message(fact_msg_services, mock_message) - check_process_fact_message_module(fact_returned, fact_msg_services, 'services', mock_message) - - -@pytest.mark.django_db -def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - ''' - We pickypack our fact sending onto the Ansible fact interface. - The interface is . Where facts is a json blob of all the facts. - This makes it hard to decipher what facts are new/changed. - Because of this, we handle the same fact module data being sent multiple times - and just keep the newest version. - ''' - #epoch = timezone.now() - mock_job_generator(store_facts=False, job_type=PERM_INVENTORY_SCAN) - epoch = datetime.fromtimestamp(fact_msg_ansible['date_key']) - fact_scans(fact_scans=1, timestamp_epoch=epoch) - key = 'ansible.overwrite' - value = 'hello world' - - receiver = FactBrokerWorker(None) - receiver.process_fact_message(fact_msg_ansible, mock_message) - - fact_msg_ansible['facts'][key] = value - fact_returned = receiver.process_fact_message(fact_msg_ansible, mock_message) - - fact_obj = Fact.objects.get(id=fact_returned.id) - assert key in fact_obj.facts - assert fact_msg_ansible['facts'] == (json.loads(fact_obj.facts) if isinstance(fact_obj.facts, unicode) else fact_obj.facts) # TODO: Just make response.data['facts'] when we're only dealing with postgres, or if jsonfields ever fixes this bug - - -@pytest.mark.django_db -def test_process_fact_store_facts(fact_msg_services, monkeypatch_jsonbfield_get_db_prep_save, mock_message, mock_job_generator): - receiver = FactBrokerWorker(None) - mock_job_generator(store_facts=True, job_type='run') - receiver.process_fact_message(fact_msg_services, mock_message) - - host_obj = Host.objects.get(name=fact_msg_services['host'], inventory__id=fact_msg_services['inventory_id']) - assert host_obj is not None - - assert host_obj.ansible_facts == fact_msg_services['facts'] - - diff --git a/awx/main/tests/unit/utils/test_reload.py b/awx/main/tests/unit/utils/test_reload.py index 26f2d42ece..6d09d6105b 100644 --- a/awx/main/tests/unit/utils/test_reload.py +++ b/awx/main/tests/unit/utils/test_reload.py @@ -10,7 +10,7 @@ def test_produce_supervisor_command(mocker): with mocker.patch.object(reload.subprocess, 'Popen', Popen_mock): reload._supervisor_service_command(['beat', 'callback', 'fact'], "restart") reload.subprocess.Popen.assert_called_once_with( - ['supervisorctl', 'restart', 'tower-processes:receiver', 'tower-processes:factcacher'], + ['supervisorctl', 'restart', 'tower-processes:receiver',], stderr=-1, stdin=-1, stdout=-1) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 537f5f07b6..22d768836c 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1053,9 +1053,6 @@ LOGGING = { 'awx.main.consumers': { 'handlers': ['null'] }, - 'awx.main.commands.run_fact_cache_receiver': { - 'handlers': ['fact_receiver'], - }, 'awx.main.access': { 'handlers': ['null'], 'propagate': False, diff --git a/awx/settings/development.py b/awx/settings/development.py index 0edf353f6a..3bb087e0c9 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -129,7 +129,6 @@ SERVICE_NAME_DICT = { "runworker": "channels", "uwsgi": "uwsgi", "daphne": "daphne", - "fact": "factcacher", "nginx": "nginx"} # Used for sending commands in automatic restart UWSGI_FIFO_LOCATION = '/awxfifo' diff --git a/awx/settings/production.py b/awx/settings/production.py index ad668784d2..cd0e09885c 100644 --- a/awx/settings/production.py +++ b/awx/settings/production.py @@ -66,8 +66,7 @@ SERVICE_NAME_DICT = { "callback": "awx-callback-receiver", "channels": "awx-channels-worker", "uwsgi": "awx-uwsgi", - "daphne": "awx-daphne", - "fact": "awx-fact-cache-receiver"} + "daphne": "awx-daphne"} # Used for sending commands in automatic restart UWSGI_FIFO_LOCATION = '/var/lib/awx/awxfifo' diff --git a/tools/docker-compose/Procfile b/tools/docker-compose/Procfile index b30dfcad2b..83488b891d 100644 --- a/tools/docker-compose/Procfile +++ b/tools/docker-compose/Procfile @@ -3,6 +3,5 @@ runworker: make runworker daphne: make daphne celeryd: make celeryd receiver: make receiver -factcacher: make factcacher flower: make flower uwsgi: make uwsgi diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index b46d05bf4c..f3ea26f95f 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -43,14 +43,6 @@ redirect_stderr=true stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 -[program:factcacher] -command = python manage.py run_fact_cache_receiver -autostart = true -autorestart = true -redirect_stderr=true -stdout_logfile=/dev/fd/1 -stdout_logfile_maxbytes=0 - [program:nginx] command = nginx -g "daemon off;" autostart = true @@ -68,7 +60,7 @@ stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 [group:tower-processes] -programs=celeryd,receiver,runworker,uwsgi,daphne,factcacher,nginx,flower +programs=celeryd,receiver,runworker,uwsgi,daphne,nginx,flower priority=5 [unix_http_server] diff --git a/tools/docker-isolated/Procfile b/tools/docker-isolated/Procfile index 35d7db873c..42cf77b7d5 100644 --- a/tools/docker-isolated/Procfile +++ b/tools/docker-isolated/Procfile @@ -3,6 +3,5 @@ runworker: make runworker daphne: make daphne celeryd: make celeryd EXTRA_GROUP_QUEUES=thepentagon receiver: make receiver -factcacher: make factcacher flower: make flower uwsgi: make uwsgi