combine all the broker replacement pieces

* local redis for event processing
* postgres as the message broker
* redis for websockets
chris meyers 2020-01-08 15:44:04 -05:00 committed by Ryan Petrello
parent 558e92806b
commit 2a2c34f567
14 changed files with 188 additions and 343 deletions

View File

@@ -1,5 +1,85 @@
import psycopg2
import select
import sys
import logging
from contextlib import contextmanager
from django.conf import settings
NOT_READY = ([], [], [])
if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
else:
logger = logging.getLogger('awx.main.dispatch')
def get_local_queuename():
return settings.CLUSTER_HOST_ID
class PubSub(object):
def __init__(self, conn):
assert conn.autocommit, "Connection must be in autocommit mode."
self.conn = conn
def listen(self, channel):
with self.conn.cursor() as cur:
cur.execute('LISTEN "%s";' % channel)
def unlisten(self, channel):
with self.conn.cursor() as cur:
cur.execute('UNLISTEN "%s";' % channel)
def notify(self, channel, payload):
with self.conn.cursor() as cur:
cur.execute('SELECT pg_notify(%s, %s);', (channel, payload))
def get_event(self, select_timeout=0):
# poll the connection, then return one event, if we have one. Else
# return None.
select.select([self.conn], [], [], select_timeout)
self.conn.poll()
if self.conn.notifies:
return self.conn.notifies.pop(0)
def get_events(self, select_timeout=0):
# Poll the connection and return all events, if there are any. Else
# return None.
select.select([self.conn], [], [], select_timeout) # redundant?
self.conn.poll()
events = []
while self.conn.notifies:
events.append(self.conn.notifies.pop(0))
if events:
return events
def events(self, select_timeout=5, yield_timeouts=False):
while True:
if select.select([self.conn], [], [], select_timeout) == NOT_READY:
if yield_timeouts:
yield None
else:
self.conn.poll()
while self.conn.notifies:
yield self.conn.notifies.pop(0)
def close(self):
self.conn.close()
@contextmanager
def pg_bus_conn():
conf = settings.DATABASES['default']
conn = psycopg2.connect(dbname=conf['NAME'],
host=conf['HOST'],
user=conf['USER'],
password=conf['PASSWORD'])
# Django's connection.cursor().connection doesn't have autocommit=True enabled
conn.set_session(autocommit=True)
pubsub = PubSub(conn)
yield pubsub
conn.close()
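
A minimal round-trip sketch of the bus defined above, assuming a configured Django settings module and a reachable Postgres; the channel name and payload are made up:

import json
from awx.main.dispatch import pg_bus_conn

with pg_bus_conn() as bus:
    bus.listen('demo_channel')  # hypothetical channel; issues LISTEN "demo_channel";
    bus.notify('demo_channel', json.dumps({'msg': 'hello'}))  # pg_notify()
    # The listening session receives its own NOTIFY once the connection is polled.
    event = bus.get_event(select_timeout=1)
    if event is not None:
        print(event.channel, event.payload)  # psycopg2 Notify objects carry both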

View File

@@ -1,11 +1,14 @@
import logging
import socket
from django.conf import settings
import string
import random
import json
from awx.main.dispatch import get_local_queuename
from kombu import Queue, Exchange, Producer, Consumer, Connection
from . import pg_bus_conn
logger = logging.getLogger('awx.main.dispatch')
@@ -19,15 +22,10 @@ class Control(object):
raise RuntimeError('{} must be in {}'.format(service, self.services))
self.service = service
self.queuename = host or get_local_queuename()
self.queue = Queue(self.queuename, Exchange(self.queuename), routing_key=self.queuename)
def publish(self, msg, conn, **kwargs):
producer = Producer(
exchange=self.queue.exchange,
channel=conn,
routing_key=self.queuename
)
producer.publish(msg, expiration=5, **kwargs)
# TODO: delete this method??
raise RuntimeError("Publish called?!")
def status(self, *args, **kwargs):
return self.control_with_reply('status', *args, **kwargs)
@@ -35,24 +33,29 @@ class Control(object):
def running(self, *args, **kwargs):
return self.control_with_reply('running', *args, **kwargs)
@classmethod
def generate_reply_queue_name(cls):
letters = string.ascii_lowercase
return 'reply_to_{}'.format(''.join(random.choice(letters) for i in range(8)))
def control_with_reply(self, command, timeout=5):
logger.warn('checking {} {} for {}'.format(self.service, command, self.queuename))
reply_queue = Queue(name="amq.rabbitmq.reply-to")
reply_queue = Control.generate_reply_queue_name()
self.result = None
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
with Consumer(conn, reply_queue, callbacks=[self.process_message], no_ack=True):
self.publish({'control': command}, conn, reply_to='amq.rabbitmq.reply-to')
try:
conn.drain_events(timeout=timeout)
except socket.timeout:
logger.error('{} did not reply within {}s'.format(self.service, timeout))
raise
return self.result
with pg_bus_conn() as conn:
conn.listen(reply_queue)
conn.notify(self.queuename,
json.dumps({'control': command, 'reply_to': reply_queue}))
for reply in conn.events(select_timeout=timeout, yield_timeouts=True):
if reply is None:
logger.error(f'{self.service} did not reply within {timeout}s')
raise RuntimeError("{self.service} did not reply within {timeout}s")
break
return json.loads(reply.payload)
def control(self, msg, **kwargs):
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
self.publish(msg, conn)
def process_message(self, body, message):
self.result = body
message.ack()
with pg_bus_conn() as conn:
conn.notify(self.queuename, json.dumps(msg))
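
With the reply queue plumbing above, a control round trip is just notify-then-listen. A hedged usage sketch, assuming 'dispatcher' is one of the registered service names:

from awx.main.dispatch.control import Control

# Ask the local dispatcher for its status over the Postgres bus;
# control_with_reply() raises RuntimeError if nothing answers in time.
ctl = Control('dispatcher')  # 'dispatcher' assumed to be in self.services
print(ctl.status(timeout=5))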

View File

@@ -2,14 +2,13 @@ import inspect
import logging
import sys
import json
import re
from uuid import uuid4
import psycopg2
from django.conf import settings
from kombu import Exchange, Producer
from django.db import connection
from pgpubsub import PubSub
from . import pg_bus_conn
logger = logging.getLogger('awx.main.dispatch')
@@ -42,24 +41,22 @@ class task:
add.apply_async([1, 1])
Adder.apply_async([1, 1])
# Tasks can also define a specific target queue or exchange type:
# Tasks can also define a specific target queue or use the special fan-out queue tower_broadcast:
@task(queue='slow-tasks')
def snooze():
time.sleep(10)
@task(queue='tower_broadcast', exchange_type='fanout')
@task(queue='tower_broadcast')
def announce():
print("Run this everywhere!")
"""
def __init__(self, queue=None, exchange_type=None):
def __init__(self, queue=None):
self.queue = queue
self.exchange_type = exchange_type
def __call__(self, fn=None):
queue = self.queue
exchange_type = self.exchange_type
class PublisherMixin(object):
@@ -89,15 +86,8 @@ class task:
if callable(queue):
queue = queue()
if not settings.IS_TESTING(sys.argv):
conf = settings.DATABASES['default']
conn = psycopg2.connect(dbname=conf['NAME'],
host=conf['HOST'],
user=conf['USER'],
password=conf['PASSWORD'])
conn.set_session(autocommit=True)
logger.warn(f"Send message to queue {queue}")
pubsub = PubSub(conn)
pubsub.notify(queue, json.dumps(obj))
with pg_bus_conn() as conn:
conn.notify(queue, json.dumps(obj))
return (obj, queue)
# If the object we're wrapping *is* a class (e.g., RunJob), return
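
With the change above, apply_async() reduces to a single pg_notify per task. A sketch of the equivalent hand-rolled publish; the body keys shown here are an assumption about what apply_async serializes, not the exact wire format:

import json
from awx.main.dispatch import pg_bus_conn

# Roughly what publishing a @task(queue='slow-tasks') call boils down to:
# serialize a task description and NOTIFY the target channel.
# NOTE: key names here are illustrative.
body = {'task': 'awx.main.tasks.snooze', 'uuid': 'hypothetical-uuid', 'args': [], 'kwargs': {}}
with pg_bus_conn() as conn:
    conn.notify('slow-tasks', json.dumps(body))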

View File

@@ -1,4 +1,3 @@
from .base import AWXConsumer, AWXRedisConsumer, BaseWorker # noqa
from .basepg import AWXConsumerPG, BaseWorkerPG # noqa
from .base import AWXConsumerRedis, AWXConsumerPG, BaseWorker # noqa
from .callback import CallbackBrokerWorker # noqa
from .task import TaskWorker # noqa

View File

@@ -7,6 +7,8 @@ import signal
import sys
import redis
import json
import re
import psycopg2
from uuid import UUID
from queue import Empty as QueueEmpty
@@ -16,6 +18,7 @@ from kombu.mixins import ConsumerMixin
from django.conf import settings
from awx.main.dispatch.pool import WorkerPool
from awx.main.dispatch import pg_bus_conn
if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -40,88 +43,7 @@ class WorkerSignalHandler:
self.kill_now = True
class AWXConsumer(ConsumerMixin):
def __init__(self, name, connection, worker, queues=[], pool=None):
self.connection = connection
self.total_messages = 0
self.queues = queues
self.worker = worker
self.pool = pool
if pool is None:
self.pool = WorkerPool()
self.pool.init_workers(self.worker.work_loop)
def get_consumers(self, Consumer, channel):
logger.debug(self.listening_on)
return [Consumer(queues=self.queues, accept=['json'],
callbacks=[self.process_task])]
@property
def listening_on(self):
return 'listening on {}'.format([
'{} [{}]'.format(q.name, q.exchange.type) for q in self.queues
])
def control(self, body, message):
logger.warn('Consumer received control message {}'.format(body))
control = body.get('control')
if control in ('status', 'running'):
producer = Producer(
channel=self.connection,
routing_key=message.properties['reply_to']
)
if control == 'status':
msg = '\n'.join([self.listening_on, self.pool.debug()])
elif control == 'running':
msg = []
for worker in self.pool.workers:
worker.calculate_managed_tasks()
msg.extend(worker.managed_tasks.keys())
producer.publish(msg)
elif control == 'reload':
for worker in self.pool.workers:
worker.quit()
else:
logger.error('unrecognized control message: {}'.format(control))
message.ack()
def process_task(self, body, message):
if 'control' in body:
try:
return self.control(body, message)
except Exception:
logger.exception("Exception handling control message:")
return
if len(self.pool):
if "uuid" in body and body['uuid']:
try:
queue = UUID(body['uuid']).int % len(self.pool)
except Exception:
queue = self.total_messages % len(self.pool)
else:
queue = self.total_messages % len(self.pool)
else:
queue = 0
self.pool.write(queue, body)
self.total_messages += 1
message.ack()
def run(self, *args, **kwargs):
signal.signal(signal.SIGINT, self.stop)
signal.signal(signal.SIGTERM, self.stop)
self.worker.on_start()
super(AWXConsumer, self).run(*args, **kwargs)
def stop(self, signum, frame):
self.should_stop = True # this makes the kombu mixin stop consuming
logger.warn('received {}, stopping'.format(signame(signum)))
self.worker.on_stop()
raise SystemExit()
class AWXRedisConsumer(object):
class AWXConsumerBase(object):
def __init__(self, name, connection, worker, queues=[], pool=None):
self.should_stop = False
@@ -139,15 +61,11 @@ class AWXRedisConsumer(object):
def listening_on(self):
return f'listening on {self.queues}'
'''
def control(self, body, message):
def control(self, body):
logger.warn(body)
control = body.get('control')
if control in ('status', 'running'):
producer = Producer(
channel=self.connection,
routing_key=message.properties['reply_to']
)
reply_queue = body['reply_to']
if control == 'status':
msg = '\n'.join([self.listening_on, self.pool.debug()])
elif control == 'running':
@@ -155,21 +73,21 @@ class AWXRedisConsumer(object):
for worker in self.pool.workers:
worker.calculate_managed_tasks()
msg.extend(worker.managed_tasks.keys())
producer.publish(msg)
with pg_bus_conn() as conn:
conn.notify(reply_queue, json.dumps(msg))
elif control == 'reload':
for worker in self.pool.workers:
worker.quit()
else:
logger.error('unrecognized control message: {}'.format(control))
message.ack()
'''
def process_task(self, body, message):
def process_task(self, body):
if 'control' in body:
try:
return self.control(body, message)
return self.control(body)
except Exception:
logger.exception("Exception handling control message:")
logger.exception(f"Exception handling control message: {body}")
return
if len(self.pool):
if "uuid" in body and body['uuid']:
@@ -189,13 +107,7 @@ class AWXRedisConsumer(object):
signal.signal(signal.SIGTERM, self.stop)
self.worker.on_start()
queue = redis.Redis.from_url(settings.BROKER_URL)
while True:
res = queue.blpop(self.queues)
res = json.loads(res[1])
self.process_task(res, res)
if self.should_stop:
return
# Child should implement other things here
def stop(self, signum, frame):
self.should_stop = True  # signals the subclass run loops to stop consuming
@@ -204,6 +116,37 @@ class AWXRedisConsumer(object):
raise SystemExit()
class AWXConsumerRedis(AWXConsumerBase):
def run(self, *args, **kwargs):
super(AWXConsumerRedis, self).run(*args, **kwargs)
queue = redis.Redis.from_url(settings.BROKER_URL)
while True:
res = queue.blpop(self.queues)
res = json.loads(res[1])
self.process_task(res)
if self.should_stop:
return
class AWXConsumerPG(AWXConsumerBase):
def run(self, *args, **kwargs):
super(AWXConsumerPG, self).run(*args, **kwargs)
logger.warn(f"Running worker {self.name} listening to queues {self.queues}")
while True:
try:
with pg_bus_conn() as conn:
for queue in self.queues:
conn.listen(queue)
for e in conn.events():
self.process_task(json.loads(e.payload))
except psycopg2.InterfaceError:
logger.warn("Stale Postgres message bus connection, reconnecting")
continue
class BaseWorker(object):
def read(self, queue):
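
The two consumers above differ only in how they block: AWXConsumerRedis waits in BLPOP, while AWXConsumerPG waits in select() on its LISTENed channels. A standalone sketch of the Redis side, with a placeholder URL and queue name:

import json
import redis

# Minimal standalone version of the AWXConsumerRedis.run() loop.
conn = redis.Redis.from_url('redis://localhost:6379')
while True:
    _queue, raw = conn.blpop(['callback_tasks'])  # placeholder queue; blocks until a message is pushed
    body = json.loads(raw)
    print('would hand off to the worker pool:', body)
    break  # one message is enough for the sketch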

View File

@@ -1,161 +0,0 @@
# Copyright (c) 2018 Ansible by Red Hat
# All Rights Reserved.
import os
import logging
import signal
import sys
import json
from uuid import UUID
from queue import Empty as QueueEmpty
from django import db
from django.db import connection as pg_connection
from pgpubsub import PubSub
from awx.main.dispatch.pool import WorkerPool
SHORT_CIRCUIT = False
if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
else:
logger = logging.getLogger('awx.main.dispatch')
def signame(sig):
return dict(
(k, v) for v, k in signal.__dict__.items()
if v.startswith('SIG') and not v.startswith('SIG_')
)[sig]
class WorkerSignalHandler:
def __init__(self):
self.kill_now = False
signal.signal(signal.SIGINT, self.exit_gracefully)
def exit_gracefully(self, *args, **kwargs):
self.kill_now = True
class AWXConsumerPG(object):
def __init__(self, name, connection, worker, queues=[], pool=None):
self.name = name
self.connection = pg_connection
self.total_messages = 0
self.queues = queues
self.worker = worker
self.pool = pool
# TODO, maybe get new connection and reconnect periodically
self.pubsub = PubSub(pg_connection.cursor().connection)
if pool is None:
self.pool = WorkerPool()
self.pool.init_workers(self.worker.work_loop)
@property
def listening_on(self):
return 'listening on {}'.format([f'{q}' for q in self.queues])
def control(self, body, message):
logger.warn(body)
control = body.get('control')
if control in ('status', 'running'):
if control == 'status':
msg = '\n'.join([self.listening_on, self.pool.debug()])
elif control == 'running':
msg = []
for worker in self.pool.workers:
worker.calculate_managed_tasks()
msg.extend(worker.managed_tasks.keys())
self.pubsub.notify(message.properties['reply_to'], msg)
elif control == 'reload':
for worker in self.pool.workers:
worker.quit()
else:
logger.error('unrecognized control message: {}'.format(control))
def process_task(self, body, message):
if SHORT_CIRCUIT or 'control' in body:
try:
return self.control(body, message)
except Exception:
logger.exception("Exception handling control message:")
return
if len(self.pool):
if "uuid" in body and body['uuid']:
try:
queue = UUID(body['uuid']).int % len(self.pool)
except Exception:
queue = self.total_messages % len(self.pool)
else:
queue = self.total_messages % len(self.pool)
else:
queue = 0
self.pool.write(queue, body)
self.total_messages += 1
def run(self, *args, **kwargs):
signal.signal(signal.SIGINT, self.stop)
signal.signal(signal.SIGTERM, self.stop)
self.worker.on_start()
logger.warn(f"Running worker {self.name} listening to queues {self.queues}")
self.pubsub = PubSub(pg_connection.cursor().connection)
for queue in self.queues:
self.pubsub.listen(queue)
for e in self.pubsub.events():
logger.warn(f"Processing task {e}")
self.process_task(json.loads(e.payload), e)
def stop(self, signum, frame):
logger.warn('received {}, stopping'.format(signame(signum)))
for queue in self.queues:
self.pubsub.unlisten(queue)
self.worker.on_stop()
raise SystemExit()
class BaseWorkerPG(object):
def work_loop(self, queue, finished, idx, *args):
ppid = os.getppid()
signal_handler = WorkerSignalHandler()
while not signal_handler.kill_now:
# if the parent PID changes, this process has been orphaned
# via e.g., segfault or sigkill, we should exit too
if os.getppid() != ppid:
break
try:
body = queue.get(block=True, timeout=1)
if body == 'QUIT':
break
except QueueEmpty:
continue
except Exception as e:
logger.error("Exception on worker {}, restarting: ".format(idx) + str(e))
continue
try:
for conn in db.connections.all():
# If the database connection has a hiccup during the prior message, close it
# so we can establish a new connection
conn.close_if_unusable_or_obsolete()
self.perform_work(body, *args)
finally:
if 'uuid' in body:
uuid = body['uuid']
logger.debug('task {} is finished'.format(uuid))
finished.put(uuid)
logger.warn('worker exiting gracefully pid:{}'.format(os.getpid()))
def perform_work(self, body):
raise NotImplementedError()
def on_start(self):
pass
def on_stop(self):
pass

View File

@@ -8,12 +8,12 @@ from kubernetes.config import kube_config
from awx.main.tasks import dispatch_startup, inform_cluster_of_shutdown
from .basepg import BaseWorkerPG
from .base import BaseWorker
logger = logging.getLogger('awx.main.dispatch')
class TaskWorker(BaseWorkerPG):
class TaskWorker(BaseWorker):
'''
A worker implementation that deserializes task messages and runs native
Python code.

View File

@@ -5,7 +5,7 @@ from django.conf import settings
from django.core.management.base import BaseCommand
from kombu import Exchange, Queue, Connection
from awx.main.dispatch.worker import AWXRedisConsumer, CallbackBrokerWorker
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
class Command(BaseCommand):
@@ -20,7 +20,7 @@ class Command(BaseCommand):
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
consumer = None
try:
consumer = AWXRedisConsumer(
consumer = AWXConsumerRedis(
'callback_receiver',
conn,
CallbackBrokerWorker(),

View File

@@ -62,18 +62,17 @@ class Command(BaseCommand):
# in cpython itself:
# https://bugs.python.org/issue37429
AWXProxyHandler.disable()
with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
try:
queues = ['tower_broadcast_all'] + settings.AWX_CELERY_QUEUES_STATIC + [get_local_queuename()]
consumer = AWXConsumerPG(
'dispatcher',
conn,
TaskWorker(),
queues,
AutoscalePool(min_workers=4)
)
consumer.run()
except KeyboardInterrupt:
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()
try:
queues = ['tower_broadcast_all'] + settings.AWX_CELERY_QUEUES_STATIC + [get_local_queuename()]
consumer = AWXConsumerPG(
'dispatcher',
None,
TaskWorker(),
queues,
AutoscalePool(min_workers=4)
)
consumer.run()
except KeyboardInterrupt:
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()

View File

@@ -30,7 +30,6 @@ class AnsibleJSONEncoder(json.JSONEncoder):
class CallbackQueueDispatcher(object):
def __init__(self):
self.callback_connection = getattr(settings, 'BROKER_URL', None)
self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
self.connection = redis.Redis.from_url(settings.BROKER_URL)
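
On the producing side, dispatching a callback event is now a plain Redis list push that the receiver's BLPOP loop picks up. A sketch, assuming the dispatcher RPUSHes JSON onto the CALLBACK_QUEUE list; the queue name, URL, and event body are placeholders:

import json
import redis

conn = redis.Redis.from_url('redis://localhost:6379')
# 'callback_tasks' stands in for settings.CALLBACK_QUEUE; the receiver BLPOPs this same list.
conn.rpush('callback_tasks', json.dumps({'event': 'playbook_on_start'}))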

View File

@@ -264,7 +264,7 @@ def apply_cluster_membership_policies():
logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
@task(queue='tower_broadcast_all', exchange_type='fanout')
@task(queue='tower_broadcast_all')
def handle_setting_changes(setting_keys):
orig_len = len(setting_keys)
for i in range(orig_len):
@@ -275,7 +275,7 @@ def handle_setting_changes(setting_keys):
cache.delete_many(cache_keys)
@task(queue='tower_broadcast_all', exchange_type='fanout')
@task(queue='tower_broadcast_all')
def delete_project_files(project_path):
# TODO: possibly implement some retry logic
lock_file = project_path + '.lock'
@@ -293,7 +293,7 @@ def delete_project_files(project_path):
logger.exception('Could not remove lock file {}'.format(lock_file))
@task(queue='tower_broadcast_all', exchange_type='fanout')
@task(queue='tower_broadcast_all')
def profile_sql(threshold=1, minutes=1):
if threshold == 0:
cache.delete('awx-profile-sql-threshold')

View File

@@ -421,7 +421,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
BROKER_DURABILITY = True
BROKER_POOL_LIMIT = None
BROKER_URL = 'redis://localhost:6379;'
BROKER_URL = 'redis://localhost:6379'
BROKER_TRANSPORT_OPTIONS = {}
CELERY_DEFAULT_QUEUE = 'awx_private_queue'
CELERYBEAT_SCHEDULE = {

View File

@@ -14,6 +14,7 @@
import os
import urllib.parse
import sys
from urllib import parse
# Enable the following lines and install the browser extension to use Django debug toolbar
# if your deployment method is not VMWare of Docker-for-Mac you may
@@ -53,12 +54,13 @@ if "pytest" in sys.modules:
# Default to "just works" for single tower docker
BROKER_URL = os.environ.get('BROKER_URL', "redis://redis_1:6379")
redis_parts = parse.urlparse(BROKER_URL)
CHANNEL_LAYERS = {
"default": {
"BACKEND": "awx.main.channels.RedisGroupBroadcastChannelLayer",
"CONFIG": {
"hosts": [(os.environ.get('REDIS_HOST', 'redis_1'),
int(os.environ.get('REDIS_PORT', 6379)))],
"hosts": [(redis_parts.hostname, redis_parts.port)]
},
},
}
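
Deriving the channel-layer host from BROKER_URL keeps the two settings in lockstep; for example:

from urllib import parse

parts = parse.urlparse('redis://redis_1:6379')
print(parts.hostname, parts.port)  # redis_1 6379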

View File

@@ -24,10 +24,7 @@ services:
#entrypoint: ["bash"]
environment:
CURRENT_UID:
# BROKER_URL will go away when we use postgres as our message broker
BROKER_URL: "redis://redis_1:63791"
REDIS_HOST: redis_1
REDIS_PORT: 63791
SDB_HOST: 0.0.0.0
SDB_PORT: 5899
AWX_GROUP_QUEUES: alpha,tower
@@ -47,10 +44,7 @@ services:
working_dir: "/awx_devel"
environment:
CURRENT_UID:
# BROKER_URL will go away when we use postgres as our message broker
BROKER_URL: "redis://redis_1:63791"
REDIS_HOST: redis_2
REDIS_PORT: 63792
BROKER_URL: "redis://redis_2:63792"
SDB_HOST: 0.0.0.0
SDB_PORT: 7899
AWX_GROUP_QUEUES: bravo,tower
@@ -69,10 +63,7 @@ services:
working_dir: "/awx_devel"
environment:
CURRENT_UID:
# BROKER_URL will go away when we use postgres as our message broker
BROKER_URL: "redis://redis_1:63791"
REDIS_HOST: redis_3
REDIS_PORT: 63793
BROKER_URL: "redis://redis_3:63793"
SDB_HOST: 0.0.0.0
SDB_PORT: 8899
AWX_GROUP_QUEUES: charlie,tower