diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 74d2a6a6c3..3ff20609b6 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -8,6 +8,7 @@ import sys import redis import json import psycopg2 +import time from uuid import UUID from queue import Empty as QueueEmpty @@ -116,18 +117,23 @@ class AWXConsumerRedis(AWXConsumerBase): super(AWXConsumerRedis, self).run(*args, **kwargs) self.worker.on_start() - queue = redis.Redis.from_url(settings.BROKER_URL) + retry = 0 while True: - try: - res = queue.blpop(self.queues) - res = json.loads(res[1]) - self.process_task(res) - except redis.exceptions.RedisError: - logger.exception("encountered an error communicating with redis") - except (json.JSONDecodeError, KeyError): - logger.exception("failed to decode JSON message from redis") - if self.should_stop: - return + queue = redis.Redis.from_url(settings.BROKER_URL) + while True: + try: + res = queue.blpop(self.queues) + retry = 0 + res = json.loads(res[1]) + self.process_task(res) + except redis.exceptions.RedisError: + logger.exception(f"encountered an error communicating with redis. Reconnect attempt {retry}") + retry += 1 + time.sleep(min(retry * 2, 30)) + except (json.JSONDecodeError, KeyError): + logger.exception("failed to decode JSON message from redis") + if self.should_stop: + return class AWXConsumerPG(AWXConsumerBase):