Added an average event processing metric (in seconds) that can be served to
Grafana via Prometheus.

This metric is a good indicator of how far behind the callback receiver
is: the higher the load, the further behind the receiver falls and the
larger the value (in seconds) the metric reports.

A persistently high value may indicate the need to scale the control
plane horizontally, or to scale vertically by increasing the number of
callback receiver workers.
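
For reference, the new gauge is exposed in the standard Prometheus text format
produced by BaseM.to_prometheus (first hunk below); the node label and value
here are made up for illustration:

# HELP callback_receiver_event_processing_avg_seconds Average processing time per event per callback receiver batch
# TYPE callback_receiver_event_processing_avg_seconds gauge
callback_receiver_event_processing_avg_seconds{node="awx-example-1"} 0.42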
Rebeccah 2022-05-31 14:23:48 -04:00
parent 4dc956c76f
commit 5f9326b131
No known key found for this signature in database
GPG Key ID: A102D27DFFAF3552
2 changed files with 20 additions and 6 deletions

awx/main/analytics/subsystem_metrics.py

@@ -34,7 +34,7 @@ class BaseM:
     def to_prometheus(self, instance_data):
         output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"
         for instance in instance_data:
-            output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n'
+            output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance].get(self.field, -1)}\n'  # TODO: fix because this -1 is necessary when dealing with old instances (ex. you didn't clean up your database)
         return output_text
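
The switch from plain indexing to dict.get matters when stale instances linger
in instance_data, as the TODO notes. A tiny sketch with hypothetical data:

# A stale node with no value recorded for this field:
instance_data = {'old-node': {}}
field = 'callback_receiver_event_processing_avg_seconds'
# instance_data['old-node'][field]        # plain indexing raises KeyError
instance_data['old-node'].get(field, -1)  # .get returns the -1 sentinel instead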
@@ -161,7 +161,7 @@ class Metrics:
             IntM('callback_receiver_events_popped_redis', 'Number of events popped from redis'),
             IntM('callback_receiver_events_in_memory', 'Current number of events in memory (in transfer from redis to db)'),
             IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
-            FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'),
+            FloatM('callback_receiver_events_insert_db_seconds', 'Total time spent saving events to database'),
             IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
             IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'),
             HistogramM(
@@ -170,6 +170,7 @@
             FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
             IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
             FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
+            SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
         ]
         # turn metric list into dictionary with the metric name as a key
         self.METRICS = {}
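
The comment above describes the conversion, but the loop body itself falls
outside this hunk. A minimal sketch of the presumed shape (METRICSLIST is an
assumed name for the list literal closed by the ']' above):

for m in METRICSLIST:  # assumed name; keys each metric object by its field name
    self.METRICS[m.field] = m
# After this, self.METRICS['callback_receiver_event_processing_avg_seconds']
# resolves to the SetFloatM instance registered in this hunk.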
@@ -179,9 +180,11 @@
         # track last time metrics were sent to other nodes
         self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call')

-    def clear_values(self):
-        for m in self.METRICS.values():
-            m.clear_value(self.conn)
+    def clear_values(self, fields=None):
+        if not fields:
+            fields = self.METRICS.keys()
+        for m in fields:
+            self.METRICS[m].clear_value(self.conn)
         self.metrics_have_changed = True
         self.conn.delete(root_key + "_lock")
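
The new fields parameter makes partial resets possible. A hedged usage sketch
(the Metrics constructor arguments are not shown in this diff and are assumed):

metrics = Metrics()  # assumed construction; the real constructor may take arguments
# Clear only the new gauge, e.g. after it has been scraped:
metrics.clear_values(fields=['callback_receiver_event_processing_avg_seconds'])
# Calling with no argument preserves the old behavior and clears every metric:
metrics.clear_values()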

awx/main/dispatch/worker/callback.py

@@ -4,6 +4,7 @@ import os
 import signal
 import time
 import traceback
+import datetime

 from django.conf import settings
 from django.utils.functional import cached_property
@@ -151,12 +152,17 @@
             metrics_singular_events_saved = 0
             metrics_events_batch_save_errors = 0
             metrics_events_broadcast = 0
+            metrics_events_missing_created = 0
+            metrics_total_job_event_processing_seconds = datetime.timedelta(seconds=0)
             for cls, events in self.buff.items():
                 logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})')
                 for e in events:
+                    e.modified = now  # this can be set before created because now is set above on line 149
                     if not e.created:
                         e.created = now
-                    e.modified = now
+                        metrics_events_missing_created += 1
+                    else:  # only calculate the seconds if the created time already has been set
+                        metrics_total_job_event_processing_seconds += e.modified - e.created
                 metrics_duration_to_save = time.perf_counter()
                 try:
                     cls.objects.bulk_create(events)
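
To make the timedelta accumulation concrete, a toy example with hypothetical
timestamps:

import datetime

now = datetime.datetime(2022, 5, 31, 14, 0, 10)
# Two events that waited 0.5s and 1.5s between creation and this flush:
created_times = [now - datetime.timedelta(seconds=0.5), now - datetime.timedelta(seconds=1.5)]
total = datetime.timedelta(seconds=0)
for created in created_times:
    total += now - created  # mirrors e.modified - e.created in the loop above
print(total.total_seconds())  # 2.0 -> an average of 1.0 seconds over two events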
@@ -189,6 +195,11 @@
             self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', metrics_bulk_events_saved)
             self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(metrics_bulk_events_saved + metrics_singular_events_saved))
             self.subsystem_metrics.inc('callback_receiver_events_broadcast', metrics_events_broadcast)
+            self.subsystem_metrics.set(
+                'callback_receiver_event_processing_avg_seconds',
+                metrics_total_job_event_processing_seconds.total_seconds()
+                / (metrics_bulk_events_saved + metrics_singular_events_saved - metrics_events_missing_created),
+            )
             if self.subsystem_metrics.should_pipe_execute() is True:
                 self.subsystem_metrics.pipe_execute()
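
One edge case worth noting: if every event in a flush lacked a created
timestamp, the denominator above becomes zero. A guarded variant of the same
computation (the guard is an editorial suggestion, not part of this commit):

events_saved = metrics_bulk_events_saved + metrics_singular_events_saved
denominator = events_saved - metrics_events_missing_created
if denominator > 0:  # avoid ZeroDivisionError when no event had a usable created time
    self.subsystem_metrics.set(
        'callback_receiver_event_processing_avg_seconds',
        metrics_total_job_event_processing_seconds.total_seconds() / denominator,
    )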