mirror of
https://github.com/ansible/awx.git
synced 2026-02-12 23:24:48 -03:30
Merge pull request #3947 from ryanpetrello/transient-queues
RFC: add the ability to disable RabbitMQ queue durability Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
@@ -570,6 +570,16 @@ register(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
register(
|
||||||
|
'BROKER_DURABILITY',
|
||||||
|
field_class=fields.BooleanField,
|
||||||
|
label=_('Message Durability'),
|
||||||
|
help_text=_('When set (the default), underlying queues will be persisted to disk. Disable this to enable higher message bus throughput.'),
|
||||||
|
category=_('System'),
|
||||||
|
category_slug='system',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def logging_validate(serializer, attrs):
|
def logging_validate(serializer, attrs):
|
||||||
if not serializer.instance or \
|
if not serializer.instance or \
|
||||||
not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or \
|
not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or \
|
||||||
|
|||||||
@@ -4,7 +4,8 @@ import socket
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
from awx.main.dispatch import get_local_queuename
|
from awx.main.dispatch import get_local_queuename
|
||||||
from kombu import Connection, Queue, Exchange, Producer, Consumer
|
from awx.main.dispatch.kombu import Connection
|
||||||
|
from kombu import Queue, Exchange, Producer, Consumer
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.dispatch')
|
logger = logging.getLogger('awx.main.dispatch')
|
||||||
|
|
||||||
|
|||||||
42
awx/main/dispatch/kombu.py
Normal file
42
awx/main/dispatch/kombu.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
from amqp.exceptions import PreconditionFailed
|
||||||
|
from django.conf import settings
|
||||||
|
from kombu.connection import Connection as KombuConnection
|
||||||
|
from kombu.transport import pyamqp
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger('awx.main.dispatch')
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ['Connection']
|
||||||
|
|
||||||
|
|
||||||
|
class Connection(KombuConnection):
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(Connection, self).__init__(*args, **kwargs)
|
||||||
|
class _Channel(pyamqp.Channel):
|
||||||
|
|
||||||
|
def queue_declare(self, queue, *args, **kwargs):
|
||||||
|
kwargs['durable'] = settings.BROKER_DURABILITY
|
||||||
|
try:
|
||||||
|
return super(_Channel, self).queue_declare(queue, *args, **kwargs)
|
||||||
|
except PreconditionFailed as e:
|
||||||
|
if "inequivalent arg 'durable'" in getattr(e, 'reply_text', None):
|
||||||
|
logger.error(
|
||||||
|
'queue {} durability is not {}, deleting and recreating'.format(
|
||||||
|
|
||||||
|
queue,
|
||||||
|
kwargs['durable']
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.queue_delete(queue)
|
||||||
|
return super(_Channel, self).queue_declare(queue, *args, **kwargs)
|
||||||
|
|
||||||
|
class _Connection(pyamqp.Connection):
|
||||||
|
Channel = _Channel
|
||||||
|
|
||||||
|
class _Transport(pyamqp.Transport):
|
||||||
|
Connection = _Connection
|
||||||
|
|
||||||
|
self.transport_cls = _Transport
|
||||||
@@ -4,7 +4,9 @@ import sys
|
|||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from kombu import Connection, Exchange, Producer
|
from kombu import Exchange, Producer
|
||||||
|
|
||||||
|
from awx.main.dispatch.kombu import Connection
|
||||||
|
|
||||||
logger = logging.getLogger('awx.main.dispatch')
|
logger = logging.getLogger('awx.main.dispatch')
|
||||||
|
|
||||||
|
|||||||
@@ -3,8 +3,9 @@
|
|||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
from kombu import Connection, Exchange, Queue
|
from kombu import Exchange, Queue
|
||||||
|
|
||||||
|
from awx.main.dispatch.kombu import Connection
|
||||||
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
|
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -8,10 +8,11 @@ from django.conf import settings
|
|||||||
from django.core.cache import cache as django_cache
|
from django.core.cache import cache as django_cache
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
from django.db import connection as django_connection, connections
|
from django.db import connection as django_connection, connections
|
||||||
from kombu import Connection, Exchange, Queue
|
from kombu import Exchange, Queue
|
||||||
|
|
||||||
from awx.main.dispatch import get_local_queuename, reaper
|
from awx.main.dispatch import get_local_queuename, reaper
|
||||||
from awx.main.dispatch.control import Control
|
from awx.main.dispatch.control import Control
|
||||||
|
from awx.main.dispatch.kombu import Connection
|
||||||
from awx.main.dispatch.pool import AutoscalePool
|
from awx.main.dispatch.pool import AutoscalePool
|
||||||
from awx.main.dispatch.worker import AWXConsumer, TaskWorker
|
from awx.main.dispatch.worker import AWXConsumer, TaskWorker
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,8 @@ import os
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
# Kombu
|
# Kombu
|
||||||
from kombu import Connection, Exchange, Producer
|
from awx.main.dispatch.kombu import Connection
|
||||||
|
from kombu import Exchange, Producer
|
||||||
from kombu.serialization import registry
|
from kombu.serialization import registry
|
||||||
|
|
||||||
__all__ = ['CallbackQueueDispatcher']
|
__all__ = ['CallbackQueueDispatcher']
|
||||||
|
|||||||
@@ -436,6 +436,7 @@ DEVSERVER_DEFAULT_PORT = '8013'
|
|||||||
# Set default ports for live server tests.
|
# Set default ports for live server tests.
|
||||||
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
|
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
|
||||||
|
|
||||||
|
BROKER_DURABILITY = True
|
||||||
BROKER_POOL_LIMIT = None
|
BROKER_POOL_LIMIT = None
|
||||||
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
|
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
|
||||||
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
|
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ To accomplish this, AWX makes use of a "Task Queue" abstraction. Task Queues ar
|
|||||||
|
|
||||||
AWX communicates with these worker processes to mediate between clients and workers. This is done via distributed RabbitMQ queues and the already-acknowledged local queue that the Dispatcher is working through. Simply put: to initiate a task, the client (generally, Python code in the AWX API) publishes a message to a queue, and RabbitMQ then delivers that message to one or more workers.
|
AWX communicates with these worker processes to mediate between clients and workers. This is done via distributed RabbitMQ queues and the already-acknowledged local queue that the Dispatcher is working through. Simply put: to initiate a task, the client (generally, Python code in the AWX API) publishes a message to a queue, and RabbitMQ then delivers that message to one or more workers.
|
||||||
|
|
||||||
|
By default, when AWX creates queues in RabbitMQ, it creates them as *durable* queues in RabbitMQ (which allows for message persistence at the cost of lower performance). For increased message throughput (at the risk of message loss on server restarts), set BROKER_DURABILITY=False, and AWX will create _transient_ queues instead.
|
||||||
|
|
||||||
Clustered AWX installations consist of multiple workers spread across every
|
Clustered AWX installations consist of multiple workers spread across every
|
||||||
node, giving way to high availability and horizontal scaling.
|
node, giving way to high availability and horizontal scaling.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user