redis events

chris meyers 2020-01-07 15:18:16 -05:00 committed by Ryan Petrello
parent c8eeacacca
commit 355fb125cb
4 changed files with 95 additions and 49 deletions

View File

@@ -1,3 +1,3 @@
from .base import AWXConsumer, BaseWorker # noqa
from .base import AWXConsumer, AWXRedisConsumer, BaseWorker # noqa
from .callback import CallbackBrokerWorker # noqa
from .task import TaskWorker # noqa

View File

@@ -5,12 +5,15 @@ import os
import logging
import signal
import sys
import redis
import json
from uuid import UUID
from queue import Empty as QueueEmpty
from django import db
from kombu import Producer
from kombu.mixins import ConsumerMixin
from django.conf import settings
from awx.main.dispatch.pool import WorkerPool
@@ -117,6 +120,90 @@ class AWXConsumer(ConsumerMixin):
        raise SystemExit()


class AWXRedisConsumer(object):

    def __init__(self, name, connection, worker, queues=[], pool=None):
        self.should_stop = False
        self.name = name
        self.connection = connection
        self.total_messages = 0
        self.queues = queues
        self.worker = worker
        self.pool = pool
        if pool is None:
            self.pool = WorkerPool()
        self.pool.init_workers(self.worker.work_loop)

    @property
    def listening_on(self):
        return f'listening on {self.queues}'

    '''
    def control(self, body, message):
        logger.warn(body)
        control = body.get('control')
        if control in ('status', 'running'):
            producer = Producer(
                channel=self.connection,
                routing_key=message.properties['reply_to']
            )
            if control == 'status':
                msg = '\n'.join([self.listening_on, self.pool.debug()])
            elif control == 'running':
                msg = []
                for worker in self.pool.workers:
                    worker.calculate_managed_tasks()
                    msg.extend(worker.managed_tasks.keys())
            producer.publish(msg)
        elif control == 'reload':
            for worker in self.pool.workers:
                worker.quit()
        else:
            logger.error('unrecognized control message: {}'.format(control))
        message.ack()
    '''

    def process_task(self, body, message):
        if 'control' in body:
            try:
                return self.control(body, message)
            except Exception:
                logger.exception("Exception handling control message:")
            return
        if len(self.pool):
            if "uuid" in body and body['uuid']:
                try:
                    queue = UUID(body['uuid']).int % len(self.pool)
                except Exception:
                    queue = self.total_messages % len(self.pool)
            else:
                queue = self.total_messages % len(self.pool)
        else:
            queue = 0
        self.pool.write(queue, body)
        self.total_messages += 1

    def run(self, *args, **kwargs):
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

        self.worker.on_start()

        queue = redis.Redis.from_url(settings.BROKER_URL)
        while True:
            res = queue.blpop(self.queues)
            res = json.loads(res[1])
            self.process_task(res, res)
            if self.should_stop:
                return

    def stop(self, signum, frame):
        self.should_stop = True  # this makes the kombu mixin stop consuming
        logger.warn('received {}, stopping'.format(signame(signum)))
        self.worker.on_stop()
        raise SystemExit()


class BaseWorker(object):

    def read(self, queue):
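
The new AWXRedisConsumer drops kombu's ConsumerMixin in favour of a plain Redis list: run() blocks on BLPOP against the configured queue names, decodes each JSON payload, and hands it to process_task(), which hashes events that carry a uuid to a stable pool worker via UUID(body['uuid']).int % len(self.pool). Below is a minimal, self-contained sketch of that consume-and-route pattern, not code from this commit; it assumes a reachable Redis server and the redis-py client, and names such as EXAMPLE_QUEUE, NUM_WORKERS and route() are illustrative only.

import json
import uuid

import redis

EXAMPLE_QUEUE = 'callback_tasks'   # stand-in for settings.CALLBACK_QUEUE
NUM_WORKERS = 4                    # stand-in for len(self.pool)


def route(body, num_workers, counter):
    # Events that share a 'uuid' hash to the same worker index;
    # everything else round-robins on a running counter.
    if body.get('uuid'):
        try:
            return uuid.UUID(body['uuid']).int % num_workers
        except Exception:
            pass
    return counter % num_workers


def consume_forever():
    conn = redis.Redis.from_url('redis://localhost:6379/0')
    total = 0
    while True:
        # BLPOP blocks until one of the named lists has an element and
        # returns a (list_name, payload) pair.
        _, payload = conn.blpop([EXAMPLE_QUEUE])
        body = json.loads(payload)
        worker = route(body, NUM_WORKERS, total)
        total += 1
        print('would hand event to worker %d: %r' % (worker, body))


if __name__ == '__main__':
    consume_forever()

One consequence of BLPOP compared with the kombu consumer is that there is no broker-side acknowledgement: once an element is popped from the list it is gone, so an event held by a crashing worker is lost rather than redelivered.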

View File

@@ -5,7 +5,7 @@ from django.conf import settings
from django.core.management.base import BaseCommand
from kombu import Exchange, Queue, Connection
from awx.main.dispatch.worker import AWXConsumer, CallbackBrokerWorker
from awx.main.dispatch.worker import AWXRedisConsumer, CallbackBrokerWorker


class Command(BaseCommand):

@@ -20,17 +20,11 @@ class Command(BaseCommand):
        with Connection(settings.BROKER_URL, transport_options=settings.BROKER_TRANSPORT_OPTIONS) as conn:
            consumer = None
            try:
                consumer = AWXConsumer(
                consumer = AWXRedisConsumer(
                    'callback_receiver',
                    conn,
                    CallbackBrokerWorker(),
                    [
                        Queue(
                            settings.CALLBACK_QUEUE,
                            Exchange(settings.CALLBACK_QUEUE, type='direct'),
                            routing_key=settings.CALLBACK_QUEUE
                        )
                    ]
                    queues=[getattr(settings, 'CALLBACK_QUEUE', '')],
                )
                consumer.run()
            except KeyboardInterrupt:

View File

@@ -5,13 +5,13 @@
import json
import logging
import os
import redis
# Django
from django.conf import settings
# Kombu
from kombu import Exchange, Producer, Connection
from kombu.serialization import registry
__all__ = ['CallbackQueueDispatcher']
@@ -27,48 +27,13 @@ class AnsibleJSONEncoder(json.JSONEncoder):
        return super(AnsibleJSONEncoder, self).default(o)


registry.register(
    'json-ansible',
    lambda obj: json.dumps(obj, cls=AnsibleJSONEncoder),
    lambda obj: json.loads(obj),
    content_type='application/json',
    content_encoding='utf-8'
)


class CallbackQueueDispatcher(object):

    def __init__(self):
        self.callback_connection = getattr(settings, 'BROKER_URL', None)
        self.callback_connection_options = getattr(settings, 'BROKER_TRANSPORT_OPTIONS', {})
        self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
        self.connection = None
        self.exchange = None
        self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
        self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
        self.connection = redis.Redis.from_url(settings.BROKER_URL)

    def dispatch(self, obj):
        if not self.callback_connection or not self.connection_queue:
            return
        active_pid = os.getpid()
        for retry_count in range(4):
            try:
                if not hasattr(self, 'connection_pid'):
                    self.connection_pid = active_pid
                if self.connection_pid != active_pid:
                    self.connection = None
                if self.connection is None:
                    self.connection = Connection(self.callback_connection, transport_options=self.callback_connection_options)
                    self.exchange = Exchange(self.connection_queue, type='direct')
                producer = Producer(self.connection)
                producer.publish(obj,
                                 serializer='json-ansible',
                                 compression='bzip2',
                                 exchange=self.exchange,
                                 declare=[self.exchange],
                                 delivery_mode="transient",
                                 routing_key=self.connection_queue)
                return
            except Exception as e:
                self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
                                 retry_count, exc_info=True)
        self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))
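
On the producer side, dispatch() now reduces to a single RPUSH of the JSON-encoded event onto the callback queue list, pairing with the consumer's BLPOP above; the kombu Connection/Exchange/Producer setup and the per-call retry loop are gone. A minimal sketch of that producer half follows, not code from this commit, assuming a reachable Redis server and the redis-py client; ExampleDispatcher and EXAMPLE_QUEUE are illustrative names.

import json

import redis

EXAMPLE_QUEUE = 'callback_tasks'   # stand-in for settings.CALLBACK_QUEUE


class ExampleDispatcher(object):

    def __init__(self, broker_url='redis://localhost:6379/0'):
        self.connection = redis.Redis.from_url(broker_url)

    def dispatch(self, obj):
        # RPUSH appends to the tail of the list; the consumer's BLPOP pops
        # from the head, giving FIFO delivery with no exchanges, routing
        # keys, serializer registry, or retry loop.
        self.connection.rpush(EXAMPLE_QUEUE, json.dumps(obj))


if __name__ == '__main__':
    ExampleDispatcher().dispatch({'event': 'playbook_on_start', 'uuid': None})

The real dispatcher keeps AnsibleJSONEncoder so Ansible-specific types still serialize; the sketch uses plain json.dumps for brevity.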