Merge pull request #11717 from fosterseth/emit_event_detail_metrics

Add metric for number of events emitted over websocket broadcast
This commit is contained in:
Seth Foster 2022-02-11 12:52:16 -05:00 committed by GitHub
commit 9d0de57fae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 11 deletions

View File

@ -160,6 +160,7 @@ class Metrics:
IntM('callback_receiver_batch_events_errors', 'Number of times batch insertion failed'),
FloatM('callback_receiver_events_insert_db_seconds', 'Time spent saving events to database'),
IntM('callback_receiver_events_insert_db', 'Number of events batch inserted into database'),
IntM('callback_receiver_events_broadcast', 'Number of events broadcast to other control plane nodes'),
HistogramM(
'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
),

View File

@ -116,19 +116,20 @@ class CallbackBrokerWorker(BaseWorker):
def flush(self, force=False):
now = tz_now()
if force or (time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or any([len(events) >= 1000 for events in self.buff.values()]):
bulk_events_saved = 0
singular_events_saved = 0
metrics_bulk_events_saved = 0
metrics_singular_events_saved = 0
metrics_events_batch_save_errors = 0
metrics_events_broadcast = 0
for cls, events in self.buff.items():
logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})')
for e in events:
if not e.created:
e.created = now
e.modified = now
duration_to_save = time.perf_counter()
metrics_duration_to_save = time.perf_counter()
try:
cls.objects.bulk_create(events)
bulk_events_saved += len(events)
metrics_bulk_events_saved += len(events)
except Exception:
# if an exception occurs, we should re-attempt to save the
# events one-by-one, because something in the list is
@ -137,22 +138,24 @@ class CallbackBrokerWorker(BaseWorker):
for e in events:
try:
e.save()
singular_events_saved += 1
metrics_singular_events_saved += 1
except Exception:
logger.exception('Database Error Saving Job Event')
duration_to_save = time.perf_counter() - duration_to_save
metrics_duration_to_save = time.perf_counter() - metrics_duration_to_save
for e in events:
if not getattr(e, '_skip_websocket_message', False):
metrics_events_broadcast += 1
emit_event_detail(e)
self.buff = {}
self.last_flush = time.time()
# only update metrics if we saved events
if (bulk_events_saved + singular_events_saved) > 0:
if (metrics_bulk_events_saved + metrics_singular_events_saved) > 0:
self.subsystem_metrics.inc('callback_receiver_batch_events_errors', metrics_events_batch_save_errors)
self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', duration_to_save)
self.subsystem_metrics.inc('callback_receiver_events_insert_db', bulk_events_saved + singular_events_saved)
self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', bulk_events_saved)
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(bulk_events_saved + singular_events_saved))
self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', metrics_duration_to_save)
self.subsystem_metrics.inc('callback_receiver_events_insert_db', metrics_bulk_events_saved + metrics_singular_events_saved)
self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', metrics_bulk_events_saved)
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(metrics_bulk_events_saved + metrics_singular_events_saved))
self.subsystem_metrics.inc('callback_receiver_events_broadcast', metrics_events_broadcast)
if self.subsystem_metrics.should_pipe_execute() is True:
self.subsystem_metrics.pipe_execute()