From 355fb125cb447701ffb4316e4cddbc363264dc80 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Tue, 7 Jan 2020 15:18:16 -0500 Subject: [PATCH] redis events --- awx/main/dispatch/worker/__init__.py | 2 +- awx/main/dispatch/worker/base.py | 87 +++++++++++++++++++ .../commands/run_callback_receiver.py | 12 +-- awx/main/queue.py | 43 +-------- 4 files changed, 95 insertions(+), 49 deletions(-) diff --git a/awx/main/dispatch/worker/__init__.py b/awx/main/dispatch/worker/__init__.py index 009386914f..06d64c437c 100644 --- a/awx/main/dispatch/worker/__init__.py +++ b/awx/main/dispatch/worker/__init__.py @@ -1,3 +1,3 @@ -from .base import AWXConsumer, BaseWorker # noqa +from .base import AWXConsumer, AWXRedisConsumer, BaseWorker # noqa from .callback import CallbackBrokerWorker # noqa from .task import TaskWorker # noqa diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 8c0a8a4b77..0a6bf4396b 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -5,12 +5,15 @@ import os import logging import signal import sys +import redis +import json from uuid import UUID from queue import Empty as QueueEmpty from django import db from kombu import Producer from kombu.mixins import ConsumerMixin +from django.conf import settings from awx.main.dispatch.pool import WorkerPool @@ -117,6 +120,90 @@ class AWXConsumer(ConsumerMixin): raise SystemExit() +class AWXRedisConsumer(object): + + def __init__(self, name, connection, worker, queues=[], pool=None): + self.should_stop = False + + self.name = name + self.connection = connection + self.total_messages = 0 + self.queues = queues + self.worker = worker + self.pool = pool + if pool is None: + self.pool = WorkerPool() + self.pool.init_workers(self.worker.work_loop) + + @property + def listening_on(self): + return f'listening on {self.queues}' + + ''' + def control(self, body, message): + logger.warn(body) + control = body.get('control') + if control in ('status', 'running'): + producer = Producer( + channel=self.connection, + routing_key=message.properties['reply_to'] + ) + if control == 'status': + msg = '\n'.join([self.listening_on, self.pool.debug()]) + elif control == 'running': + msg = [] + for worker in self.pool.workers: + worker.calculate_managed_tasks() + msg.extend(worker.managed_tasks.keys()) + producer.publish(msg) + elif control == 'reload': + for worker in self.pool.workers: + worker.quit() + else: + logger.error('unrecognized control message: {}'.format(control)) + message.ack() + ''' + + def process_task(self, body, message): + if 'control' in body: + try: + return self.control(body, message) + except Exception: + logger.exception("Exception handling control message:") + return + if len(self.pool): + if "uuid" in body and body['uuid']: + try: + queue = UUID(body['uuid']).int % len(self.pool) + except Exception: + queue = self.total_messages % len(self.pool) + else: + queue = self.total_messages % len(self.pool) + else: + queue = 0 + self.pool.write(queue, body) + self.total_messages += 1 + + def run(self, *args, **kwargs): + signal.signal(signal.SIGINT, self.stop) + signal.signal(signal.SIGTERM, self.stop) + self.worker.on_start() + + queue = redis.Redis.from_url(settings.BROKER_URL) + while True: + res = queue.blpop(self.queues) + res = json.loads(res[1]) + self.process_task(res, res) + if self.should_stop: + return + + def stop(self, signum, frame): + self.should_stop = True # this makes the kombu mixin stop consuming + logger.warn('received {}, stopping'.format(signame(signum))) + self.worker.on_stop() + raise SystemExit() + + class BaseWorker(object): def read(self, queue): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 58e311f2bb..269b01d98a 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -5,7 +5,7 @@ from django.conf import settings from django.core.management.base import BaseCommand from kombu import Exchange, Queue, Connection -from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker +from awx.main.dispatch.worker import AWXRedisConsumer, CallbackBrokerWorker class Command(BaseCommand): @@ -20,17 +20,11 @@ class Command(BaseCommand): with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn: consumer = None try: - consumer = AWXConsumer( + consumer = AWXRedisConsumer( 'callback_receiver', conn, CallbackBrokerWorker(), - [ - Queue( - settings.CALLBACK_QUEUE, - Exchange(settings.CALLBACK_QUEUE, type='direct'), - routing_key=settings.CALLBACK_QUEUE - ) - ] + queues=[getattr(settings, 'CALLBACK_QUEUE', '')], ) consumer.run() except KeyboardInterrupt: diff --git a/awx/main/queue.py b/awx/main/queue.py index 3d8a8384eb..40cd58ce23 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -5,13 +5,13 @@ import json import logging import os +import redis # Django from django.conf import settings # Kombu from kombu import Exchange, Producer, Connection -from kombu.serialization import registry __all__ = ['CallbackQueueDispatcher'] @@ -27,48 +27,13 @@ class AnsibleJSONEncoder(json.JSONEncoder): return super(AnsibleJSONEncoder, self).default(o) -registry.register( - 'json-ansible', - lambda obj: json.dumps(obj, cls=AnsibleJSONEncoder), - lambda obj: json.loads(obj), - content_type='application/json', - content_encoding='utf-8' -) - - class CallbackQueueDispatcher(object): def __init__(self): self.callback_connection = getattr(settings, 'BROKER_URL', None) - self.callback_connection_options = getattr(settings, 'BROKER_TRANSPORT_OPTIONS', {}) - self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '') - self.connection = None - self.exchange = None + self.queue = getattr(settings, 'CALLBACK_QUEUE', '') self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher') + self.connection = redis.Redis.from_url(settings.BROKER_URL) def dispatch(self, obj): - if not self.callback_connection or not self.connection_queue: - return - active_pid = os.getpid() - for retry_count in range(4): - try: - if not hasattr(self, 'connection_pid'): - self.connection_pid = active_pid - if self.connection_pid != active_pid: - self.connection = None - if self.connection is None: - self.connection = Connection(self.callback_connection, transport_options=self.callback_connection_options) - self.exchange = Exchange(self.connection_queue, type='direct') - - producer = Producer(self.connection) - producer.publish(obj, - serializer='json-ansible', - compression='bzip2', - exchange=self.exchange, - declare=[self.exchange], - delivery_mode="transient", - routing_key=self.connection_queue) - return - except Exception as e: - self.logger.info('Publish Job Event Exception: %r, retry=%d', e, - retry_count, exc_info=True) + self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))