diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 74d2a6a6c3..c796e6162e 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -8,6 +8,7 @@ import sys import redis import json import psycopg2 +import time from uuid import UUID from queue import Empty as QueueEmpty @@ -116,18 +117,23 @@ class AWXConsumerRedis(AWXConsumerBase): super(AWXConsumerRedis, self).run(*args, **kwargs) self.worker.on_start() - queue = redis.Redis.from_url(settings.BROKER_URL) + time_to_sleep = 1 while True: - try: - res = queue.blpop(self.queues) - res = json.loads(res[1]) - self.process_task(res) - except redis.exceptions.RedisError: - logger.exception("encountered an error communicating with redis") - except (json.JSONDecodeError, KeyError): - logger.exception("failed to decode JSON message from redis") - if self.should_stop: - return + queue = redis.Redis.from_url(settings.BROKER_URL) + while True: + try: + res = queue.blpop(self.queues) + time_to_sleep = 1 + res = json.loads(res[1]) + self.process_task(res) + except redis.exceptions.RedisError: + time_to_sleep = min(time_to_sleep*2, 30) + logger.exception(f"encountered an error communicating with redis. Reconnect attempt in {time_to_sleep} seconds") + time.sleep(time_to_sleep) + except (json.JSONDecodeError, KeyError): + logger.exception("failed to decode JSON message from redis") + if self.should_stop: + return class AWXConsumerPG(AWXConsumerBase):