Merge pull request #4275 from chrismeyersfsu/redis_throttle_reconnect

exponential backoff on cb receiver reconnect
This commit is contained in:
Chris Meyers
2020-04-28 15:42:45 -04:00
committed by GitHub

View File

@@ -8,6 +8,7 @@ import sys
import redis import redis
import json import json
import psycopg2 import psycopg2
import time
from uuid import UUID from uuid import UUID
from queue import Empty as QueueEmpty from queue import Empty as QueueEmpty
@@ -116,18 +117,23 @@ class AWXConsumerRedis(AWXConsumerBase):
super(AWXConsumerRedis, self).run(*args, **kwargs) super(AWXConsumerRedis, self).run(*args, **kwargs)
self.worker.on_start() self.worker.on_start()
queue = redis.Redis.from_url(settings.BROKER_URL) time_to_sleep = 1
while True: while True:
try: queue = redis.Redis.from_url(settings.BROKER_URL)
res = queue.blpop(self.queues) while True:
res = json.loads(res[1]) try:
self.process_task(res) res = queue.blpop(self.queues)
except redis.exceptions.RedisError: time_to_sleep = 1
logger.exception("encountered an error communicating with redis") res = json.loads(res[1])
except (json.JSONDecodeError, KeyError): self.process_task(res)
logger.exception("failed to decode JSON message from redis") except redis.exceptions.RedisError:
if self.should_stop: time_to_sleep = min(time_to_sleep*2, 30)
return logger.exception(f"encountered an error communicating with redis. Reconnect attempt in {time_to_sleep} seconds")
time.sleep(time_to_sleep)
except (json.JSONDecodeError, KeyError):
logger.exception("failed to decode JSON message from redis")
if self.should_stop:
return
class AWXConsumerPG(AWXConsumerBase): class AWXConsumerPG(AWXConsumerBase):