From be58906aed1d50eb246d5fd89c7439b8388d446f Mon Sep 17 00:00:00 2001 From: chris meyers Date: Mon, 10 Feb 2020 15:12:11 -0500 Subject: [PATCH] remove kombu --- awx/main/dispatch/control.py | 1 - awx/main/dispatch/worker/base.py | 9 +++---- .../commands/run_callback_receiver.py | 27 +++++++++---------- .../management/commands/run_dispatcher.py | 4 +-- awx/main/queue.py | 2 -- .../tests/functional/api/test_settings.py | 17 ++++++------ awx/settings/defaults.py | 4 --- 7 files changed, 25 insertions(+), 39 deletions(-) diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index a584c9dfe5..684cdae806 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -5,7 +5,6 @@ import random import json from awx.main.dispatch import get_local_queuename -from kombu import Queue, Exchange, Producer, Consumer, Connection from . import pg_bus_conn diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index b90b41ce1c..674892a7b5 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -13,8 +13,6 @@ from uuid import UUID from queue import Empty as QueueEmpty from django import db -from kombu import Producer -from kombu.mixins import ConsumerMixin from django.conf import settings from awx.main.dispatch.pool import WorkerPool @@ -44,11 +42,10 @@ class WorkerSignalHandler: class AWXConsumerBase(object): - def __init__(self, name, connection, worker, queues=[], pool=None): + def __init__(self, name, worker, queues=[], pool=None): self.should_stop = False self.name = name - self.connection = connection self.total_messages = 0 self.queues = queues self.worker = worker @@ -110,7 +107,7 @@ class AWXConsumerBase(object): # Child should implement other things here def stop(self, signum, frame): - self.should_stop = True # this makes the kombu mixin stop consuming + self.should_stop = True logger.warn('received {}, stopping'.format(signame(signum))) self.worker.on_stop() raise SystemExit() @@ -142,6 +139,8 @@ class AWXConsumerPG(AWXConsumerBase): conn.listen(queue) for e in conn.events(): self.process_task(json.loads(e.payload)) + if self.should_stop: + return except psycopg2.InterfaceError: logger.warn("Stale Postgres message bus connection, reconnecting") continue diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index f1c1aed0c3..7e28330067 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,7 +3,6 @@ from django.conf import settings from django.core.management.base import BaseCommand -from kombu import Exchange, Queue, Connection from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker @@ -17,17 +16,15 @@ class Command(BaseCommand): help = 'Launch the job callback receiver' def handle(self, *arg, **options): - with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn: - consumer = None - try: - consumer = AWXConsumerRedis( - 'callback_receiver', - conn, - CallbackBrokerWorker(), - queues=[getattr(settings, 'CALLBACK_QUEUE', '')], - ) - consumer.run() - except KeyboardInterrupt: - print('Terminating Callback Receiver') - if consumer: - consumer.stop() + consumer = None + try: + consumer = AWXConsumerRedis( + 'callback_receiver', + CallbackBrokerWorker(), + queues=[getattr(settings, 'CALLBACK_QUEUE', '')], + ) + consumer.run() + except KeyboardInterrupt: + print('Terminating Callback Receiver') + if consumer: + consumer.stop() diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index b678797026..d12b23f275 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -5,8 +5,7 @@ import logging from django.conf import settings from django.core.cache import cache as django_cache from django.core.management.base import BaseCommand -from django.db import connection as django_connection -from kombu import Exchange, Queue +from django.db import connection as django_connection, connections from awx.main.utils.handlers import AWXProxyHandler from awx.main.dispatch import get_local_queuename, reaper @@ -66,7 +65,6 @@ class Command(BaseCommand): queues = ['tower_broadcast_all', get_local_queuename()] consumer = AWXConsumerPG( 'dispatcher', - None, TaskWorker(), queues, AutoscalePool(min_workers=4) diff --git a/awx/main/queue.py b/awx/main/queue.py index 8d4fffdf87..38bea6fc2c 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -10,8 +10,6 @@ import redis # Django from django.conf import settings -# Kombu -from kombu import Exchange, Producer, Connection __all__ = ['CallbackQueueDispatcher'] diff --git a/awx/main/tests/functional/api/test_settings.py b/awx/main/tests/functional/api/test_settings.py index a88aa8c20b..723b942c94 100644 --- a/awx/main/tests/functional/api/test_settings.py +++ b/awx/main/tests/functional/api/test_settings.py @@ -8,7 +8,7 @@ import os import time from django.conf import settings -from kombu.utils.url import parse_url +import redis # Mock from unittest import mock @@ -390,11 +390,10 @@ def test_saml_x509cert_validation(patch, get, admin, headers): @pytest.mark.django_db def test_broker_url_with_special_characters(): - settings.BROKER_URL = 'amqp://guest:a@ns:ibl3#@rabbitmq:5672//' - url = parse_url(settings.BROKER_URL) - assert url['transport'] == 'amqp' - assert url['hostname'] == 'rabbitmq' - assert url['port'] == 5672 - assert url['userid'] == 'guest' - assert url['password'] == 'a@ns:ibl3#' - assert url['virtual_host'] == '/' + settings.BROKER_URL = 'redis://unused:a@ns:ibl3#@redis-fancy:5672/?db=mydb' + cli = redis.from_url(settings.BROKER_URL) + assert cli.host == 'redis-fancy' + assert cli.port == 5672 + # Note: There are no usernames in redis + assert cli.password == 'a@ns:ibl3#' + assert cli.db == 'mydb' diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 63a5b74be6..798f8ba131 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -1112,10 +1112,6 @@ LOGGING = { 'handlers': ['console', 'file', 'tower_warnings'], 'level': 'WARNING', }, - 'kombu': { - 'handlers': ['console', 'file', 'tower_warnings'], - 'level': 'WARNING', - }, 'rest_framework.request': { 'handlers': ['console', 'file', 'tower_warnings'], 'level': 'WARNING',