diff --git a/awx/main/dispatch/__init__.py b/awx/main/dispatch/__init__.py index 50f912427e..d97919dada 100644 --- a/awx/main/dispatch/__init__.py +++ b/awx/main/dispatch/__init__.py @@ -1,5 +1,85 @@ +import psycopg2 +import select +import sys +import logging + +from contextlib import contextmanager + from django.conf import settings +NOT_READY = ([], [], []) +if 'run_callback_receiver' in sys.argv: + logger = logging.getLogger('awx.main.commands.run_callback_receiver') +else: + logger = logging.getLogger('awx.main.dispatch') + + def get_local_queuename(): return settings.CLUSTER_HOST_ID + + +class PubSub(object): + def __init__(self, conn): + assert conn.autocommit, "Connection must be in autocommit mode." + self.conn = conn + + def listen(self, channel): + with self.conn.cursor() as cur: + cur.execute('LISTEN "%s";' % channel) + + def unlisten(self, channel): + with self.conn.cursor() as cur: + cur.execute('UNLISTEN "%s";' % channel) + + def notify(self, channel, payload): + with self.conn.cursor() as cur: + cur.execute('SELECT pg_notify(%s, %s);', (channel, payload)) + + def get_event(self, select_timeout=0): + # poll the connection, then return one event, if we have one. Else + # return None. + select.select([self.conn], [], [], select_timeout) + self.conn.poll() + if self.conn.notifies: + return self.conn.notifies.pop(0) + + def get_events(self, select_timeout=0): + # Poll the connection and return all events, if there are any. Else + # return None. + select.select([self.conn], [], [], select_timeout) # redundant? + self.conn.poll() + events = [] + while self.conn.notifies: + events.append(self.conn.notifies.pop(0)) + if events: + return events + + def events(self, select_timeout=5, yield_timeouts=False): + while True: + if select.select([self.conn], [], [], select_timeout) == NOT_READY: + if yield_timeouts: + yield None + else: + self.conn.poll() + while self.conn.notifies: + yield self.conn.notifies.pop(0) + + def close(self): + self.conn.close() + + +@contextmanager +def pg_bus_conn(): + conf = settings.DATABASES['default'] + conn = psycopg2.connect(dbname=conf['NAME'], + host=conf['HOST'], + user=conf['USER'], + password=conf['PASSWORD']) + # Django connection.cursor().connection doesn't have autocommit=True on + conn.set_session(autocommit=True) + pubsub = PubSub(conn) + yield pubsub + conn.close() + + diff --git a/awx/main/dispatch/control.py b/awx/main/dispatch/control.py index f938aab6b5..a584c9dfe5 100644 --- a/awx/main/dispatch/control.py +++ b/awx/main/dispatch/control.py @@ -1,11 +1,14 @@ import logging import socket - -from django.conf import settings +import string +import random +import json from awx.main.dispatch import get_local_queuename from kombu import Queue, Exchange, Producer, Consumer, Connection +from . import pg_bus_conn + logger = logging.getLogger('awx.main.dispatch') @@ -19,15 +22,10 @@ class Control(object): raise RuntimeError('{} must be in {}'.format(service, self.services)) self.service = service self.queuename = host or get_local_queuename() - self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename) def publish(self, msg, conn, **kwargs): - producer = Producer( - exchange=self.queue.exchange, - channel=conn, - routing_key=self.queuename - ) - producer.publish(msg, expiration=5, **kwargs) + # TODO: delete this method?? + raise RuntimeError("Publish called?!") def status(self, *args, **kwargs): return self.control_with_reply('status', *args, **kwargs) @@ -35,24 +33,29 @@ class Control(object): def running(self, *args, **kwargs): return self.control_with_reply('running', *args, **kwargs) + @classmethod + def generate_reply_queue_name(cls): + letters = string.ascii_lowercase + return 'reply_to_{}'.format(''.join(random.choice(letters) for i in range(8))) + def control_with_reply(self, command, timeout=5): logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename)) - reply_queue = Queue(name="amq.rabbitmq.reply-to") + reply_queue = Control.generate_reply_queue_name() self.result = None - with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn: - with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True): - self.publish({'control': command}, conn, reply_to='amq.rabbitmq.reply-to') - try: - conn.drain_events(timeout=timeout) - except socket.timeout: - logger.error('{} did not reply within {}s'.format(self.service, timeout)) - raise - return self.result + + with pg_bus_conn() as conn: + conn.listen(reply_queue) + conn.notify(self.queuename, + json.dumps({'control': command, 'reply_to': reply_queue})) + + for reply in conn.events(select_timeout=timeout, yield_timeouts=True): + if reply is None: + logger.error(f'{self.service} did not reply within {timeout}s') + raise RuntimeError("{self.service} did not reply within {timeout}s") + break + + return json.loads(reply.payload) def control(self, msg, **kwargs): - with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn: - self.publish(msg, conn) - - def process_message(self, body, message): - self.result = body - message.ack() + with pg_bus_conn() as conn: + conn.notify(self.queuename, json.dumps(msg)) diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index cf94572e75..fe77bd4c37 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -2,14 +2,13 @@ import inspect import logging import sys import json +import re from uuid import uuid4 -import psycopg2 from django.conf import settings -from kombu import Exchange, Producer from django.db import connection -from pgpubsub import PubSub +from . import pg_bus_conn logger = logging.getLogger('awx.main.dispatch') @@ -42,24 +41,22 @@ class task: add.apply_async([1, 1]) Adder.apply_async([1, 1]) - # Tasks can also define a specific target queue or exchange type: + # Tasks can also define a specific target queue or use the special fan-out queue tower_broadcast: @task(queue='slow-tasks') def snooze(): time.sleep(10) - @task(queue='tower_broadcast', exchange_type='fanout') + @task(queue='tower_broadcast') def announce(): print("Run this everywhere!") """ - def __init__(self, queue=None, exchange_type=None): + def __init__(self, queue=None): self.queue = queue - self.exchange_type = exchange_type def __call__(self, fn=None): queue = self.queue - exchange_type = self.exchange_type class PublisherMixin(object): @@ -89,15 +86,8 @@ class task: if callable(queue): queue = queue() if not settings.IS_TESTING(sys.argv): - conf = settings.DATABASES['default'] - conn = psycopg2.connect(dbname=conf['NAME'], - host=conf['HOST'], - user=conf['USER'], - password=conf['PASSWORD']) - conn.set_session(autocommit=True) - logger.warn(f"Send message to queue {queue}") - pubsub = PubSub(conn) - pubsub.notify(queue, json.dumps(obj)) + with pg_bus_conn() as conn: + conn.notify(queue, json.dumps(obj)) return (obj, queue) # If the object we're wrapping *is* a class (e.g., RunJob), return diff --git a/awx/main/dispatch/worker/__init__.py b/awx/main/dispatch/worker/__init__.py index 5472f83579..6fe8f64608 100644 --- a/awx/main/dispatch/worker/__init__.py +++ b/awx/main/dispatch/worker/__init__.py @@ -1,4 +1,3 @@ -from .base import AWXConsumer, AWXRedisConsumer, BaseWorker # noqa -from .basepg import AWXConsumerPG, BaseWorkerPG # noqa +from .base import AWXConsumerRedis, AWXConsumerPG, BaseWorker # noqa from .callback import CallbackBrokerWorker # noqa from .task import TaskWorker # noqa diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 0a6bf4396b..b90b41ce1c 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -7,6 +7,8 @@ import signal import sys import redis import json +import re +import psycopg2 from uuid import UUID from queue import Empty as QueueEmpty @@ -16,6 +18,7 @@ from kombu.mixins import ConsumerMixin from django.conf import settings from awx.main.dispatch.pool import WorkerPool +from awx.main.dispatch import pg_bus_conn if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -40,88 +43,7 @@ class WorkerSignalHandler: self.kill_now = True -class AWXConsumer(ConsumerMixin): - - def __init__(self, name, connection, worker, queues=[], pool=None): - self.connection = connection - self.total_messages = 0 - self.queues = queues - self.worker = worker - self.pool = pool - if pool is None: - self.pool = WorkerPool() - self.pool.init_workers(self.worker.work_loop) - - def get_consumers(self, Consumer, channel): - logger.debug(self.listening_on) - return [Consumer(queues=self.queues, accept=['json'], - callbacks=[self.process_task])] - - @property - def listening_on(self): - return 'listening on {}'.format([ - '{} [{}]'.format(q.name, q.exchange.type) for q in self.queues - ]) - - def control(self, body, message): - logger.warn('Consumer received control message {}'.format(body)) - control = body.get('control') - if control in ('status', 'running'): - producer = Producer( - channel=self.connection, - routing_key=message.properties['reply_to'] - ) - if control == 'status': - msg = '\n'.join([self.listening_on, self.pool.debug()]) - elif control == 'running': - msg = [] - for worker in self.pool.workers: - worker.calculate_managed_tasks() - msg.extend(worker.managed_tasks.keys()) - producer.publish(msg) - elif control == 'reload': - for worker in self.pool.workers: - worker.quit() - else: - logger.error('unrecognized control message: {}'.format(control)) - message.ack() - - def process_task(self, body, message): - if 'control' in body: - try: - return self.control(body, message) - except Exception: - logger.exception("Exception handling control message:") - return - if len(self.pool): - if "uuid" in body and body['uuid']: - try: - queue = UUID(body['uuid']).int % len(self.pool) - except Exception: - queue = self.total_messages % len(self.pool) - else: - queue = self.total_messages % len(self.pool) - else: - queue = 0 - self.pool.write(queue, body) - self.total_messages += 1 - message.ack() - - def run(self, *args, **kwargs): - signal.signal(signal.SIGINT, self.stop) - signal.signal(signal.SIGTERM, self.stop) - self.worker.on_start() - super(AWXConsumer, self).run(*args, **kwargs) - - def stop(self, signum, frame): - self.should_stop = True # this makes the kombu mixin stop consuming - logger.warn('received {}, stopping'.format(signame(signum))) - self.worker.on_stop() - raise SystemExit() - - -class AWXRedisConsumer(object): - +class AWXConsumerBase(object): def __init__(self, name, connection, worker, queues=[], pool=None): self.should_stop = False @@ -139,15 +61,11 @@ class AWXRedisConsumer(object): def listening_on(self): return f'listening on {self.queues}' - ''' - def control(self, body, message): + def control(self, body): logger.warn(body) control = body.get('control') if control in ('status', 'running'): - producer = Producer( - channel=self.connection, - routing_key=message.properties['reply_to'] - ) + reply_queue = body['reply_to'] if control == 'status': msg = '\n'.join([self.listening_on, self.pool.debug()]) elif control == 'running': @@ -155,21 +73,21 @@ class AWXRedisConsumer(object): for worker in self.pool.workers: worker.calculate_managed_tasks() msg.extend(worker.managed_tasks.keys()) - producer.publish(msg) + + with pg_bus_conn() as conn: + conn.notify(reply_queue, json.dumps(msg)) elif control == 'reload': for worker in self.pool.workers: worker.quit() else: logger.error('unrecognized control message: {}'.format(control)) - message.ack() - ''' - def process_task(self, body, message): + def process_task(self, body): if 'control' in body: try: - return self.control(body, message) + return self.control(body) except Exception: - logger.exception("Exception handling control message:") + logger.exception(f"Exception handling control message: {body}") return if len(self.pool): if "uuid" in body and body['uuid']: @@ -189,13 +107,7 @@ class AWXRedisConsumer(object): signal.signal(signal.SIGTERM, self.stop) self.worker.on_start() - queue = redis.Redis.from_url(settings.BROKER_URL) - while True: - res = queue.blpop(self.queues) - res = json.loads(res[1]) - self.process_task(res, res) - if self.should_stop: - return + # Child should implement other things here def stop(self, signum, frame): self.should_stop = True # this makes the kombu mixin stop consuming @@ -204,6 +116,37 @@ class AWXRedisConsumer(object): raise SystemExit() +class AWXConsumerRedis(AWXConsumerBase): + def run(self, *args, **kwargs): + super(AWXConsumerRedis, self).run(*args, **kwargs) + + queue = redis.Redis.from_url(settings.BROKER_URL) + while True: + res = queue.blpop(self.queues) + res = json.loads(res[1]) + self.process_task(res) + if self.should_stop: + return + + +class AWXConsumerPG(AWXConsumerBase): + def run(self, *args, **kwargs): + super(AWXConsumerPG, self).run(*args, **kwargs) + + logger.warn(f"Running worker {self.name} listening to queues {self.queues}") + + while True: + try: + with pg_bus_conn() as conn: + for queue in self.queues: + conn.listen(queue) + for e in conn.events(): + self.process_task(json.loads(e.payload)) + except psycopg2.InterfaceError: + logger.warn("Stale Postgres message bus connection, reconnecting") + continue + + class BaseWorker(object): def read(self, queue): diff --git a/awx/main/dispatch/worker/basepg.py b/awx/main/dispatch/worker/basepg.py deleted file mode 100644 index 7a35fc59d6..0000000000 --- a/awx/main/dispatch/worker/basepg.py +++ /dev/null @@ -1,161 +0,0 @@ -# Copyright (c) 2018 Ansible by Red Hat -# All Rights Reserved. - -import os -import logging -import signal -import sys -import json -from uuid import UUID -from queue import Empty as QueueEmpty - -from django import db -from django.db import connection as pg_connection - -from pgpubsub import PubSub - -from awx.main.dispatch.pool import WorkerPool - -SHORT_CIRCUIT = False - -if 'run_callback_receiver' in sys.argv: - logger = logging.getLogger('awx.main.commands.run_callback_receiver') -else: - logger = logging.getLogger('awx.main.dispatch') - - -def signame(sig): - return dict( - (k, v) for v, k in signal.__dict__.items() - if v.startswith('SIG') and not v.startswith('SIG_') - )[sig] - - -class WorkerSignalHandler: - - def __init__(self): - self.kill_now = False - signal.signal(signal.SIGINT, self.exit_gracefully) - - def exit_gracefully(self, *args, **kwargs): - self.kill_now = True - - -class AWXConsumerPG(object): - - def __init__(self, name, connection, worker, queues=[], pool=None): - self.name = name - self.connection = pg_connection - self.total_messages = 0 - self.queues = queues - self.worker = worker - self.pool = pool - # TODO, maybe get new connection and reconnect periodically - self.pubsub = PubSub(pg_connection.cursor().connection) - if pool is None: - self.pool = WorkerPool() - self.pool.init_workers(self.worker.work_loop) - - @property - def listening_on(self): - return 'listening on {}'.format([f'{q}' for q in self.queues]) - - def control(self, body, message): - logger.warn(body) - control = body.get('control') - if control in ('status', 'running'): - if control == 'status': - msg = '\n'.join([self.listening_on, self.pool.debug()]) - elif control == 'running': - msg = [] - for worker in self.pool.workers: - worker.calculate_managed_tasks() - msg.extend(worker.managed_tasks.keys()) - self.pubsub.notify(message.properties['reply_to'], msg) - elif control == 'reload': - for worker in self.pool.workers: - worker.quit() - else: - logger.error('unrecognized control message: {}'.format(control)) - - def process_task(self, body, message): - if SHORT_CIRCUIT or 'control' in body: - try: - return self.control(body, message) - except Exception: - logger.exception("Exception handling control message:") - return - if len(self.pool): - if "uuid" in body and body['uuid']: - try: - queue = UUID(body['uuid']).int % len(self.pool) - except Exception: - queue = self.total_messages % len(self.pool) - else: - queue = self.total_messages % len(self.pool) - else: - queue = 0 - self.pool.write(queue, body) - self.total_messages += 1 - - def run(self, *args, **kwargs): - signal.signal(signal.SIGINT, self.stop) - signal.signal(signal.SIGTERM, self.stop) - self.worker.on_start() - - logger.warn(f"Running worker {self.name} listening to queues {self.queues}") - self.pubsub = PubSub(pg_connection.cursor().connection) - for queue in self.queues: - self.pubsub.listen(queue) - for e in self.pubsub.events(): - logger.warn(f"Processing task {e}") - self.process_task(json.loads(e.payload), e) - - def stop(self, signum, frame): - logger.warn('received {}, stopping'.format(signame(signum))) - for queue in self.queues: - self.pubsub.unlisten(queue) - self.worker.on_stop() - raise SystemExit() - - -class BaseWorkerPG(object): - - def work_loop(self, queue, finished, idx, *args): - ppid = os.getppid() - signal_handler = WorkerSignalHandler() - while not signal_handler.kill_now: - # if the parent PID changes, this process has been orphaned - # via e.g., segfault or sigkill, we should exit too - if os.getppid() != ppid: - break - try: - body = queue.get(block=True, timeout=1) - if body == 'QUIT': - break - except QueueEmpty: - continue - except Exception as e: - logger.error("Exception on worker {}, restarting: ".format(idx) + str(e)) - continue - try: - for conn in db.connections.all(): - # If the database connection has a hiccup during the prior message, close it - # so we can establish a new connection - conn.close_if_unusable_or_obsolete() - self.perform_work(body, *args) - finally: - if 'uuid' in body: - uuid = body['uuid'] - logger.debug('task {} is finished'.format(uuid)) - finished.put(uuid) - logger.warn('worker exiting gracefully pid:{}'.format(os.getpid())) - - def perform_work(self, body): - raise NotImplementedError() - - def on_start(self): - pass - - def on_stop(self): - pass diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py index 80c1907fc1..7e7437d445 100644 --- a/awx/main/dispatch/worker/task.py +++ b/awx/main/dispatch/worker/task.py @@ -8,12 +8,12 @@ from kubernetes.config import kube_config from awx.main.tasks import dispatch_startup, inform_cluster_of_shutdown -from .basepg import BaseWorkerPG +from .base import BaseWorker logger = logging.getLogger('awx.main.dispatch') -class TaskWorker(BaseWorkerPG): +class TaskWorker(BaseWorker): ''' A worker implementation that deserializes task messages and runs native Python code. diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 269b01d98a..f1c1aed0c3 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -5,7 +5,7 @@ from django.conf import settings from django.core.management.base import BaseCommand from kombu import Exchange, Queue, Connection -from awx.main.dispatch.worker import AWXRedisConsumer, CallbackBrokerWorker +from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker class Command(BaseCommand): @@ -20,7 +20,7 @@ class Command(BaseCommand): with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn: consumer = None try: - consumer = AWXRedisConsumer( + consumer = AWXConsumerRedis( 'callback_receiver', conn, CallbackBrokerWorker(), diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index ea6098db4c..c62dc6c732 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -62,18 +62,17 @@ class Command(BaseCommand): # in cpython itself: # https://bugs.python.org/issue37429 AWXProxyHandler.disable() - with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn: - try: - queues = ['tower_broadcast_all'] + settings.AWX_CELERY_QUEUES_STATIC + [get_local_queuename()] - consumer = AWXConsumerPG( - 'dispatcher', - conn, - TaskWorker(), - queues, - AutoscalePool(min_workers=4) - ) - consumer.run() - except KeyboardInterrupt: - logger.debug('Terminating Task Dispatcher') - if consumer: - consumer.stop() + try: + queues = ['tower_broadcast_all'] + settings.AWX_CELERY_QUEUES_STATIC + [get_local_queuename()] + consumer = AWXConsumerPG( + 'dispatcher', + None, + TaskWorker(), + queues, + AutoscalePool(min_workers=4) + ) + consumer.run() + except KeyboardInterrupt: + logger.debug('Terminating Task Dispatcher') + if consumer: + consumer.stop() diff --git a/awx/main/queue.py b/awx/main/queue.py index 40cd58ce23..8d4fffdf87 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -30,7 +30,6 @@ class AnsibleJSONEncoder(json.JSONEncoder): class CallbackQueueDispatcher(object): def __init__(self): - self.callback_connection = getattr(settings, 'BROKER_URL', None) self.queue = getattr(settings, 'CALLBACK_QUEUE', '') self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher') self.connection = redis.Redis.from_url(settings.BROKER_URL) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 292ae0f17e..e45f34cd66 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -264,7 +264,7 @@ def apply_cluster_membership_policies(): logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute)) -@task(queue='tower_broadcast_all', exchange_type='fanout') +@task(queue='tower_broadcast_all') def handle_setting_changes(setting_keys): orig_len = len(setting_keys) for i in range(orig_len): @@ -275,7 +275,7 @@ def handle_setting_changes(setting_keys): cache.delete_many(cache_keys) -@task(queue='tower_broadcast_all', exchange_type='fanout') +@task(queue='tower_broadcast_all') def delete_project_files(project_path): # TODO: possibly implement some retry logic lock_file = project_path + '.lock' @@ -293,7 +293,7 @@ def delete_project_files(project_path): logger.exception('Could not remove lock file {}'.format(lock_file)) -@task(queue='tower_broadcast_all', exchange_type='fanout') +@task(queue='tower_broadcast_all') def profile_sql(threshold=1, minutes=1): if threshold == 0: cache.delete('awx-profile-sql-threshold') diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 252e39d708..f6ad6f6d7c 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -421,7 +421,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') BROKER_DURABILITY = True BROKER_POOL_LIMIT = None -BROKER_URL = 'redis://localhost:6379;' +BROKER_URL = 'redis://localhost:6379' BROKER_TRANSPORT_OPTIONS = {} CELERY_DEFAULT_QUEUE = 'awx_private_queue' CELERYBEAT_SCHEDULE = { diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index 8ebaa8747e..e2ae322e3d 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -14,6 +14,7 @@ import os import urllib.parse import sys +from urllib import parse # Enable the following lines and install the browser extension to use Django debug toolbar # if your deployment method is not VMWare of Docker-for-Mac you may @@ -53,12 +54,13 @@ if "pytest" in sys.modules: # Default to "just works" for single tower docker BROKER_URL = os.environ.get('BROKER_URL', "redis://redis_1:6379") +redis_parts = parse.urlparse(BROKER_URL) + CHANNEL_LAYERS = { "default": { "BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer", "CONFIG": { - "hosts": [(os.environ.get('REDIS_HOST', 'redis_1'), - int(os.environ.get('REDIS_PORT', 6379)))], + "hosts": [(redis_parts.hostname, redis_parts.port)] }, }, } diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml index caa2a7540f..6a7f393e52 100644 --- a/tools/docker-compose-cluster.yml +++ b/tools/docker-compose-cluster.yml @@ -24,10 +24,7 @@ services: #entrypoint: ["bash"] environment: CURRENT_UID: - # BROKER_URL will go away when we use postgres as our message broker BROKER_URL: "redis://redis_1:63791" - REDIS_HOST: redis_1 - REDIS_PORT: 63791 SDB_HOST: 0.0.0.0 SDB_PORT: 5899 AWX_GROUP_QUEUES: alpha,tower @@ -47,10 +44,7 @@ services: working_dir: "/awx_devel" environment: CURRENT_UID: - # BROKER_URL will go away when we use postgres as our message broker - BROKER_URL: "redis://redis_1:63791" - REDIS_HOST: redis_2 - REDIS_PORT: 63792 + BROKER_URL: "redis://redis_2:63792" SDB_HOST: 0.0.0.0 SDB_PORT: 7899 AWX_GROUP_QUEUES: bravo,tower @@ -69,10 +63,7 @@ services: working_dir: "/awx_devel" environment: CURRENT_UID: - # BROKER_URL will go away when we use postgres as our message broker - BROKER_URL: "redis://redis_1:63791" - REDIS_HOST: redis_3 - REDIS_PORT: 63793 + BROKER_URL: "redis://redis_3:63793" SDB_HOST: 0.0.0.0 SDB_PORT: 8899 AWX_GROUP_QUEUES: charlie,tower