remove kombu

This commit is contained in:
chris meyers
2020-02-10 15:12:11 -05:00
committed by Ryan Petrello
parent 403e9bbfb5
commit be58906aed
7 changed files with 25 additions and 39 deletions

View File

@@ -5,7 +5,6 @@ import random
import json import json
from awx.main.dispatch import get_local_queuename from awx.main.dispatch import get_local_queuename
from kombu import Queue, Exchange, Producer, Consumer, Connection
from . import pg_bus_conn from . import pg_bus_conn

View File

@@ -13,8 +13,6 @@ from uuid import UUID
from queue import Empty as QueueEmpty from queue import Empty as QueueEmpty
from django import db from django import db
from kombu import Producer
from kombu.mixins import ConsumerMixin
from django.conf import settings from django.conf import settings
from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch.pool import WorkerPool
@@ -44,11 +42,10 @@ class WorkerSignalHandler:
class AWXConsumerBase(object): class AWXConsumerBase(object):
def __init__(self, name, connection, worker, queues=[], pool=None): def __init__(self, name, worker, queues=[], pool=None):
self.should_stop = False self.should_stop = False
self.name = name self.name = name
self.connection = connection
self.total_messages = 0 self.total_messages = 0
self.queues = queues self.queues = queues
self.worker = worker self.worker = worker
@@ -110,7 +107,7 @@ class AWXConsumerBase(object):
# Child should implement other things here # Child should implement other things here
def stop(self, signum, frame): def stop(self, signum, frame):
self.should_stop = True # this makes the kombu mixin stop consuming self.should_stop = True
logger.warn('received {}, stopping'.format(signame(signum))) logger.warn('received {}, stopping'.format(signame(signum)))
self.worker.on_stop() self.worker.on_stop()
raise SystemExit() raise SystemExit()
@@ -142,6 +139,8 @@ class AWXConsumerPG(AWXConsumerBase):
conn.listen(queue) conn.listen(queue)
for e in conn.events(): for e in conn.events():
self.process_task(json.loads(e.payload)) self.process_task(json.loads(e.payload))
if self.should_stop:
return
except psycopg2.InterfaceError: except psycopg2.InterfaceError:
logger.warn("Stale Postgres message bus connection, reconnecting") logger.warn("Stale Postgres message bus connection, reconnecting")
continue continue

View File

@@ -3,7 +3,6 @@
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from kombu import Exchange, Queue, Connection
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
@@ -17,17 +16,15 @@ class Command(BaseCommand):
help = 'Launch the job callback receiver' help = 'Launch the job callback receiver'
def handle(self, *arg, **options): def handle(self, *arg, **options):
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn: consumer = None
consumer = None try:
try: consumer = AWXConsumerRedis(
consumer = AWXConsumerRedis( 'callback_receiver',
'callback_receiver', CallbackBrokerWorker(),
conn, queues=[getattr(settings, 'CALLBACK_QUEUE', '')],
CallbackBrokerWorker(), )
queues=[getattr(settings, 'CALLBACK_QUEUE', '')], consumer.run()
) except KeyboardInterrupt:
consumer.run() print('Terminating Callback Receiver')
except KeyboardInterrupt: if consumer:
print('Terminating Callback Receiver') consumer.stop()
if consumer:
consumer.stop()

View File

@@ -5,8 +5,7 @@ import logging
from django.conf import settings from django.conf import settings
from django.core.cache import cache as django_cache from django.core.cache import cache as django_cache
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.db import connection as django_connection from django.db import connection as django_connection, connections
from kombu import Exchange, Queue
from awx.main.utils.handlers import AWXProxyHandler from awx.main.utils.handlers import AWXProxyHandler
from awx.main.dispatch import get_local_queuename, reaper from awx.main.dispatch import get_local_queuename, reaper
@@ -66,7 +65,6 @@ class Command(BaseCommand):
queues = ['tower_broadcast_all', get_local_queuename()] queues = ['tower_broadcast_all', get_local_queuename()]
consumer = AWXConsumerPG( consumer = AWXConsumerPG(
'dispatcher', 'dispatcher',
None,
TaskWorker(), TaskWorker(),
queues, queues,
AutoscalePool(min_workers=4) AutoscalePool(min_workers=4)

View File

@@ -10,8 +10,6 @@ import redis
# Django # Django
from django.conf import settings from django.conf import settings
# Kombu
from kombu import Exchange, Producer, Connection
__all__ = ['CallbackQueueDispatcher'] __all__ = ['CallbackQueueDispatcher']

View File

@@ -8,7 +8,7 @@ import os
import time import time
from django.conf import settings from django.conf import settings
from kombu.utils.url import parse_url import redis
# Mock # Mock
from unittest import mock from unittest import mock
@@ -390,11 +390,10 @@ def test_saml_x509cert_validation(patch, get, admin, headers):
@pytest.mark.django_db @pytest.mark.django_db
def test_broker_url_with_special_characters(): def test_broker_url_with_special_characters():
settings.BROKER_URL = 'amqp://guest:a@ns:ibl3#@rabbitmq:5672//' settings.BROKER_URL = 'redis://unused:a@ns:ibl3#@redis-fancy:5672/?db=mydb'
url = parse_url(settings.BROKER_URL) cli = redis.from_url(settings.BROKER_URL)
assert url['transport'] == 'amqp' assert cli.host == 'redis-fancy'
assert url['hostname'] == 'rabbitmq' assert cli.port == 5672
assert url['port'] == 5672 # Note: There are no usernames in redis
assert url['userid'] == 'guest' assert cli.password == 'a@ns:ibl3#'
assert url['password'] == 'a@ns:ibl3#' assert cli.db == 'mydb'
assert url['virtual_host'] == '/'

View File

@@ -1112,10 +1112,6 @@ LOGGING = {
'handlers': ['console', 'file', 'tower_warnings'], 'handlers': ['console', 'file', 'tower_warnings'],
'level': 'WARNING', 'level': 'WARNING',
}, },
'kombu': {
'handlers': ['console', 'file', 'tower_warnings'],
'level': 'WARNING',
},
'rest_framework.request': { 'rest_framework.request': {
'handlers': ['console', 'file', 'tower_warnings'], 'handlers': ['console', 'file', 'tower_warnings'],
'level': 'WARNING', 'level': 'WARNING',