add the ability to disable RabbitMQ queue durability

This commit is contained in:
Ryan Petrello 2019-05-22 14:18:05 -04:00
parent 8c56d1d3a7
commit 40b1e89b67
No known key found for this signature in database
GPG Key ID: F2AA5F2122351777
9 changed files with 66 additions and 5 deletions

View File

@ -570,6 +570,16 @@ register(
)
register(
'BROKER_DURABILITY',
field_class=fields.BooleanField,
label=_('Message Durability'),
help_text=_('When set (the default), underlying queues will be persisted to disk. Disable this to enable higher message bus throughput.'),
category=_('System'),
category_slug='system',
)
def logging_validate(serializer, attrs):
if not serializer.instance or \
not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or \

View File

@ -4,7 +4,8 @@ import socket
from django.conf import settings
from awx.main.dispatch import get_local_queuename
from kombu import Connection, Queue, Exchange, Producer, Consumer
from awx.main.dispatch.kombu import Connection
from kombu import Queue, Exchange, Producer, Consumer
logger = logging.getLogger('awx.main.dispatch')

View File

@ -0,0 +1,42 @@
from amqp.exceptions import PreconditionFailed
from django.conf import settings
from kombu.connection import Connection as KombuConnection
from kombu.transport import pyamqp
import logging
logger = logging.getLogger('awx.main.dispatch')
__all__ = ['Connection']
class Connection(KombuConnection):
def __init__(self, *args, **kwargs):
super(Connection, self).__init__(*args, **kwargs)
class _Channel(pyamqp.Channel):
def queue_declare(self, queue, *args, **kwargs):
kwargs['durable'] = settings.BROKER_DURABILITY
try:
return super(_Channel, self).queue_declare(queue, *args, **kwargs)
except PreconditionFailed as e:
if "inequivalent arg 'durable'" in getattr(e, 'reply_text', None):
logger.error(
'queue {} durability is not {}, deleting and recreating'.format(
queue,
kwargs['durable']
)
)
self.queue_delete(queue)
return super(_Channel, self).queue_declare(queue, *args, **kwargs)
class _Connection(pyamqp.Connection):
Channel = _Channel
class _Transport(pyamqp.Transport):
Connection = _Connection
self.transport_cls = _Transport

View File

@ -4,7 +4,9 @@ import sys
from uuid import uuid4
from django.conf import settings
from kombu import Connection, Exchange, Producer
from kombu import Exchange, Producer
from awx.main.dispatch.kombu import Connection
logger = logging.getLogger('awx.main.dispatch')

View File

@ -3,8 +3,9 @@
from django.conf import settings
from django.core.management.base import BaseCommand
from kombu import Connection, Exchange, Queue
from kombu import Exchange, Queue
from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker

View File

@ -8,10 +8,11 @@ from django.conf import settings
from django.core.cache import cache as django_cache
from django.core.management.base import BaseCommand
from django.db import connection as django_connection, connections
from kombu import Connection, Exchange, Queue
from kombu import Exchange, Queue
from awx.main.dispatch import get_local_queuename, reaper
from awx.main.dispatch.control import Control
from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumer, TaskWorker

View File

@ -10,7 +10,8 @@ import os
from django.conf import settings
# Kombu
from kombu import Connection, Exchange, Producer
from awx.main.dispatch.kombu import Connection
from kombu import Exchange, Producer
from kombu.serialization import registry
__all__ = ['CallbackQueueDispatcher']

View File

@ -436,6 +436,7 @@ DEVSERVER_DEFAULT_PORT = '8013'
# Set default ports for live server tests.
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
BROKER_DURABILITY = True
BROKER_POOL_LIMIT = None
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_DEFAULT_QUEUE = 'awx_private_queue'

View File

@ -28,6 +28,8 @@ To accomplish this, AWX makes use of a "Task Queue" abstraction. Task Queues ar
AWX communicates with these worker processes to mediate between clients and workers. This is done via distributed RabbitMQ queues and the already-acknowledged local queue that the Dispatcher is working through. Simply put: to initiate a task, the client (generally, Python code in the AWX API) publishes a message to a queue, and RabbitMQ then delivers that message to one or more workers.
By default, when AWX creates queues in RabbitMQ, it creates them as *durable* queues in RabbitMQ (which allows for message persistence at the cost of lower performance). For increased message throughput (at the risk of message loss on server restarts), set BROKER_DURABILITY=False, and AWX will create _transient_ queues instead.
Clustered AWX installations consist of multiple workers spread across every
node, giving way to high availability and horizontal scaling.