mirror of
https://github.com/ansible/awx.git
synced 2026-03-17 00:47:29 -02:30
Add subsystem metrics
- Adds a Metrics() class that can track data such as number of events the callback receiver inserted into database - Exposes this metric data at the api/v2/metrics/ endpoint. This data is prometheus-friendly - Metric data is stored in memory, then periodically saved to Redis. - Metric data is periodically broadcast to other nodes in the cluster, so that each node has a copy of the most recent metric data collected.
This commit is contained in:
@@ -20,7 +20,7 @@ from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, Inv
|
||||
from awx.main.tasks import handle_success_and_failure_notifications
|
||||
from awx.main.models.events import emit_event_detail
|
||||
from awx.main.utils.profiling import AWXProfiler
|
||||
|
||||
import awx.main.analytics.subsystem_metrics as s_metrics
|
||||
from .base import BaseWorker
|
||||
|
||||
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
||||
@@ -46,16 +46,22 @@ class CallbackBrokerWorker(BaseWorker):
|
||||
self.buff = {}
|
||||
self.pid = os.getpid()
|
||||
self.redis = redis.Redis.from_url(settings.BROKER_URL)
|
||||
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
|
||||
self.queue_pop = 0
|
||||
self.queue_name = settings.CALLBACK_QUEUE
|
||||
self.prof = AWXProfiler("CallbackBrokerWorker")
|
||||
for key in self.redis.keys('awx_callback_receiver_statistics_*'):
|
||||
self.redis.delete(key)
|
||||
|
||||
def read(self, queue):
|
||||
try:
|
||||
res = self.redis.blpop(settings.CALLBACK_QUEUE, timeout=1)
|
||||
res = self.redis.blpop(self.queue_name, timeout=1)
|
||||
if res is None:
|
||||
return {'event': 'FLUSH'}
|
||||
self.total += 1
|
||||
self.queue_pop += 1
|
||||
self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1)
|
||||
self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1)
|
||||
return json.loads(res[1])
|
||||
except redis.exceptions.RedisError:
|
||||
logger.exception("encountered an error communicating with redis")
|
||||
@@ -64,8 +70,19 @@ class CallbackBrokerWorker(BaseWorker):
|
||||
logger.exception("failed to decode JSON message from redis")
|
||||
finally:
|
||||
self.record_statistics()
|
||||
self.record_read_metrics()
|
||||
|
||||
return {'event': 'FLUSH'}
|
||||
|
||||
def record_read_metrics(self):
|
||||
if self.queue_pop == 0:
|
||||
return
|
||||
if self.subsystem_metrics.should_pipe_execute() is True:
|
||||
queue_size = self.redis.llen(self.queue_name)
|
||||
self.subsystem_metrics.set('callback_receiver_events_queue_size_redis', queue_size)
|
||||
self.subsystem_metrics.pipe_execute()
|
||||
self.queue_pop = 0
|
||||
|
||||
def record_statistics(self):
|
||||
# buffer stat recording to once per (by default) 5s
|
||||
if time.time() - self.last_stats > settings.JOB_EVENT_STATISTICS_INTERVAL:
|
||||
@@ -99,27 +116,44 @@ class CallbackBrokerWorker(BaseWorker):
|
||||
def flush(self, force=False):
|
||||
now = tz_now()
|
||||
if force or (time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or any([len(events) >= 1000 for events in self.buff.values()]):
|
||||
bulk_events_saved = 0
|
||||
singular_events_saved = 0
|
||||
metrics_events_batch_save_errors = 0
|
||||
for cls, events in self.buff.items():
|
||||
logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})')
|
||||
for e in events:
|
||||
if not e.created:
|
||||
e.created = now
|
||||
e.modified = now
|
||||
duration_to_save = time.perf_counter()
|
||||
try:
|
||||
cls.objects.bulk_create(events)
|
||||
bulk_events_saved += len(events)
|
||||
except Exception:
|
||||
# if an exception occurs, we should re-attempt to save the
|
||||
# events one-by-one, because something in the list is
|
||||
# broken/stale
|
||||
metrics_events_batch_save_errors += 1
|
||||
for e in events:
|
||||
try:
|
||||
e.save()
|
||||
singular_events_saved += 1
|
||||
except Exception:
|
||||
logger.exception('Database Error Saving Job Event')
|
||||
duration_to_save = time.perf_counter() - duration_to_save
|
||||
for e in events:
|
||||
emit_event_detail(e)
|
||||
self.buff = {}
|
||||
self.last_flush = time.time()
|
||||
# only update metrics if we saved events
|
||||
if (bulk_events_saved + singular_events_saved) > 0:
|
||||
self.subsystem_metrics.inc('callback_receiver_batch_events_errors', metrics_events_batch_save_errors)
|
||||
self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', duration_to_save)
|
||||
self.subsystem_metrics.inc('callback_receiver_events_insert_db', bulk_events_saved + singular_events_saved)
|
||||
self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', bulk_events_saved)
|
||||
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(bulk_events_saved + singular_events_saved))
|
||||
if self.subsystem_metrics.should_pipe_execute() is True:
|
||||
self.subsystem_metrics.pipe_execute()
|
||||
|
||||
def perform_work(self, body):
|
||||
try:
|
||||
@@ -169,6 +203,7 @@ class CallbackBrokerWorker(BaseWorker):
|
||||
except Exception:
|
||||
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
|
||||
finally:
|
||||
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -1)
|
||||
GuidMiddleware.set_guid('')
|
||||
return
|
||||
|
||||
|
||||
Reference in New Issue
Block a user