mirror of
https://github.com/ansible/awx.git
synced 2026-05-20 15:27:47 -02:30
Sensible log behavior when redis is unavailable (#15466)
* Sensible log behavior when redis is unavailable
* Consistent behavior with dispatcher and callback
This commit is contained in:
@@ -9,6 +9,7 @@ from prometheus_client.core import GaugeMetricFamily, HistogramMetricFamily
|
|||||||
from prometheus_client.registry import CollectorRegistry
|
from prometheus_client.registry import CollectorRegistry
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.http import HttpRequest
|
from django.http import HttpRequest
|
||||||
|
import redis.exceptions
|
||||||
from rest_framework.request import Request
|
from rest_framework.request import Request
|
||||||
|
|
||||||
from awx.main.consumers import emit_channel_notification
|
from awx.main.consumers import emit_channel_notification
|
||||||
@@ -290,8 +291,12 @@ class Metrics(MetricsNamespace):
|
|||||||
def send_metrics(self):
|
def send_metrics(self):
|
||||||
# more than one thread could be calling this at the same time, so should
|
# more than one thread could be calling this at the same time, so should
|
||||||
# acquire redis lock before sending metrics
|
# acquire redis lock before sending metrics
|
||||||
lock = self.conn.lock(root_key + '-' + self._namespace + '_lock')
|
try:
|
||||||
if not lock.acquire(blocking=False):
|
lock = self.conn.lock(root_key + '-' + self._namespace + '_lock')
|
||||||
|
if not lock.acquire(blocking=False):
|
||||||
|
return
|
||||||
|
except redis.exceptions.ConnectionError as exc:
|
||||||
|
logger.warning(f'Connection error in send_metrics: {exc}')
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ from datetime import timedelta
|
|||||||
|
|
||||||
from django import db
|
from django import db
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
import redis.exceptions
|
||||||
|
|
||||||
from ansible_base.lib.logging.runtime import log_excess_runtime
|
from ansible_base.lib.logging.runtime import log_excess_runtime
|
||||||
|
|
||||||
@@ -130,10 +131,13 @@ class AWXConsumerBase(object):
|
|||||||
@log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
|
@log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
|
||||||
def record_statistics(self):
|
def record_statistics(self):
|
||||||
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
|
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
|
||||||
|
save_data = self.pool.debug()
|
||||||
try:
|
try:
|
||||||
self.redis.set(f'awx_{self.name}_statistics', self.pool.debug())
|
self.redis.set(f'awx_{self.name}_statistics', save_data)
|
||||||
|
except redis.exceptions.ConnectionError as exc:
|
||||||
|
logger.warning(f'Redis connection error saving {self.name} status data:\n{exc}\nmissed data:\n{save_data}')
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(f"encountered an error communicating with redis to store {self.name} statistics")
|
logger.exception(f"Unknown redis error saving {self.name} status data:\nmissed data:\n{save_data}")
|
||||||
self.last_stats = time.time()
|
self.last_stats = time.time()
|
||||||
|
|
||||||
def run(self, *args, **kwargs):
|
def run(self, *args, **kwargs):
|
||||||
@@ -189,7 +193,10 @@ class AWXConsumerPG(AWXConsumerBase):
|
|||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
self.pool.produce_subsystem_metrics(self.subsystem_metrics)
|
self.pool.produce_subsystem_metrics(self.subsystem_metrics)
|
||||||
self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
|
self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
|
||||||
self.subsystem_metrics.pipe_execute()
|
try:
|
||||||
|
self.subsystem_metrics.pipe_execute()
|
||||||
|
except redis.exceptions.ConnectionError as exc:
|
||||||
|
logger.warning(f'Redis connection error saving dispatcher metrics, error:\n{exc}')
|
||||||
self.listen_cumulative_time = 0.0
|
self.listen_cumulative_time = 0.0
|
||||||
self.last_metrics_gather = current_time
|
self.last_metrics_gather = current_time
|
||||||
|
|
||||||
|
|||||||
@@ -86,6 +86,7 @@ class CallbackBrokerWorker(BaseWorker):
|
|||||||
return os.getpid()
|
return os.getpid()
|
||||||
|
|
||||||
def read(self, queue):
|
def read(self, queue):
|
||||||
|
has_redis_error = False
|
||||||
try:
|
try:
|
||||||
res = self.redis.blpop(self.queue_name, timeout=1)
|
res = self.redis.blpop(self.queue_name, timeout=1)
|
||||||
if res is None:
|
if res is None:
|
||||||
@@ -95,14 +96,21 @@ class CallbackBrokerWorker(BaseWorker):
|
|||||||
self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1)
|
self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1)
|
||||||
self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1)
|
self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1)
|
||||||
return json.loads(res[1])
|
return json.loads(res[1])
|
||||||
|
except redis.exceptions.ConnectionError as exc:
|
||||||
|
# Low noise log, because very common and many workers will write this
|
||||||
|
logger.error(f"redis connection error: {exc}")
|
||||||
|
has_redis_error = True
|
||||||
|
time.sleep(5)
|
||||||
except redis.exceptions.RedisError:
|
except redis.exceptions.RedisError:
|
||||||
logger.exception("encountered an error communicating with redis")
|
logger.exception("encountered an error communicating with redis")
|
||||||
|
has_redis_error = True
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
except (json.JSONDecodeError, KeyError):
|
except (json.JSONDecodeError, KeyError):
|
||||||
logger.exception("failed to decode JSON message from redis")
|
logger.exception("failed to decode JSON message from redis")
|
||||||
finally:
|
finally:
|
||||||
self.record_statistics()
|
if not has_redis_error:
|
||||||
self.record_read_metrics()
|
self.record_statistics()
|
||||||
|
self.record_read_metrics()
|
||||||
|
|
||||||
return {'event': 'FLUSH'}
|
return {'event': 'FLUSH'}
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,13 @@
|
|||||||
# Copyright (c) 2015 Ansible, Inc.
|
# Copyright (c) 2015 Ansible, Inc.
|
||||||
# All Rights Reserved.
|
# All Rights Reserved.
|
||||||
|
|
||||||
from django.conf import settings
|
import redis
|
||||||
from django.core.management.base import BaseCommand
|
|
||||||
from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer
|
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
from django.core.management.base import BaseCommand, CommandError
|
||||||
|
import redis.exceptions
|
||||||
|
|
||||||
|
from awx.main.analytics.subsystem_metrics import CallbackReceiverMetricsServer
|
||||||
from awx.main.dispatch.control import Control
|
from awx.main.dispatch.control import Control
|
||||||
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
|
from awx.main.dispatch.worker import AWXConsumerRedis, CallbackBrokerWorker
|
||||||
|
|
||||||
@@ -27,7 +30,10 @@ class Command(BaseCommand):
|
|||||||
return
|
return
|
||||||
consumer = None
|
consumer = None
|
||||||
|
|
||||||
CallbackReceiverMetricsServer().start()
|
try:
|
||||||
|
CallbackReceiverMetricsServer().start()
|
||||||
|
except redis.exceptions.ConnectionError as exc:
|
||||||
|
raise CommandError(f'Callback receiver could not connect to redis, error: {exc}')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
consumer = AWXConsumerRedis(
|
consumer = AWXConsumerRedis(
|
||||||
|
|||||||
@@ -3,8 +3,10 @@
|
|||||||
import logging
|
import logging
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
|
import redis
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand, CommandError
|
||||||
|
|
||||||
from awx.main.dispatch import get_task_queuename
|
from awx.main.dispatch import get_task_queuename
|
||||||
from awx.main.dispatch.control import Control
|
from awx.main.dispatch.control import Control
|
||||||
@@ -63,7 +65,10 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
consumer = None
|
consumer = None
|
||||||
|
|
||||||
DispatcherMetricsServer().start()
|
try:
|
||||||
|
DispatcherMetricsServer().start()
|
||||||
|
except redis.exceptions.ConnectionError as exc:
|
||||||
|
raise CommandError(f'Dispatcher could not connect to redis, error: {exc}')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
|
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ import time
|
|||||||
import sys
|
import sys
|
||||||
import signal
|
import signal
|
||||||
|
|
||||||
|
import redis
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
from django.utils.translation import gettext_lazy as _, gettext_noop
|
from django.utils.translation import gettext_lazy as _, gettext_noop
|
||||||
@@ -120,6 +122,8 @@ class TaskBase:
|
|||||||
self.subsystem_metrics.pipe_execute()
|
self.subsystem_metrics.pipe_execute()
|
||||||
else:
|
else:
|
||||||
logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
logger.debug(f"skipping recording {self.prefix} metrics, last recorded {time_last_recorded} seconds ago")
|
||||||
|
except redis.exceptions.ConnectionError as exc:
|
||||||
|
logger.warning(f"Redis connection error saving metrics for {self.prefix}, error: {exc}")
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(f"Error saving metrics for {self.prefix}")
|
logger.exception(f"Error saving metrics for {self.prefix}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user