Make full queue writes silent unless we can't write to any of them

This will still keep general queue errors other than Queue.Full
This commit is contained in:
Matthew Jones
2017-02-06 13:52:55 -05:00
parent 0b410fb6a8
commit 1cbbb6a87c

View File

@@ -8,6 +8,7 @@ from uuid import UUID
from multiprocessing import Process from multiprocessing import Process
from multiprocessing import Queue as MPQueue from multiprocessing import Queue as MPQueue
from Queue import Empty as QueueEmpty from Queue import Empty as QueueEmpty
from Queue import Full as QueueFull
from kombu import Connection, Exchange, Queue from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin from kombu.mixins import ConsumerMixin
@@ -79,18 +80,22 @@ class CallbackBrokerWorker(ConsumerMixin):
def write_queue_worker(self, preferred_queue, body): def write_queue_worker(self, preferred_queue, body):
queue_order = sorted(range(settings.JOB_EVENT_WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) queue_order = sorted(range(settings.JOB_EVENT_WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
write_attempt_order = []
for queue_actual in queue_order: for queue_actual in queue_order:
try: try:
worker_actual = self.worker_queues[queue_actual] worker_actual = self.worker_queues[queue_actual]
worker_actual[1].put(body, block=True, timeout=5) worker_actual[1].put(body, block=True, timeout=5)
worker_actual[0] += 1 worker_actual[0] += 1
return queue_actual return queue_actual
except QueueFull:
pass
except Exception: except Exception:
import traceback import traceback
tb = traceback.format_exc() tb = traceback.format_exc()
logger.warn("Could not write to queue %s" % preferred_queue) logger.warn("Could not write to queue %s" % preferred_queue)
logger.warn("Detail: {}".format(tb)) logger.warn("Detail: {}".format(tb))
continue write_attempt_order.append(preferred_queue)
logger.warn("Could not write payload to any queue, attempted order: {}".format(write_attempt_order))
return None return None
def callback_worker(self, queue_actual, idx): def callback_worker(self, queue_actual, idx):