Integrate multiprocessing writer worker in callback receiver

This commit is contained in:
Matthew Jones 2017-01-18 12:05:29 -05:00
parent 1cdeb4d2af
commit 73f92bba5d

View File

@ -3,10 +3,11 @@
# Python
import logging
from threading import Thread
from Queue import Queue as ThreadQueue
from Queue import Empty as QueueEmpty
import signal
from uuid import UUID
from multiprocessing import Process
from multiprocessing import Queue as MPQueue
from Queue import Empty as QueueEmpty
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
@ -14,6 +15,7 @@ from kombu.mixins import ConsumerMixin
# Django
from django.conf import settings
from django.core.management.base import NoArgsCommand
from django.db import connection as django_connection
from django.db import DatabaseError
# AWX
@ -25,15 +27,28 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver')
class CallbackBrokerWorker(ConsumerMixin):
def __init__(self, connection, use_workers=True):
self.connection = connection
self.partial_events = {}
self.worker_queues = []
self.total_messages = 0
self.init_workers(use_workers)
def init_workers(self, use_workers=True):
def shutdown_handler(active_workers):
def _handler(signum, frame):
try:
for active_worker in active_workers:
active_worker.terminate()
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it
except Exception:
# TODO: LOG
pass
return _handler
if use_workers:
connection.close()
django_connection.close()
for idx in range(settings.JOB_EVENT_WORKERS):
queue_actual = ThreadQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
w = Thread(target=self.callback_worker, args=(queue_actual, idx,))
queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
w.start()
if settings.DEBUG:
logger.info('Started worker %s' % str(idx))
@ -41,6 +56,9 @@ class CallbackBrokerWorker(ConsumerMixin):
elif settings.DEBUG:
logger.warn('Started callback receiver (no workers)')
signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in self.worker_queues]))
signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in self.worker_queues]))
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE,
Exchange(settings.CALLBACK_QUEUE, type='direct'),